# S&P 500 Data Fetching and Feature Engineering Pipeline
This notebook fetches historical data for all S&P 500 companies and the S&P 500 index, engineers a comprehensive set of technical indicators and features, and saves the final dataset.

## 1. Setup and Imports
Import all necessary libraries for the pipeline.

In [4]:
import yfinance as yf
import pandas as pd
import pandas_ta as ta
import requests
import io
import os
from datetime import datetime
import numpy as np

## 2. Helper and Data Fetching Functions
These functions handle the core tasks of fetching tickers and downloading data from the web.

In [8]:
def list_slickcharts_sp500() -> pd.DataFrame:
    """Scrapes the current list of S&P 500 tickers from SlickCharts."""
    url = 'https://www.slickcharts.com/sp500'
    headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/111.0'}
    response = requests.get(url, headers=headers)
    df = pd.read_html(io.StringIO(response.text), match='Symbol', index_col='Symbol')[0]
    return df.reset_index()

def fix_yfinance_indexes(df: pd.DataFrame) -> pd.DataFrame:
    """Flattens the multi-level index returned by yfinance for grouped downloads."""
    return df.stack(level=0, future_stack=True).rename_axis(["Date", "Ticker"]).reset_index(level=1)

def fetch_sp500_stock_data(tickers: list, start_date="2003-01-01") -> pd.DataFrame:
    """Downloads historical data for a list of tickers in batches."""
    normalized_tickers = [t.replace(".", "-") for t in tickers]
    all_dfs = []
    # Download in batches of 10 to be robust
    for i in range(0, len(normalized_tickers), 10):
        subset_tickers = normalized_tickers[i:i+10]
        data = yf.download(
            subset_tickers, 
            start=start_date, 
            end=datetime.now(), 
            group_by="Ticker", 
            auto_adjust=False
        )
        if not data.empty:
            df_subset = fix_yfinance_indexes(data).reset_index()
            all_dfs.append(df_subset)
            
    if not all_dfs:
        return pd.DataFrame()
        
    final_df = pd.concat(all_dfs, ignore_index=True)
    final_df = final_df.dropna(subset=["Open", "High", "Low", "Close", "Volume"], how="all")
    return final_df

def fetch_sp500_index_data(start_date="2003-01-01") -> pd.DataFrame:
    """Fetches historical data for the S&P 500 index (^GSPC)."""
    data = yf.download("^GSPC", start=start_date, end=datetime.now(), auto_adjust=False)
    data=data.droplevel("Ticker",axis=1)
    data.reset_index()
    data.columns.name=(None)

    df = data.reset_index()
    # Rename columns to be specific to the index
    df.columns = [f"sp500_{x}" for x in df.columns.tolist()]
    
    df.rename(columns={"sp500_Date": "Date"}, inplace=True)
    return df

## 3. Feature Engineering Functions
These functions calculate all the technical indicators and date-based features for both individual stocks and the S&P 500 index.

