# Step 6: ML Pipeline

**Objective:** Create a reproducible, end-to-end machine learning pipeline that automates the entire workflow from raw data to predictions.

## Pipeline Components:
1. **Data Ingestion** - Load and validate raw stock data
2. **Feature Engineering** - Transform raw data into ML-ready features
3. **Model Training** - Train models with proper cross-validation
4. **Prediction** - Generate predictions for new data
5. **Pipeline Persistence** - Save/load complete pipeline

## 6.1 Setup and Imports

In [1]:
import pandas as pd
import numpy as np
import json
import joblib
from datetime import datetime, timedelta
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Scikit-learn pipeline components
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, FunctionTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import TimeSeriesSplit

# Models
from lightgbm import LGBMClassifier

# Technical indicators
import ta

# Paths
DATA_RAW = Path('../data/raw')
DATA_FEATURES = Path('../data/features')
MODELS_DIR = Path('../models')
PIPELINE_DIR = Path('../pipeline')
PIPELINE_DIR.mkdir(exist_ok=True)

TICKERS = ['AAPL', 'MSFT', 'NVDA', 'TSLA', 'AMZN', 'META', 'GOOGL']

print('‚úÖ Libraries loaded')
print(f'üìÅ Pipeline directory: {PIPELINE_DIR.absolute()}')

‚úÖ Libraries loaded
üìÅ Pipeline directory: c:\git\Data project\notebooks\..\pipeline


## 6.2 Custom Transformers

