In [None]:
# Import necessary libraries for data manipulation, analysis, and modeling
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
import seaborn as sns  
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.model_selection import cross_val_score, GridSearchCV, train_test_split
from sklearn.exceptions import NotFittedError
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

## Define the SP500DataHandler Class

In [None]:
# Define a class to handle S&P 500 market data
class SP500DataHandler:
    """Handler for S&P 500 market data.

    The class-level constants below name every DataFrame column the handler
    refers to, so column labels are spelled in exactly one place.
    """

    # --- Price / volume columns expected in the input data ---
    DATE_COLUMN = 'Date'
    OPEN_COLUMN = 'Open'
    HIGH_COLUMN = 'High'
    LOW_COLUMN = 'Low'
    CLOSE_LAST_COLUMN = 'Close/Last'
    ADJ_CLOSE_COLUMN = 'Adj Close'
    VOLUME_COLUMN = 'Volume'

    # --- Calendar parts derived from the date column ---
    YEAR_COLUMN = 'Year'
    MONTH_COLUMN = 'Month'
    DAY_COLUMN = 'Day'
    DAY_OF_WEEK_COLUMN = 'DayOfWeek'

    # --- Engineered features ---
    VOLUME_10DAY_AVG = 'Volume_10day_avg'
    ADJ_CLOSE_10DAY_AVG = 'Adj_Close_10day_avg'
    MA50_ADJ_COLUMN = 'MA50_Adj'
    MA200_ADJ_COLUMN = 'MA200_Adj'
    VWAP_COLUMN = 'VWAP'
    DAILY_RETURNS_COLUMN = 'Daily Returns'
    VOLATILITY_COLUMN = 'Volatility'

    # --- Model outputs ---
    PREDICTIONS_COLUMN = 'predictions'
    RESIDUALS_COLUMN = 'residuals'

## Initialize and Load Data

In [None]:
def __init__(self, file_path, random_state=42):
        """Set up the handler with a data source, candidate models and tuning grids.

        Args:
            file_path: Path of the CSV file the handler loads its data from.
            random_state: Seed handed to the stochastic estimators (random forest
                and gradient boosting) so repeated runs pick and score the same
                models. Pass ``None`` to get unseeded estimators. Defaults to 42.
        """
        self.file_path = file_path  # File path for the data source
        self.random_state = random_state
        self.data = None  # Placeholder for the data; stays None until a load succeeds

        # Candidate regressors, keyed by the name used in log output and in param_grid.
        self.models = {
            'LinearRegression': LinearRegression(),
            'Ridge': Ridge(),
            'Lasso': Lasso(),
            # Tree ensembles draw random samples/features; seed them for reproducibility.
            'RandomForest': RandomForestRegressor(random_state=random_state),
            'GradientBoosting': GradientBoostingRegressor(random_state=random_state)}

        # Hyper-parameter grids; a model without an entry here is used with its defaults.
        self.param_grid = {
            'Ridge': {'alpha': [10, 50, 100, 200]},
            'Lasso': {'alpha': [0.01, 0.1, 1, 10, 100], 'max_iter': [5000]},
            'RandomForest': {'n_estimators': [100, 200, 300], 'max_depth': [10, 20, 30, None]},
            'GradientBoosting': {'n_estimators': [100, 200], 'learning_rate': [0.01, 0.1]}}

In [None]:
def load_data(self, start_date=None, end_date=None):
        """Read the CSV at ``self.file_path`` into ``self.data``, optionally limited to a date range.

        Args:
            start_date: Inclusive lower bound on the date column, in any form
                ``pd.to_datetime`` accepts. Falsy (default ``None``) means no lower bound.
            end_date: Inclusive upper bound on the date column, same conventions.

        Either bound may be supplied on its own. Errors are reported with a printed
        message rather than raised; in that case ``self.data`` keeps whatever value
        it had before the call.
        """
        try:
            # Use DATE_COLUMN class attribute
            frame = pd.read_csv(self.file_path, parse_dates=[self.DATE_COLUMN])

            # Apply each bound independently so a one-sided range is honoured too.
            if start_date:
                frame = frame[frame[self.DATE_COLUMN] >= pd.to_datetime(start_date)]
            if end_date:
                frame = frame[frame[self.DATE_COLUMN] <= pd.to_datetime(end_date)]

            # Keep an independent copy: later steps add columns to self.data, which
            # must not happen on a slice of the frame that was read from disk.
            self.data = frame.copy()
            print("Data loaded successfully.")
        except FileNotFoundError:
            print(f"File not found: {self.file_path}")
        except Exception as e:
            print(f"Failed to load data: {e}")

