# Imports


In [1]:
from IPython.core.interactiveshell import InteractiveShell
import inspect
import json
from datetime import datetime as dt
from datetime import timedelta
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from warnings import filterwarnings
from zoneinfo import ZoneInfo

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.graph_objs as go
import plotly.io as pio
import plotly.offline as pyo
import scipy as sc
import seaborn as sns
import statsmodels.api as sm
import talib
import yfinance as yf
from pandas import DataFrame, Series
from pandas.core.frame import DataFrame
from plotly.offline import init_notebook_mode
from plotly.subplots import make_subplots
from scipy import stats
from scipy.stats import chi2_contingency, kendalltau, spearmanr
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_selection import VarianceThreshold, chi2, mutual_info_regression
from sklearn.inspection import permutation_importance
from sklearn.linear_model import ElasticNet, LinearRegression, Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import (
    GridSearchCV,
    TimeSeriesSplit,
    cross_val_score,
    train_test_split,
)
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.stats.stattools import durbin_watson
from yfinance.ticker import Ticker
from sklearn.model_selection import learning_curve
from analytics import Stock, TechnicalIndicators

filterwarnings("ignore")
sns.set(style="darkgrid")
plt.style.use("dark_background")
plt.rcParams.update({"grid.linewidth": 0.5, "grid.alpha": 0.5})
plt.rc("figure", figsize=(16, 10))
plt.rc("lines", markersize=4)
plt.rcParams["figure.autolayout"] = True
sns.set_context("poster")
init_notebook_mode(connected=True)
pio.templates.default = "plotly_dark"
InteractiveShell.ast_node_interactivity = "all"