In [2]:
class TechnicalIndicatorTransformer(BaseEstimator, TransformerMixin):
    """
    Custom transformer to add technical indicators to stock data.
    """
    def __init__(self):
        self.feature_names_ = None
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        
        # Ensure required columns exist
        required = ['open', 'high', 'low', 'close', 'volume']
        if not all(col in df.columns for col in required):
            raise ValueError(f"Missing required columns: {required}")
        
        # Trend Indicators
        df['SMA_10'] = ta.trend.sma_indicator(df['close'], window=10)
        df['SMA_20'] = ta.trend.sma_indicator(df['close'], window=20)
        df['SMA_50'] = ta.trend.sma_indicator(df['close'], window=50)
        df['EMA_10'] = ta.trend.ema_indicator(df['close'], window=10)
        df['EMA_20'] = ta.trend.ema_indicator(df['close'], window=20)
        
        # MACD
        macd = ta.trend.MACD(df['close'])
        df['MACD'] = macd.macd()
        df['MACD_Signal'] = macd.macd_signal()
        df['MACD_Hist'] = macd.macd_diff()
        
        # ADX
        adx = ta.trend.ADXIndicator(df['high'], df['low'], df['close'])
        df['ADX'] = adx.adx()
        df['ADX_Pos'] = adx.adx_pos()
        df['ADX_Neg'] = adx.adx_neg()
        
        # Momentum Indicators
        df['RSI'] = ta.momentum.rsi(df['close'], window=14)
        df['RSI_Fast'] = ta.momentum.rsi(df['close'], window=7)
        
        stoch = ta.momentum.StochasticOscillator(df['high'], df['low'], df['close'])
        df['Stoch_K'] = stoch.stoch()
        df['Stoch_D'] = stoch.stoch_signal()
        
        df['ROC_5'] = ta.momentum.roc(df['close'], window=5)
        df['ROC_10'] = ta.momentum.roc(df['close'], window=10)
        
        # Volatility Indicators
        bb = ta.volatility.BollingerBands(df['close'])
        df['BB_High'] = bb.bollinger_hband()
        df['BB_Low'] = bb.bollinger_lband()
        df['BB_Mid'] = bb.bollinger_mavg()
        df['BB_Width'] = (df['BB_High'] - df['BB_Low']) / df['BB_Mid']
        df['BB_Pct'] = bb.bollinger_pband()
        
        df['ATR'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'])
        
        # Volume Indicators
        df['OBV'] = ta.volume.on_balance_volume(df['close'], df['volume'])
        df['Volume_SMA_20'] = df['volume'].rolling(20).mean()
        df['Volume_Ratio'] = df['volume'] / df['Volume_SMA_20']
        
        self.feature_names_ = df.columns.tolist()
        return df
    
    def get_feature_names_out(self, input_features=None):
        return self.feature_names_

print('‚úÖ TechnicalIndicatorTransformer defined')

‚úÖ TechnicalIndicatorTransformer defined


In [3]:
class LagFeatureTransformer(BaseEstimator, TransformerMixin):
    """
    Add lag features for specified columns.
    """
    def __init__(self, columns=None, lags=[1, 2, 3, 5, 10, 20]):
        self.columns = columns or ['Return', 'RSI', 'Volume_Ratio']
        self.lags = lags
        self.feature_names_ = None
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        
        # Calculate Return if not present
        if 'Return' not in df.columns and 'close' in df.columns:
            df['Return'] = df['close'].pct_change()
        
        # Add lag features
        for col in self.columns:
            if col in df.columns:
                for lag in self.lags:
                    df[f'{col}_Lag_{lag}'] = df[col].shift(lag)
        
        self.feature_names_ = df.columns.tolist()
        return df
    
    def get_feature_names_out(self, input_features=None):
        return self.feature_names_

print('‚úÖ LagFeatureTransformer defined')

‚úÖ LagFeatureTransformer defined


In [4]:
class RollingStatsTransformer(BaseEstimator, TransformerMixin):
    """
    Add rolling statistics features.
    """
    def __init__(self, windows=[5, 10, 20]):
        self.windows = windows
        self.feature_names_ = None
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        
        if 'Return' not in df.columns and 'close' in df.columns:
            df['Return'] = df['close'].pct_change()
        
        for window in self.windows:
            # Return statistics
            if 'Return' in df.columns:
                df[f'Return_Mean_{window}D'] = df['Return'].rolling(window).mean()
                df[f'Return_Std_{window}D'] = df['Return'].rolling(window).std()
                df[f'Sharpe_{window}D'] = df[f'Return_Mean_{window}D'] / (df[f'Return_Std_{window}D'] + 1e-10)
            
            # Price statistics
            if 'close' in df.columns:
                df[f'Volatility_{window}D'] = df['close'].pct_change().rolling(window).std() * np.sqrt(252)
                df[f'Price_Range_{window}D'] = (df['high'].rolling(window).max() - df['low'].rolling(window).min()) / df['close']
        
        self.feature_names_ = df.columns.tolist()
        return df
    
    def get_feature_names_out(self, input_features=None):
        return self.feature_names_

print('‚úÖ RollingStatsTransformer defined')

‚úÖ RollingStatsTransformer defined


In [5]:
class TargetTransformer(BaseEstimator, TransformerMixin):
    """
    Create target variable for direction prediction.
    """
    def __init__(self, horizon=1):
        self.horizon = horizon
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        
        if 'close' in df.columns:
            # Future return
            df['Future_Return'] = df['close'].pct_change(self.horizon).shift(-self.horizon)
            # Direction: 1 if up, 0 if down
            df['Target_Direction'] = (df['Future_Return'] > 0).astype(int)
        
        return df

print('‚úÖ TargetTransformer defined')

‚úÖ TargetTransformer defined


In [6]:
class DataCleaner(BaseEstimator, TransformerMixin):
    """
    Clean data by removing NaN values and selecting feature columns.
    """
    def __init__(self, feature_cols=None, drop_na=True):
        self.feature_cols = feature_cols
        self.drop_na = drop_na
        self.fitted_cols_ = None
        
    def fit(self, X, y=None):
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        
        if self.feature_cols is None:
            # Auto-detect numeric columns, exclude target and date columns
            exclude = ['Target_Direction', 'Future_Return', 'date', 'Date', 'ticker', 'Ticker']
            self.fitted_cols_ = [col for col in df.select_dtypes(include=[np.number]).columns 
                                 if col not in exclude]
        else:
            self.fitted_cols_ = self.feature_cols
        
        return self
    
    def transform(self, X):
        df = X.copy() if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
        
        # Select feature columns that exist
        available_cols = [col for col in self.fitted_cols_ if col in df.columns]
        
        if self.drop_na:
            df = df.dropna(subset=available_cols)
        
        return df

print('‚úÖ DataCleaner defined')

‚úÖ DataCleaner defined


## 6.3 Complete Stock Prediction Pipeline

In [7]:
class StockPredictionPipeline:
    """
    End-to-end pipeline for stock direction prediction.
    """
    
    def __init__(self, ticker, model=None):
        self.ticker = ticker
        self.model = model or LGBMClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            random_state=42,
            verbosity=-1
        )
        self.scaler = StandardScaler()
        self.feature_cols = None
        self.is_fitted = False
        
        # Transformers
        self.tech_transformer = TechnicalIndicatorTransformer()
        self.lag_transformer = LagFeatureTransformer(
            columns=['Return', 'RSI', 'Volume_Ratio'],
            lags=[1, 2, 3, 5, 10, 20]
        )
        self.rolling_transformer = RollingStatsTransformer(windows=[5, 10, 20])
        self.target_transformer = TargetTransformer(horizon=1)
        
    def _engineer_features(self, df):
        """Apply all feature engineering transformations."""
        df = self.tech_transformer.transform(df)
        df = self.lag_transformer.transform(df)
        df = self.rolling_transformer.transform(df)
        df = self.target_transformer.transform(df)
        return df
    
    def _prepare_features(self, df):
        """Prepare feature matrix X and target y."""
        # Define feature columns (exclude non-feature columns)
        exclude_cols = ['open', 'high', 'low', 'close', 'volume', 'adj close',
                       'Target_Direction', 'Future_Return', 'date', 'Date', 'ticker', 'Ticker']
        
        feature_cols = [col for col in df.select_dtypes(include=[np.number]).columns
                       if col not in exclude_cols]
        
        return feature_cols
    
    def fit(self, df, verbose=True):
        """
        Fit the complete pipeline on training data.
        
        Parameters:
        -----------
        df : pd.DataFrame
            Raw stock data with OHLCV columns
        """
        if verbose:
            print(f"üîß Fitting pipeline for {self.ticker}...")
        
        # 1. Feature engineering
        df_features = self._engineer_features(df.copy())
        
        # 2. Get feature columns
        self.feature_cols = self._prepare_features(df_features)
        
        # 3. Drop NaN rows
        df_clean = df_features.dropna(subset=self.feature_cols + ['Target_Direction'])
        
        if verbose:
            print(f"   Features: {len(self.feature_cols)}")
            print(f"   Samples: {len(df_clean)}")
        
        # 4. Prepare X and y
        X = df_clean[self.feature_cols].values
        y = df_clean['Target_Direction'].values
        
        # 5. Scale features
        X_scaled = self.scaler.fit_transform(X)
        
        # 6. Train model
        self.model.fit(X_scaled, y)
        
        self.is_fitted = True
        
        if verbose:
            print(f"‚úÖ Pipeline fitted for {self.ticker}")
        
        return self
    
    def predict(self, df):
        """
        Generate predictions for new data.
        
        Returns:
        --------
        pd.DataFrame with predictions
        """
        if not self.is_fitted:
            raise ValueError("Pipeline not fitted. Call fit() first.")
        
        # Feature engineering
        df_features = self._engineer_features(df.copy())
        
        # Get available feature columns
        available_cols = [col for col in self.feature_cols if col in df_features.columns]
        
        # Handle missing columns
        for col in self.feature_cols:
            if col not in df_features.columns:
                df_features[col] = 0
        
        # Prepare features
        X = df_features[self.feature_cols].values
        
        # Handle NaN for prediction
        X = np.nan_to_num(X, nan=0)
        
        # Scale
        X_scaled = self.scaler.transform(X)
        
        # Predict
        predictions = self.model.predict(X_scaled)
        probabilities = self.model.predict_proba(X_scaled)[:, 1]
        
        # Create result DataFrame
        result = df_features[['close']].copy()
        result['Prediction'] = predictions
        result['Probability_Up'] = probabilities
        result['Signal'] = np.where(probabilities > 0.5, 'BUY', 'SELL')
        
        return result
    
    def save(self, path):
        """Save pipeline to disk."""
        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)
        
        # Save components
        joblib.dump(self.model, path / f'{self.ticker}_model.pkl')
        joblib.dump(self.scaler, path / f'{self.ticker}_scaler.pkl')
        
        # Save feature columns
        with open(path / f'{self.ticker}_features.json', 'w') as f:
            json.dump(self.feature_cols, f)
        
        # Save metadata
        metadata = {
            'ticker': self.ticker,
            'n_features': len(self.feature_cols),
            'is_fitted': self.is_fitted,
            'saved_at': datetime.now().isoformat()
        }
        with open(path / f'{self.ticker}_metadata.json', 'w') as f:
            json.dump(metadata, f, indent=2)
        
        print(f"üíæ Pipeline saved: {path}")
    
    @classmethod
    def load(cls, path, ticker):
        """Load pipeline from disk."""
        path = Path(path)
        
        pipeline = cls(ticker)
        pipeline.model = joblib.load(path / f'{ticker}_model.pkl')
        pipeline.scaler = joblib.load(path / f'{ticker}_scaler.pkl')
        
        with open(path / f'{ticker}_features.json', 'r') as f:
            pipeline.feature_cols = json.load(f)
        
        pipeline.is_fitted = True
        
        print(f"üìÇ Pipeline loaded: {ticker}")
        return pipeline

