In [None]:
import pandas as pd
import os
import pandas_ta as ta  # Technical indicators
from tqdm import tqdm   # Progress bars
import numpy as np

# Configuration: input/output locations used by every later cell.
RAW_DIR = "../data/raw/yfinance"
PROCESSED_DIR = "../data/processed"

# Show what is available before loading anything.
_raw_listing = os.listdir(RAW_DIR)
print("Files in yfinance folder:", _raw_listing)


Bulk Load All YFinance Files


In [None]:
def load_all_yfinance():
    """Load every YFinance CSV in RAW_DIR and stack them into one DataFrame.

    Each file is read with its first three rows skipped and the columns named
    ['date', 'close', 'high', 'low', 'open', 'volume']; a 'ticker' column is
    added from the file name, and the rows are re-sampled to calendar-daily
    frequency with forward-filled values.

    Returns:
        DataFrame with columns ['date', 'close', 'high', 'low', 'open',
        'volume', 'ticker'] and a fresh 0..n-1 index.

    Raises:
        ValueError: if no CSV file in RAW_DIR could be loaded.
    """
    column_names = ['date', 'close', 'high', 'low', 'open', 'volume']
    all_dfs = []  # One frame per successfully loaded ticker

    # Sorted so the row order of the result does not depend on the
    # arbitrary order os.listdir happens to return.
    for file in tqdm(sorted(os.listdir(RAW_DIR))):
        if not file.endswith('.csv'):
            continue

        try:
            # Strip only the final extension so dotted symbols survive
            # (e.g. 'BRK.B.csv' -> 'BRK.B', not 'BRK').
            ticker = os.path.splitext(file)[0]

            df = pd.read_csv(
                os.path.join(RAW_DIR, file),
                skiprows=3,            # skip the 3 metadata/header rows
                names=column_names,    # standardized column names
                parse_dates=['date'],
            )

            # Tag rows with their stock before re-sampling so the
            # forward-fill also populates 'ticker' on inserted days.
            df['ticker'] = ticker

            # Force a continuous calendar-daily index: asfreq inserts NaN rows
            # for missing dates, ffill copies the previous row into them.
            # NOTE(review): this also copies the previous day's *volume* onto
            # non-trading days, so a later `volume > 0` test cannot tell
            # trading days from inserted ones - confirm this is intended.
            df = df.set_index('date').asfreq('D').ffill().reset_index()

            all_dfs.append(df)

        except Exception as e:
            # Best effort: report the bad file and keep loading the rest.
            print(f"⚠️ Failed {file}: {str(e)}")

    if not all_dfs:
        # pd.concat([]) would raise a ValueError with an unhelpful message;
        # keep the exception type but say what actually went wrong.
        raise ValueError(f"No CSV files could be loaded from {RAW_DIR}")

    # Combine all DataFrames, ignoring original indices
    return pd.concat(all_dfs, ignore_index=True)

# Run the loader and report how much data came back.
df = load_all_yfinance()
row_count = len(df)
ticker_count = df['ticker'].nunique()
print(f"✅ Loaded {row_count} rows from {ticker_count} stocks")

Code verification

In [None]:
# Spot-check one ticker and list a few of the loaded symbols.
aapl_rows = df.loc[df['ticker'] == 'AAPL']
print(aapl_rows.head(10))

unique_tickers = df['ticker'].unique().tolist()
print(f"Tickers: {unique_tickers[:5]}...")

Data Cleaning Pipeline
Goal: Fix common data quality issues

Why?
- Ensures dates are recognized as timestamps (not strings)
- Handles market closures without leaving gaps
- Guarantees no NaN values break your models