A NumPy version >=1.16.5 and <1.23.0 is required for this version of SciPy (detected version 1.24.2



# Data Preparation


## Generating all input variables $X_i$


We create the class InputVariables in order to generate our input variables.


In [2]:
class InputVariables:
    def __init__(self, data: DataFrame, col="Close") -> None:
        self.data: DataFrame = data.copy()
        self.col: str = col

    def calculate_log_returns(self) -> None:
        self.data["log_returns"] = np.log(self.data[self.col]) - np.log(
            self.data[self.col].shift(1)
        )

    def add_Indicators(self) -> None:
        inds = pd.DataFrame()
        succeed = []
        failed = []
        for func in talib.get_functions():
            try:
                ind = getattr(talib, func)(self.data.loc[:, self.col]).rename(func)
                inds = pd.concat([inds, ind], axis=1)
                succeed += [func]
            except:
                failed += [func]
        self.data = pd.concat([self.data, inds], axis=1)

    def generate_all(self) -> DataFrame:
        self.calculate_log_returns()
        self.add_Indicators()
        return self.data.dropna()

In [3]:
stock = Stock("AAPL")

[*********************100%***********************]  1 of 1 completed


In [4]:
# Usage example:
tv = InputVariables(stock.data)
all_representations: DataFrame = tv.generate_all()
all_representations.head()

Unnamed: 0,Open,High,Low,Close,Adj Close,Volume,log_returns,HT_DCPERIOD,HT_DCPHASE,HT_TRENDMODE,...,TEMA,TRIMA,WMA,LINEARREG,LINEARREG_ANGLE,LINEARREG_INTERCEPT,LINEARREG_SLOPE,STDDEV,TSF,VAR
1981-04-21 00:00:00,0.122768,0.123326,0.122768,0.122768,0.095386,28537600,0.065756,14.604519,243.334265,0,...,0.119979,0.114588,0.116856,0.12103,0.021924,0.116056,0.000383,0.004767,0.121413,2.3e-05
1981-04-22 00:00:00,0.127232,0.12779,0.127232,0.127232,0.098854,18995200,0.035716,14.676539,248.69034,1,...,0.12168,0.11519,0.117729,0.121795,0.016723,0.118001,0.000292,0.00554,0.122087,3.1e-05
1981-04-23 00:00:00,0.13058,0.131138,0.13058,0.13058,0.101456,58016000,0.025974,14.783967,252.654674,1,...,0.12376,0.115848,0.11876,0.124235,0.030145,0.117395,0.000526,0.007178,0.124761,5.2e-05
1981-04-24 00:00:00,0.13058,0.13058,0.129464,0.129464,0.100589,35056000,-0.008583,14.969893,223.270967,1,...,0.12536,0.116506,0.119645,0.1261,0.039562,0.117124,0.00069,0.005691,0.12679,3.2e-05
1981-04-27 00:00:00,0.128906,0.128906,0.128348,0.128348,0.099722,38528000,-0.008658,15.315992,138.984625,1,...,0.126534,0.117134,0.120396,0.127041,0.040123,0.117937,0.0007,0.002697,0.127741,7e-06


## Preparing data


We create the class data_prep in order to train, split and plot the data.


In [5]:
class DataPreparer(InputVariables):
    def __init__(
        self,
        df: pd.DataFrame,
        target_col: str,
        testsize: float = 0.3,
        random_state: int = 101,
        shuffle=False,
    ) -> None:
        super().__init__(df, target_col)
        self.target_col: str = target_col
        self.testsize: float = testsize
        self.random_state: int = random_state
        self.shuffle: bool = shuffle
        self.startup()

    def startup(self) -> None:
        self.data = self.generate_all()
        self.X: DataFrame = self.data.drop(columns=[self.target_col])
        self.y: DataFrame = self.data[[self.target_col]]
        self.train_test_split()

    def train_test_split(self) -> None:
        X_train, X_test, y_train, y_test = train_test_split(
            self.X,
            self.y,
            test_size=self.testsize,
            shuffle=self.shuffle,
            random_state=self.random_state,
        )
        self.X_train = X_train.sort_index()
        self.X_test = X_test.sort_index()
        self.y_train = y_train.sort_index()
        self.y_test = y_test.sort_index()

    def plot(self) -> go.Figure:
        fig = go.Figure()

        fig.add_trace(
            go.Scatter(
                x=self.X_train.index,
                y=self.y_train[self.target_col],
                mode="lines",
                name=f"Train Data {1 - self.testsize}%",
            )
        )
        fig.add_trace(
            go.Scatter(
                x=self.X_test.index,
                y=self.y_test[self.target_col],
                mode="lines",
                name=f"Test Data {self.testsize}%",
            )
        )

        fig.update_layout(
            title=f"Train and Test Data",
            xaxis_title="Index",
            yaxis_title=self.target_col,
        )
        return fig

In [6]:
# Choose the target variable representation (e.g., 'returns', 'log_returns', 'diff', or 'close')
target_column = "Close"

# Initialize the data_prep class and prepare the data
dp = DataPreparer(stock.data, target_column, testsize=0.15)

# Use the prepared data for training and evaluating models or creating plots
dp.plot()

In [7]:
dp.X_train.shape[0]
dp.X_test.shape[0]

4987

881

In [8]:
dp.X_test.head()

Unnamed: 0,Open,High,Low,Adj Close,Volume,log_returns,HT_DCPERIOD,HT_DCPHASE,HT_TRENDMODE,MAX,...,TEMA,TRIMA,WMA,LINEARREG,LINEARREG_ANGLE,LINEARREG_INTERCEPT,LINEARREG_SLOPE,STDDEV,TSF,VAR
2001-05-18 00:00:00,0.417143,0.422143,0.412857,0.357622,159051200,-0.000849,27.369562,262.107284,0,0.474821,...,0.434452,0.436342,0.431218,0.40723,-0.23579,0.460729,-0.004115,0.005677,0.403115,3.2e-05
2001-05-21 00:00:00,0.421964,0.426964,0.411607,0.358077,460997600,0.001272,26.735115,275.009398,1,0.474821,...,0.432536,0.437617,0.431003,0.40797,-0.202623,0.453944,-0.003536,0.005262,0.404433,2.8e-05
2001-05-22 00:00:00,0.428571,0.430893,0.417857,0.357166,412916000,-0.002549,25.836486,298.853267,1,0.474821,...,0.430613,0.438346,0.430604,0.411327,-0.13829,0.442704,-0.002414,0.004052,0.408913,1.6e-05
2001-05-23 00:00:00,0.424107,0.424107,0.408214,0.353062,281041600,-0.011557,24.876831,-30.146475,1,0.474821,...,0.428012,0.438424,0.429781,0.410862,-0.122933,0.438755,-0.002146,0.002209,0.408717,5e-06
2001-05-24 00:00:00,0.415893,0.416071,0.403929,0.352606,271756800,-0.001291,24.139857,-8.85854,1,0.474821,...,0.425603,0.438033,0.428878,0.413158,-0.074025,0.429954,-0.001292,0.002782,0.411866,8e-06


In [9]:
dp.X_train.head()

Unnamed: 0,Open,High,Low,Adj Close,Volume,log_returns,HT_DCPERIOD,HT_DCPHASE,HT_TRENDMODE,MAX,...,TEMA,TRIMA,WMA,LINEARREG,LINEARREG_ANGLE,LINEARREG_INTERCEPT,LINEARREG_SLOPE,STDDEV,TSF,VAR
1981-04-21 00:00:00,0.122768,0.123326,0.122768,0.095386,28537600,0.065756,14.604519,243.334265,0,0.124442,...,0.119979,0.114588,0.116856,0.12103,0.021924,0.116056,0.000383,0.004767,0.121413,2.3e-05
1981-04-22 00:00:00,0.127232,0.12779,0.127232,0.098854,18995200,0.035716,14.676539,248.69034,1,0.127232,...,0.12168,0.11519,0.117729,0.121795,0.016723,0.118001,0.000292,0.00554,0.122087,3.1e-05
1981-04-23 00:00:00,0.13058,0.131138,0.13058,0.101456,58016000,0.025974,14.783967,252.654674,1,0.13058,...,0.12376,0.115848,0.11876,0.124235,0.030145,0.117395,0.000526,0.007178,0.124761,5.2e-05
1981-04-24 00:00:00,0.13058,0.13058,0.129464,0.100589,35056000,-0.008583,14.969893,223.270967,1,0.13058,...,0.12536,0.116506,0.119645,0.1261,0.039562,0.117124,0.00069,0.005691,0.12679,3.2e-05
1981-04-27 00:00:00,0.128906,0.128906,0.128348,0.099722,38528000,-0.008658,15.315992,138.984625,1,0.13058,...,0.126534,0.117134,0.120396,0.127041,0.040123,0.117937,0.0007,0.002697,0.127741,7e-06


In [10]:
dp.y_test.head()

Unnamed: 0,Close
2001-05-18 00:00:00,0.420179
2001-05-21 00:00:00,0.420714
2001-05-22 00:00:00,0.419643
2001-05-23 00:00:00,0.414821
2001-05-24 00:00:00,0.414286


In [11]:
dp.y_train.head()

Unnamed: 0,Close
1981-04-21 00:00:00,0.122768
1981-04-22 00:00:00,0.127232
1981-04-23 00:00:00,0.13058
1981-04-24 00:00:00,0.129464
1981-04-27 00:00:00,0.128348


# Feature Analysis


### Overview

We want to reduce the features such that:
1.  They are independent of the target variable.
2.  They are independent of each other.
3.  Our selection does not does not depend on any a priori properties between target and dependent variables.
4.  Our selection rely on the characteristics of the data itself.

To that end we will use **Filter Methods:** These techniques rely on the characteristics of the data itself, such as correlation, mutual information, or statistical tests, to rank features. Specifically we will use:
1. **Pearson Correlation Coefficient:** The threshold typically ranges between 0 and 1. A higher threshold value means that you'll keep only features that have a strong correlation with the target variable, while a lower value allows for weaker correlations. You might need to adjust the threshold based on the problem's requirements and the correlations between the features and the target variable.

2. **Mutual Information**: The threshold can range between 0 and a positive value, with higher values indicating stronger dependence between the feature and the target variable. The choice of threshold depends on the scale of mutual information values in your dataset and the desired level of feature reduction.

Overall, these feature selection methods are important in identifying and selecting the most relevant features for a given problem. By reducing the number of features used in an analysis, these methods can help to reduce overfitting and improve the accuracy and efficiency of machine learning models.

In [229]:
class FeatureAnalyzer:
    """
    A class for feature analysis and selection.

    Parameters:
    -----------
    X : pandas.DataFrame
        The features matrix to analyze.

    y : pandas.DataFrame
        The target vector.

    model : sklearn.base.BaseEstimator
        A supervised learning model used to extract feature importances. Default is None.

    Methods:
    --------
    corr() -> pandas.DataFrame:
        Calculates the Pearson correlation coefficients between the features and the target.

    mutual_info() -> pandas.DataFrame:
        Calculates the mutual information between the features and the target.

    corr_matrix() -> pandas.DataFrame:
        Calculates the Pearson correlation matrix between the features.

    filter_threshold(df, column, threshold, greater=False) -> List[str]:
        Filters a DataFrame based on a given column and a threshold.

    filter_corr(threshold) -> List[str]:
        Filters the features based on the Pearson correlation coefficient.

    filter_mutual_info(threshold) -> List[str]:
        Filters the features based on the mutual information.

    filter_corr_matrix(threshold=0.3) -> List[str]:
        Filters the features based on the Pearson correlation matrix.

    identify_zero_variance() -> List[str]:
        Identifies the features with zero variance.

    identify_collinear(threshold) -> List[str]:
        Identifies the collinear features based on the VIF score.

    get_feature_importance() -> pandas.DataFrame:
        Extracts the feature importance from the model.

    identify_low_importance(threshold) -> List[str]:
        Identifies the features with low importance based on a threshold.

    combine_criteria(collinear_threshold=5, importance_threshold=0.01,
                     correlation_threshold=0.3, mutual_info_threshold=0.2) -> List[str]:
        Combines multiple feature selection criteria to select the best features.

    transform_X() -> pandas.DataFrame:
        Returns a new DataFrame with the selected features.

    summary(missing_threshold=0.1, collinear_threshold=5, importance_threshold=0.01,
            correlation_threshold=0.3, mutual_info_threshold=0.2) -> Dict[str, Dict[str, Union[List[str], int]]]:
        Returns a summary of the feature analysis and selection process.
    """  
    def __init__(self, X: DataFrame, y: DataFrame, model: BaseEstimator = None) -> None:
        self.X: DataFrame = X
        self.y: DataFrame = y
        self.model: BaseEstimator = model

    def corr(self) -> DataFrame:
        return pd.DataFrame(
            np.corrcoef(np.column_stack((self.X, self.y)).T)[:-1, -1],
            columns=["pearson_corr"],
            index=self.X.columns,
        )

    def mutual_info(self) -> DataFrame:
        return pd.DataFrame(
            mutual_info_regression(self.X, self.y),
            columns=["mutual_info"],
            index=self.X.columns,
        )

    def corr_matrix(self) -> pd.DataFrame:
        return self.X.corr(method="pearson")

    def filter_threshold(
        self, df: DataFrame, column: str, threshold: float, greater: bool = False
    ) -> List[str]:
        mask = df[column] > threshold if greater else df[column] < threshold
        return df[mask].index.to_list()

    def filter_corr(self, threshold) -> List[str]:
        return self.filter_threshold(
            self.corr(), "pearson_corr", threshold, greater=False
        )

    def filter_mutual_info(self, threshold) -> List[str]:
        return self.filter_threshold(
            self.mutual_info(),
            "mutual_info",
            self.mutual_info().mutual_info.quantile(threshold),
            greater=False,
        )

    def filter_corr_matrix(self, threshold=0.3) -> list:
        corr_matrix: DataFrame = self.corr_matrix()
        np.fill_diagonal(corr_matrix.values, 0)
        stacked_corr_matrix = corr_matrix.abs().stack().reset_index()
        stacked_corr_matrix.columns = ["feature_1", "feature_2", "correlation"]
        filtered_corr_matrix = self.filter_threshold(
            stacked_corr_matrix, "correlation", threshold, greater=False
        )

        return list(
            set(filtered_corr_matrix["feature_1"].unique())
            | set(filtered_corr_matrix["feature_2"].unique())
        )

    def identify_missing(self, threshold: float) -> List[str]:
        return self.filter_threshold(
            self.X.isnull().mean().to_frame("missing_fraction"),
            "missing_fraction",
            threshold,
            greater=True,
        )

    def identify_zero_variance(self) -> List[str]:
        selector = VarianceThreshold(threshold=0)
        selector.fit(self.X)
        return list(self.X.columns[~selector.get_support()])

    def identify_collinear(self, threshold: float) -> List[str]:
        # Scale the features
        X_scaled = StandardScaler().fit_transform(self.X)
        vif_data = pd.DataFrame()
        vif_data["feature"] = self.X.columns
        vif_data["VIF"] = [
            variance_inflation_factor(X_scaled, i) for i in range(len(self.X.columns))
        ]
        return list(vif_data.loc[vif_data["VIF"] < threshold, "feature"])

    def get_feature_importance(self) -> pd.DataFrame:
        if self.model is None:
            raise ValueError("A model must be provided for this method.")

        if hasattr(self.model, "feature_importances_"):
            importances = self.model.feature_importances_
        elif hasattr(self.model, "coef_"):
            importances = np.abs(self.model.coef_)
        else:
            raise ValueError(
                "The provided model does not have the feature_importances_' or 'coef_' attribute."
            )
        feature_importance_df = pd.DataFrame(
            importances, columns=["importance"], index=self.X.columns
        )
        return feature_importance_df.sort_values(by="importance", ascending=False)

    def identify_low_importance(self, threshold: float) -> List[str]:
        if self.model is None:
            raise ValueError("A model must be provided for this method.")

        return self.filter_threshold(
            self.get_feature_importance(), "importance", threshold, greater=False
        )

    def combine_criteria(
        self,
        collinear_threshold=5,
        importance_threshold=0.01,
        correlation_threshold=0.3,
        mutual_info_threshold=0.2,
    ) -> List[str]:
        criteria: List[List[str]] = [
            self.identify_collinear(collinear_threshold),
            self.filter_corr(correlation_threshold),
            self.filter_mutual_info(mutual_info_threshold),
        ]
        if self.model is not None:
            criteria.append(self.identify_low_importance(importance_threshold))
        selected_features = set.intersection(*map(set, criteria)) - set(
            self.identify_zero_variance()
        )
        return list(selected_features)

    def transform_X(self) -> DataFrame:
        selected_features = set(self.combine_criteria())
        return self.X.loc[:, list(selected_features)]

    def summary(
        self,
        missing_threshold=0.1,
        collinear_threshold=5,
        importance_threshold=0.01,
        correlation_threshold=0.3,
        mutual_info_threshold=0.2,
    ) -> Dict[str, Dict[str, Union[List[str], int]]]:
        summary_dict = {
            "missing_values": {
                "features": self.identify_missing(missing_threshold),
                "count": len(self.identify_missing(missing_threshold)),
            },
            "zero_variance": {
                "features": self.identify_zero_variance(),
                "count": len(self.identify_zero_variance()),
            },
            "collinear": {
                "features": self.identify_collinear(collinear_threshold),
                "count": len(self.identify_collinear(collinear_threshold)),
            },
            "low_correlation": {
                "features": self.filter_corr(correlation_threshold),
                "count": len(self.filter_corr(correlation_threshold)),
            },
            "low_mutual_info": {
                "features": self.filter_mutual_info(mutual_info_threshold),
                "count": len(self.filter_mutual_info(mutual_info_threshold)),
            },
            "combined_criteria": {
                "features": self.combine_criteria(
                    collinear_threshold,
                    importance_threshold,
                    correlation_threshold,
                    mutual_info_threshold,
                ),
                "count": len(
                    self.combine_criteria(
                        collinear_threshold,
                        importance_threshold,
                        correlation_threshold,
                        mutual_info_threshold,
                    )
                ),
            },
        }

        if self.model is not None:
            summary_dict["low_importance"] = {
                "features": self.identify_low_importance(importance_threshold),
                "count": len(self.identify_low_importance(importance_threshold)),
            }

        return summary_dict

In [230]:
feature_analyzer = FeatureAnalyzer(dp.X, dp.y)

In [231]:
feature_analyzer.combine_criteria()

['HT_TRENDMODE', 'Volume', 'HT_DCPERIOD', 'HT_DCPHASE']

In [232]:
feature_analyzer.summary()

{'missing_values': {'features': [], 'count': 0},
 'zero_variance': {'features': ['CEIL', 'FLOOR'], 'count': 2},
 'collinear': {'features': ['Volume',
   'log_returns',
   'HT_DCPERIOD',
   'HT_DCPHASE',
   'HT_TRENDMODE',
   'VAR'],
  'count': 6},
 'low_correlation': {'features': ['Volume',
   'log_returns',
   'HT_DCPERIOD',
   'HT_DCPHASE',
   'HT_TRENDMODE',
   'ACOS',
   'COS',
   'APO',
   'CMO',
   'MOM',
   'PPO',
   'ROC',
   'ROCP',
   'ROCR',
   'ROCR100',
   'RSI',
   'TRIX',
   'LINEARREG_ANGLE',
   'LINEARREG_SLOPE',
   'VAR'],
  'count': 20},
 'low_mutual_info': {'features': ['Volume',
   'HT_DCPERIOD',
   'HT_DCPHASE',
   'HT_TRENDMODE',
   'CEIL',
   'FLOOR',
   'PPO',
   'ROC',
   'ROCP',
   'ROCR',
   'ROCR100'],
  'count': 11},
 'combined_criteria': {'features': ['HT_TRENDMODE',
   'Volume',
   'HT_DCPERIOD',
   'HT_DCPHASE'],
  'count': 4}}

In [233]:
feature_analyzer.transform_X()

Unnamed: 0,HT_TRENDMODE,Volume,HT_DCPERIOD,HT_DCPHASE
1981-04-21,0,28537600,14.604519,243.334265
1981-04-22,1,18995200,14.676539,248.690340
1981-04-23,1,58016000,14.783967,252.654674
1981-04-24,1,35056000,14.969893,223.270967
1981-04-27,1,38528000,15.315992,138.984625
...,...,...,...,...
2004-11-15,1,376045600,23.825818,201.539660
2004-11-16,1,295103200,24.564456,203.144468
2004-11-17,1,397751200,25.518182,204.768244
2004-11-18,1,459149600,26.489463,204.228373


# PREDICTORS


When working with time series prediction, it is important to choose models that can handle time-dependent patterns in the data. Here's a list of compatible models that can be used for time series prediction, along with some additional models specifically designed for time series forecasting:

1. Tree-based Models:

    - Random Forest Regressor (sklearn.ensemble.RandomForestRegressor)
    - Extra Trees Regressor (sklearn.ensemble.ExtraTreesRegressor)
    - Gradient Boosting Regressor (sklearn.ensemble.GradientBoostingRegressor)

1. Linear Models:

    - Lasso (sklearn.linear_model.Lasso)
    - Ridge (sklearn.linear_model.Ridge)
    - ElasticNet (sklearn.linear_model.ElasticNet)
    - Lasso LARS (sklearn.linear_model.LassoLars)
    - Lasso LARS IC (sklearn.linear_model.LassoLarsIC)
    - Lars (sklearn.linear_model.Lars)
    - Lars CV (sklearn.linear_model.LarsCV)
    - Lasso CV (sklearn.linear_model.LassoCV)
    - Ridge CV (sklearn.linear_model.RidgeCV)
    - ElasticNet CV (sklearn.linear_model.ElasticNetCV)
    - Orthogonal Matching Pursuit (sklearn.linear_model.OrthogonalMatchingPursuit)
    - Orthogonal Matching Pursuit CV (sklearn.linear_model.OrthogonalMatchingPursuitCV)

1. Ensemble Models:

    - AdaBoost Regressor (sklearn.ensemble.AdaBoostRegressor)
    - Bagging Regressor (sklearn.ensemble.BaggingRegressor)

1. Time Series Models:

    - Autoregression (statsmodels.tsa.ar_model.AutoReg)
    - SARIMAX (statsmodels.tsa.statespace.sarimax.SARIMAX)
    - Exponential Smoothing State Space Model (statsmodels.tsa.statespace.exponential_smoothing.ExponentialSmoothing)

We will pick a few of these and optimize them for the close for our case.


## ModelEvaluator

We introduce a utility class that trains a model using cross validation and calculates all the quantites related to the training of the model. It finally plots the predictions and the learning curve.

In [None]:
class ModelEvaluator:
    """
    A class for evaluating machine learning models using grid search cross-validation and learning curves.

    Parameters
    ----------
    model : BaseEstimator
        A scikit-learn estimator object to be evaluated.
    data_preparer : DataPreparer
        A DataPreparer object containing the training and testing data.

    Attributes
    ----------
    model : BaseEstimator
        The best-performing estimator found by grid search.
    data_preparer : DataPreparer
        The DataPreparer object used to prepare the training and testing data.
    cv_scores : array-like
        Cross-validated scores of the model.
    predictions : DataFrame
        Predictions of the model on the testing data.
    best_score : float
        Best cross-validated score achieved by the model.
    best_params : dict
        Parameters of the best-performing estimator found by grid search.

    Methods
    -------
    gridcv_tune(param_grid=None, scoring='r2', cv=5)
        Tune the hyperparameters of the model using grid search cross-validation.
    plot_learning_curve()
        Plot the learning curve of the model.
    plot()
        Plot the predictions of the model on the training and testing data.
    """

    def __init__(self, model, data_preparer) -> None:
        self.model = model
        self.data_preparer = data_preparer
        self.cv_scores = None
        self.predictions = None

    def gridcv_tune(
        self,
        param_grid: Optional[dict[str, Any]] = None,
        scoring: str = "r2",
        cv: int = 5,
    ):
        if param_grid is None:
            param_grid = {
                # default parameter grid, can be updated based on the model
            }
        self.cv: int = cv

        # Perform grid search cross-validation
        self.grid_search = GridSearchCV(
            self.model, param_grid, cv=self.cv, scoring=scoring
        )
        self.grid_search.fit(self.data_preparer.X_train, self.data_preparer.y_train)
        self.model: BaseEstimator = self.grid_search.best_estimator_

        # Add logic for different model types to extract coefficients/feature importances
        self.best_params = self.grid_search.best_params_

        # Get the best score
        self.best_score: float = self.grid_search.best_score_
        print(f"Best cross-validated score: {self.best_score:.2f}")

        # Make predictions on the test set
        self.predictions = pd.DataFrame(
            self.model.predict(self.data_preparer.X_test),
            index=self.data_preparer.y_test.index,
            columns=[self.data_preparer.target_col],
        )

        # Calculate cross-validated R-squared scores
        self.cv_scores = cross_val_score(
            self.model,
            self.data_preparer.X,
            self.data_preparer.y,
            cv=self.cv,
            scoring=scoring,
        )
        return self.cv_scores

    def plot_learning_curve(self) -> go.Figure:
        if not hasattr(self, "model"):
            raise ValueError("There is no model available.")

        # Define the training set sizes to use
        train_sizes = np.linspace(0.1, 1.0, 10)

        # Compute the learning curve
        train_sizes_abs, train_scores, val_scores, fit_times, _ = learning_curve(
            self.model,
            dp.X,
            dp.y,
            train_sizes=train_sizes,
            cv=5,
            scoring="neg_root_mean_squared_error",
            n_jobs=-1,
            verbose=1,
            return_times=True,
        )

        # Compute the mean and standard deviation of the scores
        train_scores_mean = -np.mean(train_scores, axis=1)
        train_scores_std = -np.std(train_scores, axis=1)
        val_scores_mean = -np.mean(val_scores, axis=1)
        val_scores_std = -np.std(val_scores, axis=1)

        # Compute the mean fit time
        fit_times_mean = np.mean(fit_times, axis=1)

        # Calculate the optimal training set size
        optimal_idx = np.argmin(val_scores_mean)
        optimal_size = train_sizes_abs[optimal_idx]
        self.optimal_size = round(optimal_size/dp.X.shape[0], 2)
        # Create the learning curve figure
        fig = go.Figure()

        # Plot the training scores
        fig.add_trace(
            go.Scatter(
                x=train_sizes_abs,
                y=train_scores_mean,
                mode="lines+markers",
                name="Training score",
                line=dict(color="blue"),
                error_y=dict(
                    type="data", array=train_scores_std, visible=True, color="blue"
                ),
            )
        )

        # Plot the validation scores
        fig.add_trace(
            go.Scatter(
                x=train_sizes_abs,
                y=val_scores_mean,
                mode="lines+markers",
                name="Validation score",
                line=dict(color="green"),
                error_y=dict(
                    type="data", array=val_scores_std, visible=True, color="green"
                ),
            )
        )

        # Add a vertical line for the optimal training set size
        fig.add_shape(
            type="line",
            x0=optimal_size,
            x1=optimal_size,
            y0=0,
            y1=1,
            yref="paper",
            xref="x",
            line=dict(color="red", dash="dash"),
        )

        # Add a secondary y-axis for the training time
        fig.update_layout(
            yaxis2=dict(
                title="Training Time (s)", overlaying="y", side="right", showgrid=False
            ),
            title="Learning Curve",
            xaxis_title="Training Set Size",
            yaxis_title="Error (RMSE)",
        )

        # Plot the training time
        fig.add_trace(
            go.Scatter(
                x=train_sizes_abs,
                y=fit_times_mean,
                mode="lines+markers",
                name="Training Time",
                line=dict(color="orange"),
                yaxis="y2",
            )
        )

        # Add a text box with the model performance metric
        fig.add_annotation(
            x=0.05,
            y=0.95,
            xref="paper",
            yref="paper",
            text="Performance Metric: RMSE",
            showarrow=False,
            font=dict(size=12),
        )

        # Add a text box with the optimal training set size
        fig.add_annotation(
            x=optimal_size,
            y=0.05,
            xref="x",
            yref="paper",
            text=f"Optimal Size: {optimal_size}({self.optimal_size}%)",
            showarrow=True,
            arrowhead=1,
            arrowcolor="red",
            font=dict(size=12),
        )

        return fig

    def extract_features(self, threshold=0):
        if not hasattr(self, 'grid_search'):
            raise ValueError("No grid search object found.")
        filtered_coef = None
        if hasattr(self.model, "coef_"):
            # Extract feature coefficients above the threshold
            self.coef_ = self.model.coef_
            coef = pd.DataFrame(
                self.model.coef_.T,
                index=self.data_preparer.X_train.columns,
                columns=["Coefficients"],
            )
            filtered_coef = coef[abs(coef["Coefficients"]) >= threshold]
        elif hasattr(self.model, "feature_importances_"):
            # Extract feature importances above the threshold
            self.feature_importances = self.model.feature_importances_
            importances = pd.DataFrame(
                self.model.feature_importances_,
                index=self.data_preparer.X_train.columns,
                columns=["Importances"],
            )
            filtered_coef = importances[importances["Importances"] >= threshold]
        else:
            raise ValueError(
                "The specified model does not have feature coefficients or importances."
            )

        return filtered_coef

    def plot(self) -> go.Figure:
        fig = go.Figure()

        fig.add_trace(
            go.Scatter(
                x=self.predictions.index,
                y=self.predictions[self.data_preparer.target_col],
                mode="lines",
                name=f"Predictions",
            )
        )

        fig.add_trace(
            go.Scatter(
                x=self.data_preparer.X_train.index,
                y=self.data_preparer.y_train[self.data_preparer.target_col],
                mode="lines",
                name=f"Train Data {1 - self.data_preparer.testsize}%",
            )
        )
        fig.add_trace(
            go.Scatter(
                x=self.data_preparer.X_test.index,
                y=self.data_preparer.y_test[self.data_preparer.target_col],
                mode="lines",
                name=f"Test Data {self.data_preparer.testsize}%",
            )
        )

        fig.update_layout(
            title=f"Train and Test Data",
            xaxis_title="Index",
            yaxis_title=self.data_preparer.target_col,
        )
        return fig

## Linear Models


We will directly use ElasticNet in this section in order to have all the features of LinearRegression and controlable complexity through regularization.

The grid of our choice is the following
```py
    param_grid = {
        "alpha": [0.001, 0.01, 0.1, 1, 10, 100],
        "l1_ratio": [0.1, 0.3, 0.5, 0.7, 0.9],
        "max_iter": [1000, 2000, 5000],
        "tol": [1e-4, 1e-5, 1e-6],
        "selection": ["cyclic", "random"],
    }
```

- 'alpha': This parameter is a regularization term that is a linear combination of L1 and L2 penalties. Higher values of alpha result in stronger regularization, which may help prevent overfitting. A smaller value of alpha allows the model to be more flexible and capture more complex patterns in the data, but at the risk of overfitting. In this param_grid, we're trying out six different values of alpha: [0.001, 0.01, 0.1, 1, 10, 100].

- 'l1_ratio': This parameter is the mixing parameter between L1 (Lasso) and L2 (Ridge) penalties. It varies between 0 and 1. A value of 0 corresponds to the Ridge penalty, and a value of 1 corresponds to the Lasso penalty. We're trying out five different values of l1_ratio: [0.1, 0.3, 0.5, 0.7, 0.9].

- 'max_iter': This parameter is the maximum number of iterations for the optimization algorithm to converge. If the algorithm does not converge in the specified number of iterations, it will stop early. In this param_grid, we're trying out three different values of max_iter: [1000, 2000, 5000].

- 'tol': This parameter is the tolerance for the optimization algorithm. The algorithm will stop when the update is smaller than tol. Smaller values of tol will result in a more accurate solution, but may take more iterations to converge. In this param_grid, we're trying out three different values of tol: [1e-4, 1e-5, 1e-6].

- 'selection': This parameter determines the method used to update the coefficients during the optimization process. The two possible values are 'cyclic' and 'random'. In the 'cyclic' method, the algorithm iterates through each feature sequentially, while in the 'random' method, the algorithm selects a random feature at each iteration. The param_grid includes both options.


In [340]:
class ElasticNetRegressor(ModelEvaluator):
    def __init__(self, data_preparer) -> None:
        super().__init__(ElasticNet(), data_preparer)

    def evaluate(self):
        mae = mean_absolute_error(self.data_preparer.y_test, self.predictions)
        mse = mean_squared_error(self.data_preparer.y_test, self.predictions)

        print(
            f"""Mean Absolute Error (MAE): {mae:.2f}
        Mean Squared Error (MSE): {mse:.2f}
        Cross-validated R-squared scores: {self.cv_scores}
        Average cross-validated R-squared score: {np.mean(self.cv_scores):.2f}"""
        )
        return (mae, mse, self.cv_scores, np.mean(self.cv_scores))

    def get_summary_dict(self, threshold) -> dict:
        filtered_coef = self.extract_features(threshold)
        summary_dict = {
            "model": type(self.model).__name__,
            "target_variable": self.data_preparer.target_col,
            "num_input_features": self.data_preparer.X_train.shape[1],
            "cv_folds": self.cv,
            "best_params": self.best_params,
            "best_cv_score": self.best_score,
            "cv_scores": self.cv_scores,
            "coefficients": self.coef_,
            "filtered_coef": filtered_coef.rename({'Coefficients':''}, axis=1),
            "MAE": mean_absolute_error(self.data_preparer.y_test, self.predictions),
            "MSE": mean_squared_error(self.data_preparer.y_test, self.predictions),
            "Average cross-validated R-squared score": np.mean(self.cv_scores),
        }
        return summary_dict

    def summarize(self, threshold=0) -> str:
        self.threshold = threshold
        summary_dict = self.get_summary_dict(threshold)
        return f"""Model summary:
    Model: {summary_dict['model']}
    Target variable: {summary_dict['target_variable']}
    Number of input features: {summary_dict['num_input_features']}
    Cross-validation fold: {summary_dict['cv_folds']}
Evaluation:    
    Best_params: {summary_dict['best_params']}
    Best cross-validated score: {summary_dict['best_cv_score']:.2f}
    Mean Absolute Error (MAE): {summary_dict['MAE']:.2f}
    Mean Squared Error (MSE): {summary_dict['MSE']:.2f}
    Cross-validated R-squared scores: {summary_dict['cv_scores']}
Filtered_coef>{self.threshold}: {summary_dict['filtered_coef']}"""

In [341]:
# Choose the target variable representation (e.g., 'returns', 'log_returns', 'diff', or 'close')
target_column = "Close"

# Initialize the data_prep class and prepare the data
dp = DataPreparer(stock.data, target_column, testsize=0.1)

feature_analyzer = FeatureAnalyzer(dp.X, dp.y)
feature_analyzer.combine_criteria()

# Initialize the StockPricePredictor class with the prepared data and a Ridge model
regressor = ElasticNetRegressor(dp)

# Define the range of regularization parameters to be tested
param_grid={
    "alpha": [0.001, 0.01, 0.1, 1, 10, 100],
    "l1_ratio": [0.1, 0.3, 0.5, 0.7, 0.9],
    # "max_iter": [1000, 2000, 5000],
    # "tol": [1e-4, 1e-5, 1e-6],
    # "selection": ["cyclic", "random"],
},

# Perform tuning and evaluation with 5 folds
cv_scores = regressor.gridcv_tune(param_grid, cv=5)

# Evaluate the performance of the model
print(regressor.summarize(threshold=0.01))

['HT_TRENDMODE', 'Volume', 'HT_DCPERIOD', 'HT_DCPHASE']

Best cross-validated score: 0.98
Model summary:
    Model: ElasticNet
    Target variable: Close
    Number of input features: 57
    Cross-validation fold: 5
Evaluation:    
    Best_params: {'alpha': 0.001, 'l1_ratio': 0.1}
    Best cross-validated score: 0.98
    Mean Absolute Error (MAE): 0.01
    Mean Squared Error (MSE): 0.00
    Cross-validated R-squared scores: [0.77135113 0.99740234 0.99586045 0.99516032 0.97275914]
Filtered_coef>0.01:                          
Open             0.012515
High             0.024991
Low              0.027024
Adj Close        0.014884
SUM              0.012470
ACOS            -0.028196
ASIN             0.029094
ATAN             0.025894
EXP              0.108012
LN               0.033254
SIN              0.038527
SINH             0.052102
TAN              0.045436
TANH             0.027925
LINEARREG_ANGLE  0.041636


In [342]:
regressor.plot_learning_curve()

[learning_curve] Training set sizes: [ 469  938 1408 1877 2347 2816 3285 3755 4224 4694]


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=-1)]: Done  50 out of  50 | elapsed:    9.6s finished