print('‚úÖ StockPredictionPipeline class defined')

‚úÖ StockPredictionPipeline class defined


## 6.4 Train and Save Pipelines

In [9]:
# Train pipelines for all stocks
pipelines = {}

print('=' * 60)
print('üöÄ TRAINING PIPELINES FOR ALL MAGNIFICENT 7 STOCKS')
print('=' * 60)

for ticker in TICKERS:
    # Load raw data (files have _raw.csv suffix)
    file_path = DATA_RAW / f'{ticker}_raw.csv'
    df = pd.read_csv(file_path, index_col=0, parse_dates=True)
    df.columns = df.columns.str.lower()
    
    # Create and fit pipeline
    pipeline = StockPredictionPipeline(ticker)
    pipeline.fit(df)
    
    # Save pipeline
    pipeline.save(PIPELINE_DIR / ticker)
    
    pipelines[ticker] = pipeline
    print()

print('=' * 60)
print('‚úÖ All pipelines trained and saved!')
print('=' * 60)

üöÄ TRAINING PIPELINES FOR ALL MAGNIFICENT 7 STOCKS
üîß Fitting pipeline for AAPL...
   Features: 62
   Samples: 1972
‚úÖ Pipeline fitted for AAPL
üíæ Pipeline saved: ..\pipeline\AAPL