In [None]:
def clean_data(df):
    """Return a cleaned copy of `df`: parsed dates, per-ticker forward-fill, no NaNs.

    Args:
        df: DataFrame with at least 'ticker' and 'date' columns. It is not
            modified; all work happens on a copy.

    Returns:
        DataFrame sorted by ['ticker', 'date'] with 'ticker' kept as an
        ordinary column, every other column forward-filled within its own
        ticker, and any row that still contains a NaN removed.
    """
    # Work on a copy so the caller's frame keeps its original 'date' column.
    cleaned = df.copy()

    # Convert text dates to proper datetime format (essential for time series)
    cleaned['date'] = pd.to_datetime(cleaned['date'])

    # Order rows so "previous value" means the previous date of the same stock.
    cleaned = cleaned.sort_values(['ticker', 'date'])

    # Forward-fill per ticker with the built-in GroupBy.ffill instead of
    # groupby().apply(): apply() can add 'ticker' as an index level next to
    # the 'ticker' column, which makes later groupby('ticker') calls ambiguous.
    value_cols = [col for col in cleaned.columns if col != 'ticker']
    if value_cols:
        cleaned[value_cols] = cleaned.groupby('ticker')[value_cols].ffill()

    # Leading gaps have no earlier value to copy, so drop what is still NaN.
    return cleaned.dropna()

In [None]:
# Count missing values once per column, then derive the grand total from it.
nan_per_column = df.isna().sum()
total_nans = nan_per_column.sum()
print(f"Total NaN values: {total_nans}")

# Per-column breakdown
print("\nNaN per column:")
print(nan_per_column)

# Overall size and a first look at the rows
print(df.shape)
print(df.head(10))

Feature Engineering
Goal: Add technical indicators traders use

Why These Indicators?

SMA: Smooths price noise to reveal trends

RSI: Identifies potential reversals (values >70 = overbought, <30 = oversold)

MACD: Shows momentum shifts

Bollinger Bands: Highlights volatility extremes

In [None]:
def _series_or_nan(result, index):
    """Return `result`, or an all-NaN float Series on `index` when `result` is None."""
    if result is None:
        return pd.Series(np.nan, index=index, dtype='float64')
    return result


def _column_or_nan(frame, prefix, index):
    """Return the first column of `frame` whose name starts with `prefix`, else all-NaN."""
    if frame is not None:
        for name in frame.columns:
            if str(name).startswith(prefix):
                return frame[name]
    return pd.Series(np.nan, index=index, dtype='float64')


def add_technical_indicators(df):
    """Calculate technical indicators for each stock in the DataFrame.

    Indicators are computed on each ticker's own 'close' series so values
    never leak from one security into another.

    Args:
        df: DataFrame containing stock data with columns: ['ticker', 'date', 'close', ...]

    Returns:
        DataFrame with additional technical indicator columns:
        - sma_20: 20-period Simple Moving Average (trend identification)
        - rsi_14: 14-period Relative Strength Index (momentum)
        - macd: MACD line (the 'MACD_12_26_9' output of ta.macd)
        - boll_high: Upper Bollinger Band over 20 periods
        - boll_low: Lower Bollinger Band over 20 periods
        The result comes from groupby('ticker').apply(...), so depending on
        the pandas version 'ticker' may also appear as an index level.
    """

    def _with_indicators(group):
        close = group['close']

        # Compute each multi-column indicator once and reuse the frame.
        macd_frame = ta.macd(close)

        # 20 periods, matching sma_20 and the documented "20MA" bands. The
        # previous call used the library default, whose 'BBU_5_2.0' output is
        # a 5-period band. The band width is still the library default.
        bands = ta.bbands(close, length=20)

        # NOTE(review): the guards below assume pandas_ta returns None (rather
        # than raising) when a ticker has too few rows - confirm. Without them
        # one short ticker would abort the whole apply().
        # Band columns are picked by prefix because the rest of the column
        # name encodes the parameters used.
        return group.assign(
            # Trend: 20-period simple moving average
            sma_20=_series_or_nan(ta.sma(close, length=20), close.index),
            # Momentum: 14-period RSI (>70 = overbought, <30 = oversold)
            rsi_14=_series_or_nan(ta.rsi(close, length=14), close.index),
            # MACD line; positive = upward momentum, negative = downward
            macd=_column_or_nan(macd_frame, 'MACD_', close.index),
            # Bollinger Bands: price near upper band = potentially overbought,
            # near lower band = potentially oversold
            boll_high=_column_or_nan(bands, 'BBU_', close.index),
            boll_low=_column_or_nan(bands, 'BBL_', close.index),
        )

    # Group by ticker to ensure calculations are performed per-stock
    return df.groupby('ticker').apply(_with_indicators)