In [338]:
regressor.plot()

In [339]:
regressor.optimal_size

0.64

Note for the LinearityChecker class:

-   Using the mean of the predictions as the constant term in the Breusch-Pagan test is a common practice to account for the heteroscedasticity that can arise due to the variability in the constant term of the model. By using the mean of the predictions instead of the constant term, we can better capture this variability and obtain more accurate test results.


In [117]:
from scipy.stats import normaltest


class LinearityChecker:
    def __init__(self, predictions: DataFrame, data_preparer: pd.DataFrame) -> None:
        self.data_preparer: DataFrame = data_preparer
        self.predictions = predictions
        self.y_test = data_preparer.y_test
        self.residuals = self.predictions - self.y_test

    def check_residuals(self, alpha=0.05) -> bool:
        exog = np.column_stack([np.ones_like(self.predictions), self.predictions])
        breusch_pagan_pvalue = het_breuschpagan(self.residuals, exog)[1]
        print(f"Breusch-Pagan test p-value: {breusch_pagan_pvalue:.4f}")
        if breusch_pagan_pvalue < alpha:
            print("The residuals exhibit heteroscedasticity.")
            return False
        else:
            print("The residuals exhibit homoscedasticity.")
            return True

    def check_normality(self, alpha=0.05) -> bool:
        _, pvalue = normaltest(self.predictions.values)
        print(f"Normal test p-value: {pvalue.item():.4f}")
        if pvalue.item() < alpha:
            print("The predictions are not normally distributed.")
            return False
        else:
            print("The predictions are normally distributed.")
            return True

    def check_multicollinearity(self, threshold=5) -> bool:
        vif = pd.DataFrame()
        vif["features"] = self.predictions.columns
        vif["VIF"] = [
            variance_inflation_factor(self.predictions.values, i)
            for i in range(self.predictions.shape[1])
        ]
        print("Variance Inflation Factors (VIF):")
        print(vif)
        if (vif["VIF"] > threshold).any():
            print("The data exhibit multicollinearity.")
            return False
        else:
            print("The data do not exhibit multicollinearity.")
            return True

    def check_all_conditions(self, alpha=0.05, vif_threshold=5):
        conditions = {}
        conditions["Heteroscedasticity"] = self.check_residuals(alpha)
        conditions["Normality"] = self.check_normality(alpha)
        conditions["Multicollinearity"] = self.check_multicollinearity(vif_threshold)
        return conditions

    def plot_residuals(self) -> None:
        plt.scatter(self.predictions, self.residuals)
        plt.xlabel("Predicted values")
        plt.ylabel("Residuals")
        plt.axhline(y=0, color="r", linestyle="--")
        plt.title("Residual Plot")
        plt.show()

    def plot_normality(self) -> None:
        sns.histplot(self.residuals, kde=True)
        plt.xlabel("Residuals")
        plt.title("Residual Distribution")
        plt.show()

    def plot_multicollinearity(self) -> None:
        corr_matrix: DataFrame = self.predictions.corr()
        sns.heatmap(corr_matrix, annot=True, cmap="coolwarm", center=0)
        plt.title("Multicollinearity Heatmap")
        plt.show()

    def plot_all_conditions(self) -> None:
        self.plot_residuals()
        self.plot_normality()
        self.plot_multicollinearity()

In [121]:
regressor.predictions.shape
regressor.predictions.values

(3202, 1)

array([[-0.01435911],
       [-0.01338988],
       [-0.00500747],
       ...,
       [-0.55135605],
       [-0.59218791],
       [-0.5936029 ]])

In [118]:
# Instantiate the LinearityChecker class with the predictions
checker = LinearityChecker(regressor.predictions, regressor.data_preparer)

# Check all the linearity conditions and plot all the linearity plots
checker.check_all_conditions()

Breusch-Pagan test p-value: 0.0000
The residuals exhibit heteroscedasticity.
Normal test p-value: 0.0000
The predictions are not normally distributed.


ValueError: zero-size array to reduction operation maximum which has no identity