üîß Fitting pipeline for MSFT...
   Features: 62
   Samples: 1972
‚úÖ Pipeline fitted for MSFT
üíæ Pipeline saved: ..\pipeline\MSFT

üîß Fitting pipeline for NVDA...
   Features: 62
   Samples: 1972
‚úÖ Pipeline fitted for NVDA
üíæ Pipeline saved: ..\pipeline\NVDA

üîß Fitting pipeline for TSLA...
   Features: 62
   Samples: 1972
‚úÖ Pipeline fitted for TSLA
üíæ Pipeline saved: ..\pipeline\TSLA

üîß Fitting pipeline for AMZN...
   Features: 62
   Samples: 1972
‚úÖ Pipeline fitted for AMZN
üíæ Pipeline saved: ..\pipeline\AMZN

üîß Fitting pipeline for META...
   Features: 62
   Samples: 1972
‚úÖ Pipeline fitted for META
üíæ Pipeline saved: ..\pipeline\META

üîß Fitting pipeline for GOOGL...
   Features: 62
   Samples: 1972
‚úÖ Pipeline fitted for GOOGL
üíæ Pipeline saved: ..\pipeline\GOOGL


## 6.5 Test Pipeline Predictions

In [10]:
# Test predictions with most recent data
print('=' * 60)
print('üìä TESTING PIPELINE PREDICTIONS')
print('=' * 60)