## Data Cleaning and Preprocessing

In [None]:
def clean_data(self):
        """Drop incomplete rows and add calendar columns derived from the date column.

        Rows containing any missing value are removed from ``self.data`` in place,
        then year, month, day-of-month and day-of-week columns are appended. If no
        data has been loaded, a message is printed and nothing else happens.
        """
        if self.data is None:
            print("Data not loaded. Please load the data first.")
            return

        self.data.dropna(inplace=True)

        # (target column, attribute of the .dt accessor) in the order they are added
        calendar_parts = (
            (self.YEAR_COLUMN, 'year'),
            (self.MONTH_COLUMN, 'month'),
            (self.DAY_COLUMN, 'day'),
            (self.DAY_OF_WEEK_COLUMN, 'dayofweek'),
        )
        for column_name, part in calendar_parts:
            self.data[column_name] = getattr(self.data[self.DATE_COLUMN].dt, part)

        print("Data cleaning and feature engineering completed.")

In [None]:
def ten_day_avg(self):
        """Add 10-row rolling means of volume and adjusted close to ``self.data``.

        After the two columns are added, infinite values anywhere in the frame are
        turned into NaN and every NaN in a numeric column is filled with that
        column's mean. This includes the first nine rows of the new rolling
        columns, which have no full window and therefore receive the column mean.

        Prints a message and returns without changes when no data is loaded or
        when the volume / adjusted-close columns are missing.
        """
        if self.data is None:
            print("Data not loaded. Please load the data first.")
            return

        if self.VOLUME_COLUMN in self.data.columns and self.ADJ_CLOSE_COLUMN in self.data.columns:
            self.data[self.VOLUME_10DAY_AVG] = self.data[self.VOLUME_COLUMN].rolling(window=10).mean()
            self.data[self.ADJ_CLOSE_10DAY_AVG] = self.data[self.ADJ_CLOSE_COLUMN].rolling(window=10).mean()

            # Replace infinite values with NaN and then fill them with column mean
            self.data.replace([np.inf, -np.inf], np.nan, inplace=True)
            # numeric_only=True: without it, DataFrame.mean() raises TypeError on
            # pandas >= 2.0 as soon as the frame holds a non-numeric column.
            self.data.fillna(self.data.mean(numeric_only=True), inplace=True)

            print("Feature engineering completed: 10-day averages added.")
        else:
            print("Required columns for feature engineering are missing.")

In [None]:
def calculate_daily_returns_and_volatility(self, window=30):
        """Add daily-return and rolling-volatility columns to ``self.data``.

        Daily returns are the row-over-row percentage change of the 'Close/Last'
        column. Volatility is the rolling standard deviation of those returns over
        ``window`` rows, multiplied by ``sqrt(window)``.

        Args:
            window: Number of rows in the rolling window; also the factor under
                the square root. Defaults to 30.

        Rows without a full window of returns are left as NaN in the volatility column.
        """
        # Calculate daily returns and volatility for risk assessment
        daily_returns = self.data[self.CLOSE_LAST_COLUMN].pct_change()
        self.data[self.DAILY_RETURNS_COLUMN] = daily_returns
        self.data[self.VOLATILITY_COLUMN] = daily_returns.rolling(window=window).std() * np.sqrt(window)

