In [1]:
import sys
sys.path.insert(0, '/content/drive/MyDrive/Colab Notebooks/Stock_Market_Prediction')
from config import *

import yfinance as yf
import pandas as pd

### **Get historical data for S&P 500**

In [2]:
def fetch_stock_data(symbol='^GSPC', end='2024-12-31'):
   """Fetch all available historical data until end date"""
   df = yf.Ticker(symbol).history(period='max', end=end)
   df.to_csv(f"{RAW_DATA_PATH}/{symbol}_data.csv")
   print(f"Data saved to {RAW_DATA_PATH}/{symbol}_data.csv")
   print(f"Date range: {df.index[0]} to {df.index[-1]}")
   print(f"Shape: {df.shape}")
   return df

# Fetch data
raw_data = fetch_stock_data()

Data saved to /content/drive/MyDrive/Colab Notebooks/Stock_Market_Prediction/models/lstm/data/raw/^GSPC_data.csv
Date range: 1927-12-30 00:00:00-05:00 to 2024-12-30 00:00:00-05:00
Shape: (24366, 7)


### **Load saved raw data**

In [3]:
from bokeh.plotting import figure, show, output_notebook
output_notebook()

def plot_timeseries(df, title='S&P 500 Historical Prices', x_col='Date', y_col='Close'):
   output_notebook()

   p = figure(width=800, height=400, x_axis_type='datetime', title=title)
   p.line(df.index, df[y_col], line_width=2)

   p.xaxis.axis_label = x_col
   p.yaxis.axis_label = y_col
   p.grid.grid_line_alpha = 0.3

   show(p)

In [81]:
raw_data = pd.read_csv(f"{RAW_DATA_PATH}/^GSPC_data.csv", index_col='Date', parse_dates=True)
print(f"Loaded data shape: {raw_data.shape}\nDate range: {raw_data.index[0]} to {raw_data.index[-1]}")
print(raw_data.tail())

df = raw_data[['Close']].copy()

Loaded data shape: (24366, 7)
Date range: 1927-12-30 00:00:00-05:00 to 2024-12-30 00:00:00-05:00
                                  Open         High          Low        Close  \
Date                                                                            
2024-12-23 00:00:00-05:00  5940.250000  5978.250000  5902.569824  5974.069824   
2024-12-24 00:00:00-05:00  5984.629883  6040.100098  5981.439941  6040.040039   
2024-12-26 00:00:00-05:00  6024.970215  6049.750000  6007.370117  6037.589844   
2024-12-27 00:00:00-05:00  6006.169922  6006.169922  5932.950195  5970.839844   
2024-12-30 00:00:00-05:00  5920.669922  5940.790039  5869.160156  5906.939941   

                               Volume  Dividends  Stock Splits  
Date                                                            
2024-12-23 00:00:00-05:00  3593280000        0.0           0.0  
2024-12-24 00:00:00-05:00  1757720000        0.0           0.0  
2024-12-26 00:00:00-05:00  2904530000        0.0           0.0  
2024-12-27

##**Raw S&P500 data chart**

In [5]:
plot_timeseries(df)

## **Check data types**

In [6]:
raw_data.dtypes

Unnamed: 0,0
Open,float64
High,float64
Low,float64
Close,float64
Volume,int64
Dividends,float64
Stock Splits,float64


##**Function to split data into train,validation and test**

In [7]:
def split_data(df, train_size=0.7, val_size=0.15):
    n = len(df)
    train_end = int(n * train_size)
    val_end = int(n * (train_size + val_size))

    train = df[:train_end]
    val = df[train_end:val_end]
    test = df[val_end:]

    return train, val, test

## **Normalize data using MinMaxScaler**

In [45]:
from sklearn.preprocessing import MinMaxScaler
import joblib

def process_data(df):
    # 1. Split first
    train, val, test = split_data(df)

    # 2. Create target for each split
    for split in [train, val, test]:
        split['Target'] = split['Close'].shift(-1)
        split.dropna(inplace=True)

    # 3. Fit scaler on train data only
    scaler = MinMaxScaler()
    columns = ['Close', 'Target']
    scaler.fit(train[columns])

    # 4. Scale all sets
    train_scaled = pd.DataFrame(
        scaler.transform(train[columns]),
        columns=columns,
        index=train.index
    )

    val_scaled = pd.DataFrame(
        scaler.transform(val[columns]),
        columns=columns,
        index=val.index
    )

    test_scaled = pd.DataFrame(
        scaler.transform(test[columns]),
        columns=columns,
        index=test.index
    )

    train_scaled.to_csv(f"{PROCESSED_DATA_PATH}/train/train_simple_splitfirst.csv")
    val_scaled.to_csv(f"{PROCESSED_DATA_PATH}/val/val_simple_splitfirst.csv")
    test_scaled.to_csv(f"{PROCESSED_DATA_PATH}/test/test_simple_splitfirst.csv")
    joblib.dump(scaler, f"{SCALERS_PATH}/scaler_simple.joblib")

    return train_scaled, val_scaled, test_scaled, scaler

