In [None]:
!pip install numpy pandas pandas_datareader matplotlib yfinance scikit-learn xgboost tensorflow

## Imports


In [2]:
# statistical data analysis libraries
import numpy as np
import pandas as pd
from pandas_datareader import data as pdata
from datetime import datetime as dtim, timedelta as tdel

# data visualization libraries
import matplotlib.pyplot as plt

# financial data libraries
import yfinance as yf

# ML libraries
import mlflow as mlf
import xgboost as xgb
from sklearn.preprocessing import StandardScaler as ssc
from sklearn.metrics import (
    mean_squared_error as mse,
    mean_absolute_error as mae,
    r2_score as r2s,
)
from tensorflow.keras.models import Sequential as seqMD
from tensorflow.keras.layers import LSTM, Dense, Dropout as dpMD

# other miscellaneous libraries
import json
import joblib

# Data Intake & Refinement Pipeline


In [3]:
class DataPipeline:
    """Download price history for one asset and derive volatility features.

    Typical use is a single call to ``prepare_training_data``, which chains
    ``fetch_financial_data`` -> ``calculate_realized_volatility`` ->
    ``create_feature_set`` and then standardizes the features.

    Attributes set by the pipeline:
        dataframe: raw price data plus the derived return/volatility columns.
        features: unscaled feature table indexed like ``dataframe``.
        scaled_features / scaled_features_df: standardized feature table
            (both names point at the same DataFrame).
    """

    # Constructor for pipeline.
    def __init__(self, asset, trading_day_window=30, active_total_trading_days=252):
        """Store the configuration; no data is downloaded here.

        Args:
            asset: ticker passed to ``yf.download``.
            trading_day_window: rolling window (in rows) used for the
                realized-volatility standard deviation.
            active_total_trading_days: trading days per year; its square root
                is the factor used to annualize volatility.
        """
        self.asset = asset
        self.trading_window = trading_day_window
        self.active_days = active_total_trading_days
        self.scaling_factor = np.sqrt(active_total_trading_days)
        self.dataframe = None
        self.features = None
        self.scaled_features = None
        self.scaled_features_df = None

    ## Fetching past data for the asset for this interval of time.
    def fetch_financial_data(self, start_date, end_date):
        """Download price data and add a log-return column.

        Args:
            start_date: start of the interval, forwarded to ``yf.download``.
            end_date: end of the interval, forwarded to ``yf.download``.

        Returns:
            The downloaded DataFrame with an extra ``"Returns"`` column; the
            same object is stored on ``self.dataframe``.
        """
        # this dataframe holds OHLCV data. (Open, High, Low, Close, Volume).
        dataframe = yf.download(tickers=self.asset, start=start_date, end=end_date)

        # Newer yfinance releases may return two-level (field, ticker) columns
        # even for one ticker. With a single ticker the extra level carries no
        # information and would turn dataframe["Close"] into a DataFrame, so
        # drop it. Multi-ticker frames are left untouched.
        columns = dataframe.columns
        if (
            isinstance(columns, pd.MultiIndex)
            and columns.nlevels > 1
            and columns.get_level_values(-1).nunique() == 1
        ):
            dataframe.columns = columns.droplevel(-1)

        ## Calculating returns.
        dataframe["Returns"] = np.log(dataframe["Close"] / dataframe["Close"].shift(1))

        ## Features computed from a previous download no longer match this data.
        self.dataframe = dataframe
        self.features = None
        return dataframe

    ## Calculating Realized Volatility -> Standard Deviation of Returns from the Mean Return.
    def calculate_realized_volatility(self):
        """Add three volatility columns to ``self.dataframe`` and return it.

        Columns added:
            RealizedVolatility: rolling std of ``Returns`` over
                ``self.trading_window`` rows, times ``self.scaling_factor``.
            HighLowVolatility: ``log(High / Low)`` per row.
            GarmanKlassVolatility: ``sqrt(0.5 * log(High/Low)**2
                - (2*log(2) - 1) * log(Close/Open)**2)`` per row.

        Requires ``self.dataframe`` to already hold ``Returns`` and the
        Open/High/Low/Close columns.
        """
        frame = self.dataframe

        ## Annualized volatility: rolling standard deviation scaled by sqrt(days).
        frame["RealizedVolatility"] = (
            frame["Returns"].rolling(window=self.trading_window).std()
        ) * (self.scaling_factor)

        ## High-Low Volatility (the log range is reused by Garman-Klass below).
        log_high_low = np.log(frame["High"] / frame["Low"])
        frame["HighLowVolatility"] = log_high_low

        ## GARMAN KLASS Volatility using the mathematical formula
        log_close_open = np.log(frame["Close"] / frame["Open"])
        frame["GarmanKlassVolatility"] = np.sqrt(
            ((log_high_low**2) * 0.5)
            - (((2 * (np.log(2))) - 1) * (log_close_open**2))
        )

        return self.dataframe

    ## Technical Indicators and features for Volatility Prediction
    def create_feature_set(self, Moving_AvgStd_window=22, lookback_period=(5, 10, 22)):
        """Build the feature table from ``self.dataframe``.

        Args:
            Moving_AvgStd_window: rolling window for the volume mean/std.
            lookback_period: iterable of window lengths; each one yields a
                price moving average, price std, RSI and historical
                volatility column suffixed with the window length.

        Returns:
            ``self.features``, indexed like ``self.dataframe``.
        """
        # Ensuring the returns are present
        if "Returns" not in self.dataframe.columns:
            self.dataframe["Returns"] = np.log(
                self.dataframe["Close"] / self.dataframe["Close"].shift(1)
            )

        ## Start a fresh table when none exists or when the existing one was
        ## built for different rows (otherwise assignments would misalign).
        if self.features is None or not self.features.index.equals(
            self.dataframe.index
        ):
            self.features = pd.DataFrame(index=self.dataframe.index)

        close = self.dataframe["Close"]
        volume = self.dataframe["Volume"]
        returns = self.dataframe["Returns"]

        ### Volume-based features
        #### this is the rolling mean
        self.features["Volume_MovAvg"] = volume.rolling(
            window=Moving_AvgStd_window
        ).mean()
        #### this is the rolling standard deviation
        self.features["Volume_StdDev"] = volume.rolling(
            window=Moving_AvgStd_window
        ).std()

        ### Price-based features
        # The gain/loss split does not depend on the window, so compute it once.
        # Losses are stored as positive magnitudes: RSI is defined on the ratio
        # of average gain to average (absolute) loss, which keeps it in [0, 100].
        delta = close.diff()
        gains = delta.where(delta > 0, 0)
        losses = (-delta).where(delta < 0, 0)

        for period in lookback_period:

            #### Moving avgs
            self.features[f"Price_MovAvg_{period}"] = close.rolling(
                window=period
            ).mean()
            self.features[f"Price_StdDev_{period}"] = close.rolling(
                window=period
            ).std()

            #### RSI -> Relative Strength Index ==> OverBought or OverSold
            # clip guards against tiny negative values from rolling-sum rounding
            avg_gain = gains.rolling(window=period).mean().clip(lower=0)
            avg_loss = losses.rolling(window=period).mean().clip(lower=0)
            # avg_loss == 0 with gains gives inf -> RSI 100; 0/0 stays NaN.
            relative_strength = avg_gain / avg_loss
            self.features[f"RSI_{period}"] = 100 - (100 / (1 + relative_strength))

            #### Historical Volatility
            self.features[f"Hist_Vol_{period}"] = (
                returns.rolling(window=period).std()
            ) * self.scaling_factor

        ### VWAP -> Volume Weighted Average Price (cumulative from the first row)
        # Zero cumulative volume would divide by zero; leave those rows as NaN.
        cumulative_volume = volume.cumsum().replace(0, np.nan)
        self.features["VWAP"] = (close * volume).cumsum() / cumulative_volume

        return self.features

    ## Preparing complete dataset for the training volatility predictor model
    def prepare_training_data(
        self,
        start_date,
        end_date,
        Moving_AvgStd_window=22,
        lookback_period=(5, 10, 22),
        prediction_horizon=5,
    ):
        """Run the whole pipeline and standardize the features.

        Args:
            start_date: start of the download interval.
            end_date: end of the download interval.
            Moving_AvgStd_window: forwarded to ``create_feature_set``.
            lookback_period: forwarded to ``create_feature_set``.
            prediction_horizon: number of rows the target is shifted into the
                future (row t is paired with RealizedVolatility at t + horizon).

        Returns:
            ``(target, scaler)``: the future-volatility Series aligned with
            the scaled features, and the fitted scaler. The scaled features
            themselves are stored on ``self.scaled_features_df`` (also
            available as ``self.scaled_features``).

        Raises:
            ValueError: if no complete rows remain after dropping NaNs.
        """
        self.fetch_financial_data(start_date, end_date)
        self.calculate_realized_volatility()
        self.create_feature_set(Moving_AvgStd_window, lookback_period)

        # Target variable -> Future Volatility
        target = self.dataframe["RealizedVolatility"].shift(-(prediction_horizon))

        # Combining features and target; rows with any missing value are dropped
        # (rolling warm-up at the start, shifted target at the end).
        final_dataset = pd.concat([self.features, target], axis=1).dropna()
        if final_dataset.empty:
            raise ValueError(
                "No complete rows left after dropping missing values; "
                "use a longer date range or shorter windows."
            )

        # Getting the feature names before scaling: all columns except the target
        feature_cols = final_dataset.columns[:-1]

        ### starting the standard scaler
        # NOTE(review): the scaler is fitted on every row returned here; fit it
        # on the training split only if look-ahead leakage matters.
        scaler = ssc()

        # Fit and transform the features (target not included)
        scaled_features = scaler.fit_transform(final_dataset.iloc[:, :-1])

        ## Convert scaled features back to dataframe with correct column names
        self.scaled_features_df = pd.DataFrame(
            scaled_features, index=final_dataset.index, columns=feature_cols
        )
        self.scaled_features = self.scaled_features_df

        return final_dataset["RealizedVolatility"], scaler