In [None]:
# Build the feature frame from the loaded prices.
df_features = add_technical_indicators(df)

# Check new columns
feature_columns = df_features.columns.tolist()
print("New columns:", feature_columns)

# See sample data for AAPL
print("\nAAPL data with indicators:")
aapl_features = df_features.loc[df_features['ticker'] == 'AAPL']
print(aapl_features.tail(3))

In [None]:
# Forward-fill missing values (carry last known observation forward),
# separately for every ticker.
# A plain df_features.ffill() runs down the whole stacked frame, so the
# warm-up NaNs at the start of one ticker (e.g. first 19 rows of SMA20) would
# be overwritten with the last indicator values of the ticker above it.
# Grouping keeps those leading NaNs in place so dropna() below removes them.
#
# The ticker values are passed as an array rather than by name because
# 'ticker' may exist as both an index level and a column at this point,
# and grouping by the name would then be ambiguous.
ticker_keys = df_features['ticker'].to_numpy()
df_features = df_features.groupby(ticker_keys, sort=False).ffill()

# Drop the rows that are still incomplete (leading gaps / warm-up periods)
df_features = df_features.dropna()

print("Remaining NaNs:", df_features.isna().sum().sum())  # Should be 0
print(df_features.head())

Check if any ticker has date gaps


In [None]:
def _largest_gap(dates):
    """Largest difference between consecutive dates of one ticker."""
    ordered = dates.sort_values()
    return ordered.diff().max()


# One value per ticker: the widest jump between two consecutive rows.
date_gaps = df.groupby('ticker')['date'].apply(_largest_gap)
print("Max gap between dates per ticker (should be <=3 days for weekends):")
gap_days = date_gaps.dt.days
print(gap_days.value_counts())

Check for duplicate dates per ticker


In [None]:
# Count rows per (ticker, date) pair; anything above one is a duplicate.
rows_per_day = df.groupby(['ticker', 'date']).size()
duplicate_dates = rows_per_day[rows_per_day > 1]
if len(duplicate_dates) > 0:
    print("⚠️ Duplicate dates found:", duplicate_dates.index.tolist())

# Verify volume > 0 for all rows
zero_volume = df.loc[df['volume'] <= 0]
if len(zero_volume) > 0:
    print("⚠️ Stocks with zero volume:", zero_volume['ticker'].unique())

Final Data Quality Checks

In [None]:
# Overview of this cell:
# 1. Resolve data structure issues
# 2. Validate data quality
# 3. Calculate technical indicators
# 4. Handle missing data appropriately

# 1. CREATE CLEAN WORKING COPY
# Detach from whatever frame df_features was derived from, so the edits
# below cannot touch it or trigger SettingWithCopy warnings.
df_features = df_features.copy()

# 2. FIX INDEX/COLUMN CONFLICTS
# After groupby operations 'ticker' can sit in the index (possibly in
# addition to the column of the same name). Flatten back to a plain index.
ticker_is_index_level = 'ticker' in df_features.index.names
if ticker_is_index_level:
    # Remember the tickers held in the index before it is thrown away
    ticker_values = df_features.index.get_level_values('ticker')

    # Discard every index level (drop=True: do not turn them into columns)
    df_features = df_features.reset_index(drop=True)

    # Restore 'ticker' only when no column of that name survived,
    # which avoids creating a duplicate column
    ticker_column_missing = 'ticker' not in df_features.columns
    if ticker_column_missing:
        df_features['ticker'] = ticker_values