In [108]:
train_scaled, val_scaled, test_scaled,scaler=process_data(df)
plot_scaled_bokeh(train_scaled, val_scaled, test_scaled)
save_scaled_png(train_scaled, val_scaled, test_scaled, save_path=f"{FIGURES_PATH}/data", file_name="split_normalize_data.png")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  split['Target'] = split['Close'].shift(-1)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  split.dropna(inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  split['Target'] = split['Close'].shift(-1)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable

Plot saved as PNG to: /content/drive/MyDrive/Colab Notebooks/Stock_Market_Prediction/models/lstm/results/figures/data/split_normalize_data.png


In [109]:
train_scaled, val_scaled, test_scaled,scaler=process_data_scale_first(df)
plot_scaled_bokeh(train_scaled, val_scaled, test_scaled)
save_scaled_png(train_scaled, val_scaled, test_scaled, save_path=f"{FIGURES_PATH}/data", file_name="normalize_split_data.png")

Plot saved as PNG to: /content/drive/MyDrive/Colab Notebooks/Stock_Market_Prediction/models/lstm/results/figures/data/normalize_split_data.png


## **Plot simple normalized data sets**

In [53]:
from bokeh.plotting import figure, show, save
from bokeh.io import output_notebook, output_file, reset_output
from bokeh.models import ColumnDataSource, HoverTool
from bokeh.palettes import Category10

def plot_scaled_bokeh(train_scaled, val_scaled, test_scaled, col_name="Close", title="Scaled Values"):
    # Reset Bokeh
    reset_output()
    output_notebook()

    # Prepare data
    train = train_scaled.copy().reset_index()
    val = val_scaled.copy().reset_index()
    test = test_scaled.copy().reset_index()

    # Create sources
    train_source = ColumnDataSource(train)
    val_source = ColumnDataSource(val)
    test_source = ColumnDataSource(test)

    # Create figure
    p = figure(width=1000, height=500, title=title)
    p.title.text_font_size = '14pt'

    # Add lines
    p.line('Date', col_name, line_color=Category10[3][0],
           line_width=2, source=train_source, legend_label='Train')
    p.line('Date', col_name, line_color=Category10[3][1],
           line_width=2, source=val_source, legend_label='Validation')
    p.line('Date', col_name, line_color=Category10[3][2],
           line_width=2, source=test_source, legend_label='Test')

    # Configure axes
    p.xaxis.axis_label = 'Date'
    p.yaxis.axis_label = col_name
    p.xaxis.axis_label_text_font_size = '12pt'
    p.yaxis.axis_label_text_font_size = '12pt'

    # Add hover tool
    hover = HoverTool(tooltips=[
        ('Date', '@Date{%F}'),
        (col_name, '@Close{0.0000}'),
        ('Target', '@Target{0.0000}')
    ], formatters={'@Date': 'datetime'})
    p.add_tools(hover)

    # Configure legend
    p.legend.click_policy = "hide"
    p.legend.location = "top_left"
    p.legend.label_text_font_size = '10pt'

    # Show plot
    show(p)

## **Save figure**

In [25]:
import matplotlib.pyplot as plt
import os

def save_scaled_png(train_scaled, val_scaled, test_scaled, save_path,file_name, title="Scaled Values"):
    # Ensure save path exists
    os.makedirs(save_path, exist_ok=True)

    # Create figure
    plt.figure(figsize=(15, 8))

    # Plot each dataset
    plt.plot(train_scaled.index, train_scaled['Close'], label='Train', linewidth=2)
    plt.plot(val_scaled.index, val_scaled['Close'], label='Validation', linewidth=2)
    plt.plot(test_scaled.index, test_scaled['Close'], label='Test', linewidth=2)

    # Configure plot
    plt.title(title, pad=20, fontsize=14)
    plt.xlabel('Date', fontsize=12)
    plt.ylabel('Scaled Value', fontsize=12)
    plt.legend(fontsize=10)
    plt.grid(True, alpha=0.3)

    # Rotate x-axis labels
    plt.xticks(rotation=45)

    # Adjust layout
    plt.tight_layout()

    # Full file path
    file_path = os.path.join(save_path, file_name)

    # Save figure
    plt.savefig(file_path, dpi=300, bbox_inches='tight')
    plt.close()

    print(f"Plot saved as PNG to: {file_path}")

## **Function to inverz transform normalized data back to it's real price**

In [14]:
import joblib

scaler = joblib.load(os.path.join(SCALERS_PATH, 'scaler_simple.joblib'))

def inverse_transform(scaled_data, scaler):
   columns = ['Close', 'Target']
   real_values = pd.DataFrame(
       scaler.inverse_transform(scaled_data[columns]),
       columns=columns,
       index=scaled_data.index
   )
   return real_values

train_real = inverse_transform(train_scaled, scaler)
val_real = inverse_transform(val_scaled, scaler)
test_real = inverse_transform(test_scaled, scaler)
# plot_splits(train_real, val_real, test_real,title='Stock Price Data Splits - Real')

## **Scale first, then split**

In [50]:
import pandas as pd
import joblib
from sklearn.preprocessing import MinMaxScaler

def process_data_scale_first(df):
    # 1. Create Target column before splitting
    df['Target'] = df['Close'].shift(-1)
    df.dropna(inplace=True)

    # 2. Fit scaler on the full dataset before splitting
    scaler = MinMaxScaler()
    columns = ['Close', 'Target']
    scaler.fit(df[columns])

    # 3. Scale the entire dataset
    df_scaled = pd.DataFrame(
        scaler.transform(df[columns]),
        columns=columns,
        index=df.index
    )

    # 4. Now split the scaled data
    train, val, test = split_data(df_scaled)

    # 5. Save the processed data
    train.to_csv(f"{PROCESSED_DATA_PATH}/train/train_simple_splitlast.csv")
    val.to_csv(f"{PROCESSED_DATA_PATH}/val/val_simple_splitlast.csv")
    test.to_csv(f"{PROCESSED_DATA_PATH}/test/test_simple_splitlast.csv")
    joblib.dump(scaler, f"{SCALERS_PATH}/scaler_simple_splitlast.joblib")

    return train, val, test, scaler

## **Custom Data Normalization**

In [38]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import os

def subtract_first_value(df, column_name='Close'):
    df = df.copy()
    first_value = df[column_name].iloc[0]
    df[column_name] = df[column_name] - first_value
    return df

def custom_normalize(data, index, n):
    A_prime = data
    numerator = (A_prime - (A_prime / n) * index) * index
    denominator = np.sqrt(index**2 + ((A_prime / n) * index)**2)
    return 0 if denominator == 0 else numerator / denominator

def normalize(df, column_name='Close'):
    df = df.copy().reset_index(drop=True)
    n = len(df) - 1

    df['Custom_Normalized'] = [
        custom_normalize(row[column_name], i, n)
        for i, row in df.iterrows()
    ]
    return df

## **Split first then apply the custom normalization method**

In [106]:
def process_data_custom(df, save_path, feature_range=(0, 1)):
    # 1. First split the data
    train, val, test = split_data(df)

    # 2. Subtract first value from each split
    train = subtract_first_value(train)
    val = subtract_first_value(val)
    test = subtract_first_value(test)

    # 3. Apply custom normalization to each split
    train_norm = normalize(train)
    val_norm = normalize(val)
    test_norm = normalize(test)

    # 4. Fit scaler on training data only
    scaler = MinMaxScaler(feature_range=feature_range)
    scaler.fit(train_norm[['Custom_Normalized']])

    # 5. Transform all splits using the scaler fit on training data
    train_norm['Scaled'] = scaler.transform(train_norm[['Custom_Normalized']])
    val_norm['Scaled'] = scaler.transform(val_norm[['Custom_Normalized']])
    test_norm['Scaled'] = scaler.transform(test_norm[['Custom_Normalized']])

    # 6. Calculate targets
    train_norm['Target'] = train_norm['Scaled'].shift(-1)
    val_norm['Target'] = val_norm['Scaled'].shift(-1)
    test_norm['Target'] = test_norm['Scaled'].shift(-1)

    # 7. Remove NaN values
    train_norm.dropna(inplace=True)
    val_norm.dropna(inplace=True)
    test_norm.dropna(inplace=True)

    # 8. Save processed data
    os.makedirs(save_path, exist_ok=True)
    train_norm.to_csv(f"{save_path}/train/train_custom_splitfirst.csv")
    val_norm.to_csv(f"{save_path}/val/val_custom_splitfirst.csv")
    test_norm.to_csv(f"{save_path}/test/test_custom_splitfirst.csv")
    joblib.dump(scaler, f"{SCALERS_PATH}/scaler_custom.joblib")

    return train_norm, val_norm, test_norm, scaler


In [107]:
train_norm, val_norm, test_norm,scaler = process_data_custom(df, save_path=PROCESSED_DATA_PATH)
display_split_info(train_norm, val_norm, test_norm)
plot_custom_normalized_bokeh_continuous(train_norm, val_norm, test_norm,save_path=f"{FIGURES_PATH}/data", file_name="custom_normalized_split_first.png")


Data Split Information:
--------------------------------------------------
Total samples: 24363
Train samples: 17055 (70.00%)
Val samples: 3654 (15.00%)
Test samples: 3654 (15.00%)

Train Data Tail:
--------------------------------------------------
            Close  Custom_Normalized    Scaled    Target
17050  602.519993           0.176530  0.243330  0.242628
17051  598.509983           0.140285  0.242628  0.241954
17052  599.819981           0.105444  0.241954  0.241279
17053  601.860020           0.070535  0.241279  0.240596
17054  601.120029           0.035224  0.240596  0.239914

Validation Data Tail:
--------------------------------------------------
           Close  Custom_Normalized    Scaled    Target
3649  498.309998           0.675616  0.252986  0.250359
3650  497.690002           0.539832  0.250359  0.247770
3651  499.120056           0.406016  0.247770  0.245166
3652  500.590027           0.271460  0.245166  0.242518
3653  496.279968           0.134583  0.242518  0.2399

In [105]:
train_norm, val_norm, test_norm,scaler = process_data_split_end(df, save_path=PROCESSED_DATA_PATH)
display_split_info(train_norm, val_norm, test_norm)
plot_custom_normalized_bokeh_continuous(train_norm, val_norm, test_norm,save_path=f"{FIGURES_PATH}/data", file_name="custom_normalized_split_at_the_end.png")


Data Split Information:
--------------------------------------------------
Total samples: 24365
Train samples: 17055 (70.00%)
Val samples: 3655 (15.00%)
Test samples: 3655 (15.00%)

Train Data Tail:
--------------------------------------------------
            Close  Custom_Normalized    Scaled    Target
17050  602.519993         180.836723  0.486167  0.483083
17051  598.509983         179.609354  0.483083  0.484008
17052  599.819981         179.977628  0.484008  0.485483
17053  601.860020         180.564680  0.485483  0.484864
17054  601.120029         180.318146  0.484864  0.486994

Validation Data Tail:
--------------------------------------------------
             Close  Custom_Normalized    Scaled    Target
20705  1097.569981         164.704966  0.445635  0.445289
20706  1096.949986         164.567140  0.445289  0.445714
20707  1098.380039         164.736211  0.445714  0.446153
20708  1099.850010         164.911136  0.446153  0.444420
20709  1095.539951         164.221275  0.44

In [104]:
from bokeh.plotting import figure, show
from bokeh.io import output_notebook, reset_output,export_png
from bokeh.models import ColumnDataSource, HoverTool
from bokeh.palettes import Category10
import pandas as pd

def plot_custom_normalized_bokeh_continuous(train_scaled, val_scaled, test_scaled,save_path,file_name, column_name="Scaled", title="Custom Normalized Data"):
    reset_output()
    output_notebook()

    # Ensure train, val, and test have continuous indices
    train_scaled = train_scaled.copy().reset_index(drop=True)
    val_scaled = val_scaled.copy().reset_index(drop=True)
    test_scaled = test_scaled.copy().reset_index(drop=True)

    # Adjust validation index to start after train
    val_scaled["Index"] = val_scaled.index + len(train_scaled)

    # Adjust test index to start after validation (train + validation)
    test_scaled["Index"] = test_scaled.index + len(train_scaled) + len(val_scaled)

    train_scaled["Index"] = train_scaled.index

    # Create data sources
    train_source = ColumnDataSource(train_scaled)
    val_source = ColumnDataSource(val_scaled)
    test_source = ColumnDataSource(test_scaled)

    # Create Bokeh figure
    p = figure(width=1000, height=500, title=title)
    p.title.text_font_size = '14pt'

    p.line('Index', column_name, line_color=Category10[3][0], line_width=2, source=train_source, legend_label='Train')
    p.line('Index', column_name, line_color=Category10[3][1], line_width=2, source=val_source, legend_label='Validation')
    p.line('Index', column_name, line_color=Category10[3][2], line_width=2, source=test_source, legend_label='Test')

    p.xaxis.axis_label = 'Index'
    p.yaxis.axis_label = column_name
    p.xaxis.axis_label_text_font_size = '12pt'
    p.yaxis.axis_label_text_font_size = '12pt'

    hover = HoverTool(tooltips=[('Index', '@Index'), (column_name, f'@{column_name}{{0.0000}}')])
    p.add_tools(hover)

    p.legend.click_policy = "hide"
    p.legend.location = "top_left"
    p.legend.label_text_font_size = '10pt'

    # Save as PNG using Matplotlib
    plt.figure(figsize=(10, 5))
    plt.plot(train_scaled["Index"], train_scaled[column_name], label="Train", linewidth=2)
    plt.plot(val_scaled["Index"], val_scaled[column_name], label="Validation", linewidth=2)
    plt.plot(test_scaled["Index"], test_scaled[column_name], label="Test", linewidth=2)

    plt.title(title, fontsize=14)
    plt.xlabel("Index", fontsize=12)
    plt.ylabel(column_name, fontsize=12)
    plt.legend(fontsize=10)
    plt.grid(True, alpha=0.3)
    plt.xticks(rotation=45)

    # Full file path
    file_path = os.path.join(save_path, file_name)

    plt.tight_layout()
    plt.savefig(file_path, dpi=300, bbox_inches="tight")
    plt.close()

    print(f"Plot saved as PNG: {file_name} to {save_path}")

    show(p)

In [36]:
def display_split_info(train, val, test):
    print("\nData Split Information:")
    print("-" * 50)
    print(f"Total samples: {len(train) + len(val) + len(test)}")
    print(f"Train samples: {len(train)} ({len(train)/(len(train) + len(val) + len(test)):.2%})")
    print(f"Val samples: {len(val)} ({len(val)/(len(train) + len(val) + len(test)):.2%})")
    print(f"Test samples: {len(test)} ({len(test)/(len(train) + len(val) + len(test)):.2%})")

    print("\nTrain Data Tail:")
    print("-" * 50)
    print(train.tail())

    print("\nValidation Data Tail:")
    print("-" * 50)
    print(val.tail())

    print("\nTest Data Tail:")
    print("-" * 50)
    print(test.tail())

    print("\nValue Ranges:")
    print("-" * 50)
    print("Train - Scaled:", f"min: {train['Scaled'].min():.3f}, max: {train['Scaled'].max():.3f}")
    print("Val - Scaled:", f"min: {val['Scaled'].min():.3f}, max: {val['Scaled'].max():.3f}")
    print("Test - Scaled:", f"min: {test['Scaled'].min():.3f}, max: {test['Scaled'].max():.3f}")

In [89]:
import os
import pandas as pd
import joblib
from sklearn.preprocessing import MinMaxScaler

def process_data_split_end(df, save_path, feature_range=(0, 1)):
    # 1. Subtract first value from the entire dataset before splitting
    df = subtract_first_value(df)

    # 2. Apply custom normalization to the entire dataset
    df_norm = normalize(df)

    # 3. Fit scaler on the entire normalized dataset before splitting
    scaler = MinMaxScaler(feature_range=feature_range)
    scaler.fit(df_norm[['Custom_Normalized']])

    # 4. Transform the entire dataset
    df_norm['Scaled'] = scaler.transform(df_norm[['Custom_Normalized']])

    # 5. Calculate target values before splitting
    df_norm['Target'] = df_norm['Scaled'].shift(-1)

    # 6. Remove NaN values
    df_norm.dropna(inplace=True)

    # 7. Now split the processed dataset
    train_norm, val_norm, test_norm = split_data(df_norm)

    # 8. Save processed data
    os.makedirs(save_path, exist_ok=True)
    train_norm.to_csv(f"{save_path}/train/train_custom_splitlast.csv")
    val_norm.to_csv(f"{save_path}/val/val_custom_splitlast.csv")
    test_norm.to_csv(f"{save_path}/test/test_custom_splitlast.csv")
    joblib.dump(scaler, f"{SCALERS_PATH}/scaler_custom_splitlast.joblib")

    return train_norm, val_norm, test_norm, scaler