# Model to Learn Volatility


In [4]:
class VolatilityPredictor:
    """Train, evaluate and query a volatility regressor.

    ``model_type`` selects the backend: ``"xgboost"`` (tabular regressor) or
    ``"LSTM"`` (sequence model fed with sliding windows of the features).
    The comparison is case-insensitive.
    """

    _SUPPORTED_TYPES = ("xgboost", "lstm")

    # Constructor
    def __init__(self, model_type="xgboost"):
        """Remember the backend choice; the model is built lazily in ``train``."""
        self.model_type = model_type
        self.model = None
        # Window length used to build LSTM inputs. ``train`` overwrites it so
        # that ``evaluate``/``predict`` window new data the same way.
        self.sequence_length = 10

    def _resolved_model_type(self):
        """Return the lower-cased model type, or raise for an unsupported one."""
        kind = str(self.model_type).lower()
        if kind not in self._SUPPORTED_TYPES:
            raise ValueError(
                f"Unsupported model_type {self.model_type!r}; "
                "expected 'xgboost' or 'LSTM'."
            )
        return kind

    def _require_model(self):
        """Return the fitted model, or raise if ``train`` has not produced one."""
        if self.model is None:
            raise RuntimeError("Model is not available; call train() first.")
        return self.model

    def _lstm_windows(self, X, sequence_length=None):
        """Turn a 1-D/2-D feature table into 3-D LSTM input windows."""
        data = np.asarray(X)
        if data.ndim == 1:
            # a single feature column still needs an explicit feature axis
            data = data.reshape(-1, 1)
        if sequence_length is None:
            sequence_length = self.sequence_length
        return self.prepare_sequences(data, sequence_length)

    # Create XGBoost model with optimized parameters for volatility prediction
    def create_xgboost_model(
        self,
        n_estimators=100,
        learning_rate=0.1,
        max_depth=5,
        min_child_weight=1,
        gamma=0,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        random_state=42,
    ):
        """Return an unfitted ``xgb.XGBRegressor`` built from the arguments.

        Every argument is forwarded unchanged to the regressor constructor.
        """
        return xgb.XGBRegressor(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            min_child_weight=min_child_weight,
            gamma=gamma,
            subsample=subsample,
            colsample_bytree=colsample_bytree,
            objective=objective,
            random_state=random_state,
        )

    # Create LSTM model for sequence-based volatility prediction
    def create_lstm_model(
        self,
        input_shape,
        no_lstm_layers=50,
        dropout=0.2,
        dense=1,
        optimizer="adam",
        loss="mse",
    ):
        """Return a compiled two-layer LSTM regressor.

        Args:
            input_shape: ``(sequence_length, n_features)`` of one sample.
            no_lstm_layers: passed as the first argument of each ``LSTM``
                layer (two LSTM layers are always stacked).
            dropout: rate of the Dropout layer after each LSTM layer.
            dense: size of the final Dense output layer.
            optimizer: optimizer passed to ``compile``.
            loss: loss passed to ``compile``.
        """
        model = seqMD(
            [
                LSTM(no_lstm_layers, return_sequences=True, input_shape=input_shape),
                dpMD(dropout),
                LSTM(no_lstm_layers),
                dpMD(dropout),
                Dense(dense),
            ]
        )
        model.compile(optimizer=optimizer, loss=loss)
        return model

    # Prepare for LSTM model
    def prepare_sequences(self, X, sequence_length=10):
        """Slice ``X`` into overlapping windows of ``sequence_length`` rows.

        Window ``i`` covers rows ``i .. i + sequence_length - 1``. Exactly
        ``len(X) - sequence_length`` windows are produced, so window ``i``
        lines up with the target at row ``i + sequence_length`` (the row
        right after the window); the final possible window is deliberately
        not emitted.

        Returns:
            Array of shape ``(n_windows, sequence_length, *X.shape[1:])``;
            ``n_windows`` is 0 when ``X`` is too short.

        Raises:
            ValueError: if ``sequence_length`` is smaller than 1.
        """
        if sequence_length < 1:
            raise ValueError("sequence_length must be at least 1.")

        # np.asarray makes positional slicing uniform for arrays, lists and
        # pandas objects alike.
        data = np.asarray(X)
        n_windows = len(data) - sequence_length
        if n_windows <= 0:
            # keep the rank stable so downstream shape handling still works
            return np.empty((0, sequence_length) + data.shape[1:], dtype=data.dtype)
        return np.stack(
            [data[start : start + sequence_length] for start in range(n_windows)]
        )

    # Training on Volatility prediction model
    def train(
        self, X, y, validation_split=0.2, target_seq_len=10, epochs=50, batch_size=32
    ):
        """Fit the configured model on ``X`` / ``y``.

        Args:
            X: feature table (rows are time steps).
            y: target values aligned row-by-row with ``X``.
            validation_split: LSTM only; forwarded to ``fit``.
            target_seq_len: LSTM only; window length. It is remembered on
                ``self.sequence_length`` for ``evaluate`` and ``predict``.
            epochs: LSTM only; forwarded to ``fit``.
            batch_size: LSTM only; forwarded to ``fit``.

        Raises:
            ValueError: for an unsupported ``model_type`` or when ``X`` has
                too few rows to form a single LSTM window.
        """
        kind = self._resolved_model_type()

        if kind == "xgboost":
            self.model = self.create_xgboost_model()
            self.model.fit(X, y)
            return

        X_seq = self._lstm_windows(X, target_seq_len)
        if len(X_seq) == 0:
            raise ValueError(
                "Not enough rows to build LSTM sequences: need more than "
                f"{target_seq_len} rows."
            )
        # Window i ends at row i + target_seq_len - 1, so it is paired with
        # the target of the following row.
        y_seq = np.asarray(y)[target_seq_len:]
        self.sequence_length = target_seq_len

        # Build the network on first use; a model assigned by the caller
        # beforehand is kept and trained as-is.
        if self.model is None:
            self.model = self.create_lstm_model(input_shape=X_seq.shape[1:])

        self.model.fit(
            X_seq,
            y_seq,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=validation_split,
        )
        return

    # Evaluation of model based on performance using multiple metrics
    def evaluate(self, X_test, y_test):
        """Score the model on held-out data.

        For the LSTM the first ``self.sequence_length`` targets have no
        complete input window and are dropped so that predictions and targets
        have the same length.

        Returns:
            dict with ``"RMSE"``, ``"MAE"`` and ``"R2"``.

        Raises:
            ValueError: for an unsupported ``model_type``.
            RuntimeError: if no model has been trained or assigned.
        """
        kind = self._resolved_model_type()
        model = self._require_model()

        if kind == "xgboost":
            prediction = model.predict(X_test)
            y_true = y_test
        else:
            prediction = model.predict(self._lstm_windows(X_test))
            y_true = np.asarray(y_test)[self.sequence_length :]

        metrics = {
            "RMSE": np.sqrt(mse(y_true, prediction)),  ## root mean square error
            "MAE": mae(y_true, prediction),  ## mean absolute error
            "R2": r2s(y_true, prediction),  ## r2 score
        }

        return metrics

    # Making the volatility predictions on new data
    def predict(self, X):
        """Return model predictions for ``X``.

        For the LSTM, ``X`` is first windowed with ``self.sequence_length``
        (the length used in ``train``), so the output has
        ``len(X) - self.sequence_length`` rows.

        Raises:
            ValueError: for an unsupported ``model_type``.
            RuntimeError: if no model has been trained or assigned.
        """
        kind = self._resolved_model_type()
        model = self._require_model()

        if kind == "xgboost":
            return model.predict(X)
        return model.predict(self._lstm_windows(X))

# Starting the system