for ticker in TICKERS[:3]:  # Test first 3
    # Load raw data
    file_path = DATA_RAW / f'{ticker}_raw.csv'
    df = pd.read_csv(file_path, index_col=0, parse_dates=True)
    df.columns = df.columns.str.lower()
    
    # Get predictions
    predictions = pipelines[ticker].predict(df)
    
    print(f"\n{ticker} - Last 5 Predictions:")
    print(predictions[['close', 'Probability_Up', 'Signal']].tail())
    
    # Signal distribution
    signal_counts = predictions['Signal'].value_counts()
    print(f"Signal Distribution: {dict(signal_counts)}")

üìä TESTING PIPELINE PREDICTIONS

AAPL - Last 5 Predictions:
                 close  Probability_Up Signal
Date                                         
2026-01-09  259.369995        0.807934    BUY
2026-01-12  260.250000        0.666508    BUY
2026-01-13  261.049988        0.295457   SELL
2026-01-14  259.959991        0.142489   SELL
2026-01-15  258.209991        0.161919   SELL
Signal Distribution: {'BUY': np.int64(1161), 'SELL': np.int64(860)}

MSFT - Last 5 Predictions:
                 close  Probability_Up Signal
Date                                         
2026-01-09  479.279999        0.242528   SELL
2026-01-12  477.179993        0.233417   SELL
2026-01-13  470.670013        0.349091   SELL
2026-01-14  459.380005        0.430727   SELL
2026-01-15  456.660004        0.427669   SELL
Signal Distribution: {'BUY': np.int64(1130), 'SELL': np.int64(891)}

NVDA - Last 5 Predictions:
                 close  Probability_Up Signal
Date                                         
2026-01-09

In [11]:
# Test loading pipeline from disk
print('=' * 60)
print('üîÑ TESTING PIPELINE LOADING')
print('=' * 60)

# Load a saved pipeline
loaded_pipeline = StockPredictionPipeline.load(PIPELINE_DIR / 'AAPL', 'AAPL')

# Test prediction with loaded pipeline
df_test = pd.read_csv(DATA_RAW / 'AAPL_raw.csv', index_col=0, parse_dates=True)
df_test.columns = df_test.columns.str.lower()

predictions = loaded_pipeline.predict(df_test)
print(f"\nLoaded AAPL Pipeline - Last 5 Predictions:")
print(predictions[['close', 'Probability_Up', 'Signal']].tail())

üîÑ TESTING PIPELINE LOADING
üìÇ Pipeline loaded: AAPL

Loaded AAPL Pipeline - Last 5 Predictions:
                 close  Probability_Up Signal
Date                                         
2026-01-09  259.369995        0.807934    BUY
2026-01-12  260.250000        0.666508    BUY
2026-01-13  261.049988        0.295457   SELL
2026-01-14  259.959991        0.142489   SELL
2026-01-15  258.209991        0.161919   SELL


## 6.6 Batch Prediction Function

In [12]:
def get_all_predictions(pipeline_dir, data_dir, tickers):
    """
    Generate predictions for all stocks.
    
    Returns:
    --------
    pd.DataFrame with latest predictions for all stocks
    """
    all_predictions = []
    
    for ticker in tickers:
        # Load pipeline
        pipeline = StockPredictionPipeline.load(pipeline_dir / ticker, ticker)
        
        # Load data
        df = pd.read_csv(data_dir / f'{ticker}_raw.csv', index_col=0, parse_dates=True)
        df.columns = df.columns.str.lower()
        
        # Get predictions
        pred = pipeline.predict(df)
        
        # Get latest prediction
        latest = pred.iloc[-1].copy()
        latest['Ticker'] = ticker
        latest['Date'] = pred.index[-1]
        
        all_predictions.append(latest)
    
    result = pd.DataFrame(all_predictions)
    result = result[['Ticker', 'Date', 'close', 'Probability_Up', 'Signal']]
    result.columns = ['Ticker', 'Date', 'Close', 'Prob_Up', 'Signal']
    
    return result