In [None]:
def feature_engineering(self):
        """Append moving-average and VWAP columns computed from adjusted close and volume.

        Adds 50-row and 200-row rolling means of the adjusted close, plus a running
        volume-weighted average price (cumulative volume x price divided by
        cumulative volume). Prints a message and does nothing when no data is loaded.
        """
        if self.data is None:
            print("Data not loaded. Please load and clean the data before feature engineering.")
            return

        # Moving averages of 'Adj Close' to smooth out price data
        for column_name, span in ((self.MA50_ADJ_COLUMN, 50), (self.MA200_ADJ_COLUMN, 200)):
            self.data[column_name] = self.data[self.ADJ_CLOSE_COLUMN].rolling(window=span).mean()

        # Volume-Weighted Average Price (VWAP), accumulated from the first row onward
        volume = self.data[self.VOLUME_COLUMN]
        running_traded_value = (volume * self.data[self.ADJ_CLOSE_COLUMN]).cumsum()
        running_volume = self.data[self.VOLUME_COLUMN].cumsum()
        self.data[self.VWAP_COLUMN] = running_traded_value / running_volume

        print("Feature engineering completed successfully.")

## Model Selection and Tuning

In [None]:
def split_data(self):
        """Split ``self.data`` into train and test sets for predicting adjusted close.

        Features are open, high, low and the two 10-day average columns; the target
        is the adjusted close. 30% of the rows go to the test set, shuffled with a
        fixed seed of 42. The four parts are stored on the instance as ``X_train``,
        ``X_test``, ``y_train`` and ``y_test``.
        """
        feature_columns = [
            self.OPEN_COLUMN,
            self.HIGH_COLUMN,
            self.LOW_COLUMN,
            self.VOLUME_10DAY_AVG,
            self.ADJ_CLOSE_10DAY_AVG,
        ]
        features = self.data[feature_columns]
        target = self.data[self.ADJ_CLOSE_COLUMN]

        parts = train_test_split(features, target, test_size=0.3, random_state=42)
        self.X_train, self.X_test, self.y_train, self.y_test = parts

In [None]:
def select_and_tune_model(self, cv=5):
        """Cross-validate every candidate model, keep the best one and score it on the test split.

        Each candidate in ``self.models`` is wrapped in a ``Pipeline`` of
        ``StandardScaler`` followed by the estimator, so the scaling is learned
        inside every cross-validation fold and re-applied at prediction time.
        Candidates with an entry in ``self.param_grid`` are tuned with
        ``GridSearchCV``; the rest are scored with ``cross_val_score``. The
        candidate with the lowest mean cross-validated MSE is selected.

        Args:
            cv: Cross-validation setting forwarded to scikit-learn (number of
                folds by default).

        Sets on the instance:
            best_model_name, best_score: name and cross-validated MSE of the winner.
            best_model: the fitted ``Pipeline`` (scaler + regressor). The regressor
                itself is available as ``best_model.named_steps['model']``.
            predictions: predictions for ``X_test``; also written to the
                predictions column of ``self.data`` for the test rows (NaN elsewhere).
            mse, mae, r2: test-set metrics of the winner.
        """
        scoring = 'neg_mean_squared_error'
        step_prefix = 'model__'  # GridSearchCV addresses pipeline-step parameters as <step>__<param>
        best_score = float('inf')
        best_model_name = None
        best_model = None

        for name, model in self.models.items():
            print(f"Processing {name}...")
            # A fresh scaler per candidate keeps the pipelines independent of each other.
            pipeline = Pipeline([('scaler', StandardScaler()), ('model', model)])
            if name in self.param_grid:
                # Set up GridSearchCV over the pipeline, with the grid keys re-targeted at the 'model' step
                adjusted_param_grid = {step_prefix + key: value for key, value in self.param_grid[name].items()}
                grid_search = GridSearchCV(pipeline, adjusted_param_grid, cv=cv, scoring=scoring)
                grid_search.fit(self.X_train, self.y_train)

                # Report parameters under their plain names, without the step prefix.
                tuned_params = {key[len(step_prefix):]: value for key, value in grid_search.best_params_.items()}
                candidate_score = -grid_search.best_score_
                print(f"Best parameters for {name}: {tuned_params}")
                print(f"Best cross-validation score (MSE) for {name}: {candidate_score}")

                if candidate_score < best_score:
                    best_score = candidate_score
                    best_model_name = name
                    # best_estimator_ is already refit on the full training split
                    best_model = grid_search.best_estimator_
            else:
                # For models without a parameter grid, cross-validate the default configuration
                scores = cross_val_score(pipeline, self.X_train, self.y_train, cv=cv, scoring=scoring)
                avg_score = -scores.mean()
                print(f"Average cross-validation score (MSE) for {name}: {avg_score}")

                if avg_score < best_score:
                    best_score = avg_score
                    best_model_name = name
                    # cross_val_score only fits clones, so fit the pipeline itself before keeping it
                    pipeline.fit(self.X_train, self.y_train)
                    best_model = pipeline

        self.best_model_name = best_model_name
        self.best_model = best_model
        self.best_score = best_score

        print(f"Best model: {self.best_model_name} with MSE: {self.best_score}")
        self.predictions = self.best_model.predict(self.X_test)

        # Initialize a column for predictions in the dataset and fill it for the test set
        self.data[self.PREDICTIONS_COLUMN] = np.nan
        self.data.loc[self.X_test.index, self.PREDICTIONS_COLUMN] = self.predictions

        # Evaluate the best model using common regression metrics
        self.mse = mean_squared_error(self.y_test, self.predictions)
        self.mae = mean_absolute_error(self.y_test, self.predictions)
        self.r2 = r2_score(self.y_test, self.predictions)

        print(f"Evaluation Metrics for {self.best_model_name}: MSE={self.mse}, MAE={self.mae}, R2={self.r2}")