# 3. DATA QUALITY CHECKS
print("\n🔍 Running Data Quality Checks...")

# Check 1: Non-positive prices (zero or negative)
# Important because some indicators break with such values
price_cols = ['close', 'open', 'high', 'low']
nonpositive_mask = (df_features[price_cols] <= 0).any(axis=1)
negative_prices = df_features[nonpositive_mask]
if not negative_prices.empty:
    print("⚠️ Negative prices found in:", negative_prices['ticker'].unique())
    # Remove corrupt rows completely - can't fix bad prices.
    # Filtering with the boolean mask removes exactly the offending rows,
    # even if index labels are not unique.
    df_features = df_features[~nonpositive_mask]

# Check 2: Price Relationship Sanity
# 'high' must be the largest and 'low' the smallest of the four prices.
# The earlier version compared high only with low/close and low only with
# open, so rows with high < open or low > close passed unnoticed.
bar_top = df_features[['open', 'close']].max(axis=1)
bar_bottom = df_features[['open', 'close']].min(axis=1)
invalid_mask = (
    (df_features['high'] < df_features['low']) |
    (df_features['high'] < bar_top) |
    (df_features['low'] > bar_bottom)
)
invalid_prices = df_features[invalid_mask]
if not invalid_prices.empty:
    print("⚠️ Illogical prices in:", invalid_prices['ticker'].unique())
    # Drop invalid rows - likely data corruption
    df_features = df_features[~invalid_mask]

# 4. FEATURE ENGINEERING (WITH NaN PROTECTION)
print("\n⚙️ Calculating Features with Safe NaN Handling...")


def _rolling_mean_20(volume):
    """20-row rolling mean that already yields a value from the first row on."""
    return volume.rolling(20, min_periods=1).mean()


# Volume Analysis - average volume per ticker over the last (up to) 20 rows
volume_by_ticker = df_features.groupby('ticker')['volume']
df_features['volume_ma_20'] = volume_by_ticker.transform(_rolling_mean_20)

# Volume spike ratio: today's volume relative to its rolling average.
# Where the average is not positive the ratio is replaced by a neutral 1.0.
has_positive_average = df_features['volume_ma_20'] > 0
raw_ratio = df_features['volume'] / df_features['volume_ma_20']
df_features['volume_spike'] = np.where(has_positive_average, raw_ratio, 1.0)

# Technical Indicators Wrapper
# Group-level calculation with error handling
def safe_technical(group):
    """Calculate indicators for a single stock with error protection.

    Args:
        group: rows of one ticker, as handed over by groupby('ticker').apply;
            must contain 'close', 'high' and 'low' columns.

    Returns:
        A copy of `group` with 'sma_20', 'rsi_14' and 'atr_14' set. If an
        indicator raises, the error is printed and the copy is returned with
        whichever columns were assigned before the failure.
    """
    # pandas attaches the group key as `.name`; read it before copying,
    # because the copy is not guaranteed to carry that attribute.
    ticker_label = getattr(group, 'name', 'unknown')

    # Write to a copy: pandas documents mutating the object passed into
    # groupby.apply as unsupported.
    enriched = group.copy()
    try:
        # Trend indicators
        enriched['sma_20'] = ta.sma(enriched['close'], length=20)  # 20-period moving average
        # Momentum indicators
        enriched['rsi_14'] = ta.rsi(enriched['close'], length=14)  # Relative Strength Index
        # Volatility indicator
        enriched['atr_14'] = ta.atr(
            enriched['high'], enriched['low'], enriched['close'], length=14
        )
    except Exception as e:
        # Log errors but keep processing other stocks
        print(f"⚠️ Error calculating indicators for {ticker_label}: {str(e)}")
    return enriched

# Apply to every stock, one ticker group after another
df_features = df_features.groupby('ticker', group_keys=False).apply(safe_technical)

# 5. NaN MANAGEMENT STRATEGY
print("\n🧹 Final NaN Cleanup...")