In [13]:
def calc_rsi(x: pd.Series, period: int = 14) -> pd.Series:
    """Helper function to calculate Relative Strength Index (RSI)."""
    change = x.diff()
    gain = change.where(change > 0, 0.0)
    loss = -change.where(change < 0, 0.0)
    avg_gain = gain.ewm(span=period, min_periods=period).mean()
    avg_loss = loss.ewm(span=period, min_periods=period).mean()
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Applies a comprehensive set of feature engineering steps to the stock data."""
    df_copy = df.copy()
    df_copy["Date"] = pd.to_datetime(df_copy["Date"])
    df_copy = df_copy.sort_values(["Ticker", "Date"])
    
    # Returns & Moving Averages
    df_copy["return_1d"] = df_copy.groupby("Ticker")["Adj Close"].pct_change(1)
    df_copy["return_5d"] = df_copy.groupby("Ticker")["Adj Close"].pct_change(5)
    df_copy["return_20d"] = df_copy.groupby("Ticker")["Adj Close"].pct_change(20)
    df_copy["ma_5d"] = df_copy.groupby("Ticker")["Adj Close"].rolling(5).mean().reset_index(0, drop=True)
    df_copy["ma_20d"] = df_copy.groupby("Ticker")["Adj Close"].rolling(20).mean().reset_index(0, drop=True)
    df_copy["ema_5d"] = df_copy.groupby("Ticker")["Adj Close"].transform(lambda x: x.ewm(span=5, adjust=False).mean())
    df_copy["ema_20d"] = df_copy.groupby("Ticker")["Adj Close"].transform(lambda x: x.ewm(span=20, adjust=False).mean())

    # Volatility
    df_copy["rv_5d"] = df_copy.groupby("Ticker")["return_1d"].rolling(5).std().reset_index(0, drop=True)
    df_copy["rv_20d"] = df_copy.groupby("Ticker")["return_1d"].rolling(20).std().reset_index(0, drop=True)
    
    # Date Features
    df_copy["week_of_year"] = df_copy["Date"].dt.isocalendar().week.astype(int)
    df_copy["month"] = df_copy["Date"].dt.month
    df_copy["quarter"] = df_copy["Date"].dt.quarter
    df_copy["day_of_week"] = df_copy["Date"].dt.dayofweek
    df_copy["day_of_month"] = df_copy["Date"].dt.day
    df_copy["is_quarter_start"] = df_copy["Date"].dt.is_quarter_start.astype(int)
    df_copy["is_quarter_end"] = df_copy["Date"].dt.is_quarter_end.astype(int)

    # Price-based Features
    df_copy["hl_range"] = df_copy["High"] - df_copy["Low"]
    df_copy["vol_change"] = df_copy.groupby("Ticker")["Volume"].pct_change()
    df_copy['oc_diff'] = df_copy['Close'] - df_copy['Open']
    df_copy['gap'] = df_copy['Open'] - df_copy.groupby('Ticker')['Close'].shift(1)
    
    # Momentum Indicators (RSI, Bollinger, MACD)
    df_copy["rsi"] = df_copy.groupby("Ticker")["Adj Close"].transform(lambda x: calc_rsi(x, 14))
    b_range = 20
    bollinger_ma = df_copy.groupby("Ticker")["Adj Close"].rolling(b_range).mean().reset_index(0, drop=True)
    bollinger_std = df_copy.groupby("Ticker")["Adj Close"].rolling(b_range).std().reset_index(0, drop=True)
    bollinger_upper = bollinger_ma + 2 * bollinger_std
    bollinger_lower = bollinger_ma - 2 * bollinger_std
    df_copy['bollinger_width'] = (bollinger_upper - bollinger_lower) / bollinger_ma

    exp1 = df_copy.groupby("Ticker")["Adj Close"].transform(lambda x: x.ewm(span=12, adjust=False).mean())
    exp2 = df_copy.groupby("Ticker")["Adj Close"].transform(lambda x: x.ewm(span=26, adjust=False).mean())
    df_copy['macd'] = exp1 - exp2
    df_copy['macd_signal'] = df_copy.groupby("Ticker")['macd'].transform(lambda x: x.ewm(span=9, adjust=False).mean())
    df_copy['macd_hist'] = df_copy['macd'] - df_copy['macd_signal']
    
    df_copy.ta.ad(high=df_copy['High'], low=df_copy['Low'], close=df_copy['Close'], volume=df_copy['Volume'])
    # Additional Technical Indicators using pandas-ta
    df_copy.set_index(pd.MultiIndex.from_frame(df_copy[['Ticker', 'Date']]), inplace=True)
    df_copy.ta.atr(length=14, append=True)
    df_copy.ta.obv(append=True)
    df_copy.ta.stoch(k=14, d=3, append=True)
    df_copy.ta.adx(append=True)
    df_copy.ta.willr(length=14, append=True)
    df_copy.ta.willr(length=5, append=True)
    df_copy.ta.cci(length=20, append=True)
    df_copy.ta.cci(length=14, append=True)
    df_copy.reset_index(inplace=True, drop=True)
    df_copy.rename(columns={'ATRr_14': 'atr_14d', 'OBV': 'obv', 'WILLR_14': 'WR_14', 'WILLR_5': 'WR_5', 'CCI_20_0.015': 'CCI_20', 'CCI_14_0.015': 'CCI_14', 'ADL': 'ADL'}, inplace=True)
    df_copy["atr_14d_norm"] = df_copy["atr_14d"] / df_copy["Adj Close"]

    # Target Variables (Future Returns)
    df_copy["target_1d"] = df_copy.groupby("Ticker")["Adj Close"].pct_change(1).shift(-1)
    df_copy["target_5d"] = df_copy.groupby("Ticker")["Adj Close"].pct_change(5).shift(-5)
    df_copy["target_20d"] = df_copy.groupby("Ticker")["Adj Close"].pct_change(20).shift(-20)
    
    return df_copy

def engineer_index_features(df: pd.DataFrame) -> pd.DataFrame:
    """Applies feature engineering steps to the S&P 500 index data."""
    df_copy = df.copy()
    df_copy["Date"] = pd.to_datetime(df_copy["Date"])
    df_copy = df_copy.sort_values(["Date"])

    # Returns, MAs, and Volatility
    df_copy["sp500_return_1d"] = df_copy["sp500_Adj Close"].pct_change(1)
    
    df_copy["sp500_return_10d"] = df_copy["sp500_Adj Close"].pct_change(10)
    df_copy["sp500_return_20d"] = df_copy["sp500_Adj Close"].pct_change(20)
    df_copy["sp500_ma_10d"] = df_copy["sp500_Adj Close"].rolling(10).mean()
    df_copy["sp500_ema_20d"] = df_copy["sp500_Adj Close"].ewm(adjust=False, span=20).mean()
    df_copy["sp500_ema_30d"] = df_copy["sp500_Adj Close"].ewm(adjust=False, span=30).mean()
    df_copy["sp500_ema_60d"] = df_copy["sp500_Adj Close"].ewm(adjust=False, span=60).mean()
    df_copy["sp500_ema_90d"] = df_copy["sp500_Adj Close"].ewm(adjust=False, span=90).mean()
    df_copy["sp500_rv_20d"] = df_copy["sp500_return_1d"].rolling(20).std()
    df_copy["sp500_rv_40d"] = df_copy["sp500_return_1d"].rolling(40).std()
    df_copy["sp500_rv_80d"] = df_copy["sp500_return_1d"].rolling(80).std()
    df_copy["sp500_rv_120d"] = df_copy["sp500_return_1d"].rolling(120).std()

    # Price-based and Momentum
    df_copy["sp500_hl_range"] = df_copy["sp500_High"] - df_copy["sp500_Low"]
    df_copy["sp500_vol_change"] = df_copy["sp500_Volume"].pct_change()
    df_copy['sp500_oc_diff'] = df_copy['sp500_Close'] - df_copy['sp500_Open']
    df_copy['sp500_gap'] = df_copy['sp500_Open'] - df_copy['sp500_Close'].shift(1)
    df_copy["sp500_rsi"] = calc_rsi(df_copy["sp500_Adj Close"], 14)
    
    # MACD
    exp1 = df_copy["sp500_Adj Close"].ewm(span=12, adjust=False).mean()
    exp2 = df_copy["sp500_Adj Close"].ewm(span=26, adjust=False).mean()
    sp500_macd = exp1 - exp2
    df_copy['sp500_macd_signal'] = sp500_macd.ewm(span=9, adjust=False).mean()
    df_copy['sp500_macd_hist'] = sp500_macd - df_copy['sp500_macd_signal']
    
    # ATR & OBV for index
    df_copy["sp500_atr_14d"] = ta.atr(high=df_copy["sp500_High"], low=df_copy["sp500_Low"], close=df_copy["sp500_Adj Close"], length=14)
    df_copy["sp500_atr_14d_norm"] = df_copy["sp500_atr_14d"] / df_copy["sp500_Adj Close"]
    df_copy["sp500_obv"] = ta.obv(close=df_copy["sp500_Adj Close"], volume=df_copy["sp500_Volume"])
    
    return df_copy



## 4. Main Execution Pipeline
This section runs the complete data processing workflow.

In [14]:
def main():
    """Main function to execute the data pipeline."""
    # --- Define file paths ---
    output_dir = "../data/processed/"
    company_info_path = "../data/raw/sp500_companies.csv" # Assuming this file exists
    output_path = os.path.join(output_dir, "sp500.parquet")
    os.makedirs(output_dir, exist_ok=True)

    # 1. Fetch S&P 500 Tickers
    print("Fetching S&P 500 tickers...")
    tickers_df = list_slickcharts_sp500()
    tickers = list(tickers_df["Symbol"])
    print(f"Found {len(tickers)} tickers.")

    # 2. Fetch Historical Data for Stocks and Index
    print("Fetching historical stock data...")
    stock_data = fetch_sp500_stock_data(tickers[:10])
    print("Fetching S&P 500 index data...")
    index_data = fetch_sp500_index_data()

    # 3. Load and Merge Company Info
    print("Loading and merging company information...")
    sp500_companies = pd.read_csv(company_info_path)
    sp500_companies.rename(columns={"Symbol": "Ticker"}, inplace=True)
    merged_data = pd.merge(stock_data, sp500_companies[["Ticker", "Sector", "Industry"]], on="Ticker", how="left")
    merged_data['Sector'].fillna('Unknown', inplace=True)
    merged_data['Industry'].fillna('Unknown', inplace=True)

    # 4. Feature Engineering
    print("Engineering features for the S&P 500 index...")
    index_features = engineer_index_features(index_data)
    
    # 5. Merge stock data with index features
    print("Merging stock data with index features...")
    merged_data.reset_index(inplace=True)
    index_features.reset_index(inplace=True)
    merged_data["Date"] = pd.to_datetime(merged_data["Date"])
    index_features["Date"] = pd.to_datetime(index_features["Date"])
    final_df = pd.merge(merged_data, index_features, on="Date", how="left")

    print("Engineering features for individual stocks...")
    final_df = engineer_features(final_df)
    
    # 6. Final Cleaning and Column Selection
    print("Cleaning final dataset and selecting columns...")
    # Replace inf values that may arise from pct_change with 0 or NaN
    
    final_df.replace([np.inf, -np.inf], np.nan, inplace=True)
    final_df.dropna(inplace=True)
    
    # Ensure all required final columns are present
    final_columns = [
        'Date', 'Ticker', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume',
        'Sector', 'Industry', 'return_1d', 'return_5d', 'return_20d', 'ma_5d',
        'ma_20d', 'ema_5d', 'ema_20d', 'rv_5d', 'rv_20d', 'week_of_year',
        'month', 'quarter', 'hl_range', 'vol_change', 'oc_diff', 'gap', 'rsi',
        'macd', 'macd_signal', 'macd_hist', 'target_1d', 'target_5d',
        'target_20d', 'day_of_week', 'day_of_month', 'is_quarter_start',
        'is_quarter_end', 'sp500_Adj Close', 'sp500_Volume', 'sp500_return_10d',
        'sp500_return_20d', 'sp500_ma_10d', 'sp500_ema_20d', 'sp500_rv_20d',
        'sp500_hl_range', 'sp500_vol_change', 'sp500_oc_diff', 'sp500_gap',
        'sp500_rsi', 'sp500_macd_signal', 'sp500_macd_hist', 'bollinger_width',
        'atr_14d', 'atr_14d_norm', 'obv', 'STOCHk_14_3_3', 'STOCHd_14_3_3',
        'STOCHh_14_3_3', 'ADX_14', 'ADXR_14_2', 'DMP_14', 'DMN_14',
        'sp500_ema_30d', 'sp500_ema_60d', 'sp500_ema_90d', 'WR_14', 'WR_5',
        'CCI_20', 'CCI_14', 'ADL', 'sp500_rv_40d', 'sp500_rv_80d','ADL'
        'sp500_rv_120d', 'sp500_atr_14d_norm', 'sp500_obv'
    ]
    # Select only the columns that exist in the dataframe to avoid errors
    existing_final_columns = [col for col in final_columns if col in final_df.columns]
    final_df = final_df[existing_final_columns]
    
    # 7. Save to Parquet
    print(f"Saving final data to {output_path}...")
    final_df.to_parquet(output_path, index=False)
    print("Pipeline finished successfully!")
    print(f"Final DataFrame shape: {final_df.shape}")

# Run the main pipeline
if __name__ == '__main__':
    main()

Fetching S&P 500 tickers...
Found 503 tickers.
Fetching historical stock data...


[*********************100%***********************]  10 of 10 completed


Fetching S&P 500 index data...


[*********************100%***********************]  1 of 1 completed
/var/folders/94/xf2q8_bs543d00kh0st2v5t80000gn/T/ipykernel_7625/4147691023.py:26: ChainedAssignmentError: A value is trying to be set on a copy of a DataFrame or Series through chained assignment using an inplace method.
When using the Copy-on-Write mode, such inplace method never works to update the original DataFrame or Series, because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' instead, to perform the operation inplace on the original object.


  merged_data['Sector'].fillna('Unknown', inplace=True)
/var/folders/94/xf2q8_bs543d00kh0st2v5t80000gn/T/ipykernel_7625/4147691023.py:27: ChainedAssignmentError: A value is trying to be set on a copy of a DataFrame or Series through chained assignment using an inplace method.
When using the Copy-on-Write mode, such inplace met

Loading and merging company information...
Engineering features for the S&P 500 index...
Merging stock data with index features...
Engineering features for individual stocks...
Cleaning final dataset and selecting columns...
Saving final data to ../data/processed/sp500.parquet...
Pipeline finished successfully!
Final DataFrame shape: (49513, 73)