# Get all predictions
print('=' * 60)
print('üìä LATEST PREDICTIONS FOR ALL MAGNIFICENT 7')
print('=' * 60)

all_preds = get_all_predictions(PIPELINE_DIR, DATA_RAW, TICKERS)
print()
print(all_preds.to_string(index=False))

üìä LATEST PREDICTIONS FOR ALL MAGNIFICENT 7
üìÇ Pipeline loaded: AAPL
üìÇ Pipeline loaded: MSFT
üìÇ Pipeline loaded: NVDA
üìÇ Pipeline loaded: TSLA
üìÇ Pipeline loaded: AMZN
üìÇ Pipeline loaded: META
üìÇ Pipeline loaded: GOOGL

Ticker       Date      Close  Prob_Up Signal
  AAPL 2026-01-15 258.209991 0.161919   SELL
  MSFT 2026-01-15 456.660004 0.427669   SELL
  NVDA 2026-01-15 187.050003 0.326198   SELL
  TSLA 2026-01-15 438.570007 0.245648   SELL
  AMZN 2026-01-15 238.179993 0.178959   SELL
  META 2026-01-15 620.799988 0.293636   SELL
 GOOGL 2026-01-15 332.779999 0.417860   SELL


## 6.7 Pipeline Summary

In [13]:
# Summary
print('=' * 70)
print('üè≠ PIPELINE SUMMARY')
print('=' * 70)

print("\nüì¶ PIPELINE COMPONENTS:")
print("   1. TechnicalIndicatorTransformer - Adds 25+ technical indicators")
print("   2. LagFeatureTransformer - Creates lag features for key indicators")
print("   3. RollingStatsTransformer - Adds rolling statistics")
print("   4. TargetTransformer - Creates direction target variable")
print("   5. StandardScaler - Normalizes features")
print("   6. LGBMClassifier - Predicts direction")

print("\nüìÅ SAVED FILES:")
for ticker in TICKERS:
    ticker_dir = PIPELINE_DIR / ticker
    if ticker_dir.exists():
        files = list(ticker_dir.glob('*'))
        print(f"   {ticker}/: {len(files)} files")

print("\nüîß PIPELINE USAGE:")
print("   # Load pipeline")
print("   pipeline = StockPredictionPipeline.load(path, ticker)")
print("   ")
print("   # Get predictions")
print("   predictions = pipeline.predict(new_data)")

print("\n" + '=' * 70)
print('‚úÖ STEP 6 COMPLETE - Ready for 07_validation.ipynb')
print('=' * 70)

üè≠ PIPELINE SUMMARY

üì¶ PIPELINE COMPONENTS:
   1. TechnicalIndicatorTransformer - Adds 25+ technical indicators
   2. LagFeatureTransformer - Creates lag features for key indicators
   3. RollingStatsTransformer - Adds rolling statistics
   4. TargetTransformer - Creates direction target variable
   5. StandardScaler - Normalizes features
   6. LGBMClassifier - Predicts direction

üìÅ SAVED FILES:
   AAPL/: 4 files
   MSFT/: 4 files
   NVDA/: 4 files
   TSLA/: 4 files
   AMZN/: 4 files
   META/: 4 files
   GOOGL/: 4 files

üîß PIPELINE USAGE:
   # Load pipeline
   pipeline = StockPredictionPipeline.load(path, ticker)
   
   # Get predictions
   predictions = pipeline.predict(new_data)

‚úÖ STEP 6 COMPLETE - Ready for 07_validation.ipynb