In [None]:
def perform_cross_validation(self, cv=5):
        """Cross-validate ``self.best_model`` on the training split and record the MSE per fold.

        Args:
            cv: Cross-validation setting forwarded to ``cross_val_score``
                (number of folds by default).

        On success sets ``cv_mse_scores`` (positive MSE per fold) and
        ``avg_cv_mse`` (their mean). Problems are reported with a printed
        message instead of being raised.
        """
        # Perform cross-validation to assess the model's performance
        try:
            # A model must have been selected first; look it up without assuming the attribute exists.
            model = getattr(self, 'best_model', None)
            if model is None or not hasattr(model, 'fit'):
                raise ValueError("Model must be fitted before performing cross-validation.")

            cv_scores = cross_val_score(model, self.X_train, self.y_train, cv=cv, scoring='neg_mean_squared_error')
            self.cv_mse_scores = -cv_scores  # Convert scores to positive values
            self.avg_cv_mse = np.mean(self.cv_mse_scores)  # Calculate average MSE
            print(f"Cross-validation performed with {cv} folds. Average MSE: {self.avg_cv_mse:.2f}")
        # NotFittedError derives from ValueError, so it has to be caught first to be reachable.
        except NotFittedError:
            print("Model is not fitted. Fit the model before performing cross-validation.")
        except ValueError as ve:
            print(f"Value Error: {ve}")
        except Exception as e:
            print(f"Unexpected error during cross-validation: {e}")

## Machine Learning Evaluation

In [None]:
def visualize_predictions(self):
        """Scatter-plot test-set predictions against actual values, with a y = x reference line.

        Reads ``self.y_test`` and ``self.predictions``; points on the dashed
        diagonal are predictions that match the actual value exactly.
        """
        plt.figure(figsize=(10, 6))

        actual = self.y_test
        plt.scatter(actual, self.predictions, alpha=0.5)

        # Dashed diagonal spanning the range of the actual values
        diagonal = [actual.min(), actual.max()]
        plt.plot(diagonal, diagonal, 'k--', lw=2)

        plt.xlabel('Actual')
        plt.ylabel('Predicted')
        plt.title('Actual vs. Predicted Values')
        plt.show()