# Indicators are undefined until enough history exists (warm-up rows).
# Step one: inside each ticker, carry the last valid indicator value forward.
tech_cols = ['sma_20', 'rsi_14', 'atr_14']
available_tech_cols = [name for name in tech_cols if name in df_features.columns]
for col in available_tech_cols:
    filled = df_features.groupby('ticker')[col].ffill()
    df_features[col] = filled

# Step two: whatever is still NaN (nothing earlier to copy from) receives a
# fixed fallback chosen per column.
fill_values = {
    'sma_20': df_features['close'],  # fall back to the raw price
    'rsi_14': 50,                    # neutral RSI level
    'atr_14': 0,                     # treated as "no volatility"
    'volume_spike': 1                # treated as "no spike"
}
for col in fill_values:
    if col not in df_features.columns:
        continue
    val = fill_values[col]
    df_features[col] = df_features[col].fillna(val)

# 6. FINAL VALIDATION
print("\n✅ Final Validation")

# Hard checks - stop right here if anything is still missing or non-positive
remaining_nans = df_features.isna().sum().sum()
assert remaining_nans == 0, (
    f"Final NaN count: {df_features.isna().sum().sum()}\n"
    f"NaN columns: {df_features.columns[df_features.isna().any()].tolist()}"
)
all_close_positive = (df_features['close'] > 0).all()
assert all_close_positive, "Negative prices exist!"

# Put every ticker's rows in chronological order for time series analysis
df_features = df_features.sort_values(['ticker', 'date'])

final_shape = df_features.shape
print(f"\n🎉 Cleaning complete! Final shape: {final_shape}")
print("Available columns:", df_features.columns.tolist())

MARKET DAY FLAG


In [None]:
# 1. MARKET DAY FLAG
# ------------------
# Boolean flag: True where the row's volume is positive.
# NOTE(review): the loader forward-fills every column (volume included) onto
# the calendar days it inserts, so this flag is presumably True on weekends
# and holidays as well - confirm before relying on it.
df_features['is_market_open'] = df_features['volume'].gt(0)

# 2. PRICE CHANGE ANALYSIS
# ------------------------
# Relative change of 'close' versus the previous row of the same ticker.
# Grouping keeps the first row of one stock from being compared with the
# last row of another; that first row therefore has no return (NaN).
close_by_ticker = df_features.groupby('ticker')['close']
df_features['daily_return'] = close_by_ticker.pct_change()

# Extreme move detection: absolute daily change above 50%.
# Such rows may be data errors or corporate actions, so they are reported
# but deliberately left in the data.
is_extreme = df_features['daily_return'].abs() > 0.5
extreme_returns = df_features[is_extreme]
if len(extreme_returns) > 0:
    print(f"⚠️ {len(extreme_returns)} extreme returns (>50%) detected")
    print("Sample affected tickers:", extreme_returns['ticker'].unique()[:5])



print(df_features.head(10))

# 3. DATA PERSISTENCE
# -------------------
# Saves cleaned data in Parquet format because:
# - Preserves dtypes (no float→string conversion like CSV)
# - Efficient columnar storage (faster for time series)
# - Built-in compression (gzip offers good balance)
# - Maintains schema on read

# Build the target path once so the write and the size check cannot drift apart.
output_path = f"{PROCESSED_DIR}/cleaned_stocks.parquet"  # Versioned path recommended

# Create the output folder on first run; to_parquet does not create it and
# would otherwise fail after all the processing above has already been done.
os.makedirs(PROCESSED_DIR, exist_ok=True)

df_features.to_parquet(
    output_path,
    index=False,  # Don't persist index (redundant with ticker+date)
    engine='pyarrow',
    compression='gzip',
)

# Post-save verification: report the size of the file that was just written
saved_size = os.path.getsize(output_path)/1e6
print(f"✅ Saved cleaned data ({saved_size:.1f}MB)")
print(f"Columns persisted: {df_features.columns.tolist()}")