In [None]:
def feature_importance(self):
        """Bar-plot how strongly each feature drives ``self.best_model``.

        Works for any of the candidate model types:
        linear models are plotted by their ``coef_`` values, tree ensembles by
        their ``feature_importances_``. If the best model is a scikit-learn
        ``Pipeline``, its last step is inspected. When the estimator exposes
        neither attribute, a message is printed and nothing is plotted.
        """
        estimator = self.best_model
        # A Pipeline stores its (name, estimator) pairs in `steps`; the regressor is the last one.
        steps = getattr(estimator, 'steps', None)
        if steps:
            estimator = steps[-1][1]

        if hasattr(estimator, 'coef_'):
            importance = np.ravel(estimator.coef_)
            value_label = 'Coefficient Value'
        elif hasattr(estimator, 'feature_importances_'):
            importance = np.ravel(estimator.feature_importances_)
            value_label = 'Importance'
        else:
            print(f"{type(estimator).__name__} exposes neither coefficients nor feature importances; nothing to plot.")
            return

        features = [self.OPEN_COLUMN, self.HIGH_COLUMN, self.LOW_COLUMN, self.VOLUME_10DAY_AVG, self.ADJ_CLOSE_10DAY_AVG]
        plt.figure(figsize=(10, 6))
        plt.bar(features, importance)
        plt.xlabel('Feature')
        plt.ylabel(value_label)
        plt.title('Feature Importance')
        plt.xticks(rotation=45)
        plt.show()

In [None]:
def calculate_and_store_residuals(self):
        """Store ``actual - predicted`` in the residuals column of ``self.data``.

        The actual values are taken from the adjusted-close column, the series
        the model is trained to predict, so the residuals compare like with like.
        If that column is absent, the 'Close/Last' column is used instead. Rows
        without a prediction get NaN.

        Prints a message and changes nothing when no data is loaded or when a
        required column is missing.
        """
        if self.data is None:
            print("Data not loaded. Please load the data first.")
            return

        columns = self.data.columns
        # Prefer the model's target column; fall back to the raw close for frames that lack it.
        if self.ADJ_CLOSE_COLUMN in columns:
            actual_column = self.ADJ_CLOSE_COLUMN
        else:
            actual_column = self.CLOSE_LAST_COLUMN

        missing_columns = [col for col in [self.PREDICTIONS_COLUMN, actual_column] if col not in columns]
        if missing_columns:
            print(f"Required columns {missing_columns} are missing in the DataFrame.")
            return

        self.data[self.RESIDUALS_COLUMN] = self.data[actual_column] - self.data[self.PREDICTIONS_COLUMN]
        print("Residuals calculated and stored.")

## Returning Clean Data

In [None]:
def get_clean_data(self):
        """Return ``self.data``; print a hint and return ``None`` if nothing has been loaded."""
        frame = self.data
        if frame is None:
            print("Data is not ready. Please ensure data is loaded, cleaned, and preprocessed.")
        return frame

## Main Function

In [None]:
# Example usage of the SP500DataHandler class
if __name__ == "__main__":
    file_path = 'SPX_2.csv'  # Update this to the actual path of your dataset
    data_handler = SP500DataHandler(file_path)
    data_handler.load_data(start_date='1960-08-01', end_date='1985-12-31')

    clean_data = None
    # load_data reports a failed load by printing and leaving `data` as None.
    # Every later step indexes into the frame, so only continue when a frame exists.
    if data_handler.data is not None:
        data_handler.clean_data()
        data_handler.ten_day_avg()
        data_handler.calculate_daily_returns_and_volatility()
        data_handler.feature_engineering()
        data_handler.split_data()
        data_handler.select_and_tune_model()
        data_handler.perform_cross_validation()
        data_handler.visualize_predictions()
        data_handler.feature_importance()
        data_handler.calculate_and_store_residuals()
        clean_data = data_handler.get_clean_data()

    # Display the head of the cleaned data if available
    if clean_data is not None:
        print(clean_data.head())
    else:
        print("No data to display.")