In [1]:
import numpy as np 
import pandas as pd
import yfinance as yf
import datetime
import time
import requests
import io 


I- Extracting Data From Yahoo Finance

In [2]:
# Name of the CSV file (relative to the working directory) that receives the download.
GSPC = 'GSPC_data.csv'

# Fetch the ^GSPC price history for the requested date range and persist it to disk.
data = yf.download("^GSPC", start="2000-01-01", end="2023-12-31")
data.to_csv(GSPC)

[*********************100%***********************]  1 of 1 completed


II- Load CSV File 

In [3]:
# Reload the CSV written by the download cell. The path is the same relative
# file name (`GSPC`) the notebook just wrote, instead of a machine-specific
# absolute path, so the notebook runs on any machine after Restart & Run All.
# NOTE(review): the file is written from a frame with MultiIndex columns, so it
# has several header rows; pass e.g. header=[0, 1], index_col=0 if `data_csv`
# must mirror `data` — TODO confirm the intended layout.
data_csv = pd.read_csv(GSPC)

III- Preprocessing

In [4]:
# Preview the first rows of the downloaded frame (`data`, not the reloaded `data_csv`).
data.head()

Price,Adj Close,Close,High,Low,Open,Volume
Ticker,^GSPC,^GSPC,^GSPC,^GSPC,^GSPC,^GSPC
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
2000-01-03 00:00:00+00:00,1455.219971,1455.219971,1478.0,1438.359985,1469.25,931800000
2000-01-04 00:00:00+00:00,1399.420044,1399.420044,1455.219971,1397.430054,1455.219971,1009000000
2000-01-05 00:00:00+00:00,1402.109985,1402.109985,1413.27002,1377.680054,1399.420044,1085500000
2000-01-06 00:00:00+00:00,1403.449951,1403.449951,1411.900024,1392.099976,1402.109985,1092300000
2000-01-07 00:00:00+00:00,1441.469971,1441.469971,1441.469971,1400.72998,1403.449951,1225200000


In [5]:
# (number of rows, number of columns) of the downloaded frame.
data.shape

(6037, 6)

In [6]:
# Number of leading rows used for training: 80% of the data set, truncated to an integer.
train_size = int(0.8 * len(data))
print(train_size)

4829


In [7]:
# Chronological split: the first `train_size` rows train the model and every
# remaining row is held out for testing. The test slice starts exactly at
# `train_size` (iloc's end bound is exclusive), so no row is dropped between
# the two sets.
train_data = data.iloc[:train_size]
test_data = data.iloc[train_size:]

IV- Extracting Features

In [8]:
def augment_features(dataframe):
    """Derive the three fractional price-change features from OHLC columns.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must expose 'Open', 'High', 'Low' and 'Close'. Each selection may be a
        Series or a one-column DataFrame (as with MultiIndex columns); both are
        reduced with ``.squeeze()``.

    Returns
    -------
    pandas.DataFrame
        Indexed like ``dataframe`` with columns, all relative to 'Open':
        'delOpenClose' = (Close - Open) / Open,
        'delHighOpen'  = (High - Open) / Open,
        'delLowOpen'   = (Open - Low) / Open.
    """
    open_price = dataframe['Open']

    ratios = {
        'delOpenClose': (dataframe['Close'] - open_price) / open_price,
        'delHighOpen': (dataframe['High'] - open_price) / open_price,
        'delLowOpen': (open_price - dataframe['Low']) / open_price,
    }

    # Collapse one-column 2D results into 1D so each one becomes a plain column.
    flattened = {label: values.squeeze() for label, values in ratios.items()}

    return pd.DataFrame(flattened, index=dataframe.index)


In [9]:
def extract_features(dataframe):
    """Return the feature columns of ``dataframe`` as a 2D NumPy array.

    The columns are stacked side by side in the fixed order
    'delOpenClose', 'delHighOpen', 'delLowOpen', one row per input row.
    """
    feature_columns = ('delOpenClose', 'delHighOpen', 'delLowOpen')
    return np.column_stack([dataframe[name] for name in feature_columns])


In [31]:
# Strip surrounding whitespace from the labels of the first (top) level of the
# column MultiIndex.
# NOTE(review): only train_data's columns are cleaned here; test_data is left
# untouched — confirm that this asymmetry is intended.
train_data.columns = train_data.columns.set_levels(train_data.columns.levels[0].str.strip(), level=0)
# Derive the fractional-change features and stack them into a 3-column array.
features = extract_features(augment_features(train_data))

In [None]:
# Dimensions of the training feature array (rows, feature columns).
features.shape

V- Hidden Markov Model with HMMlearn

In [33]:
# pip install hmmlearn
from hmmlearn.hmm import GaussianHMM

In [34]:
# Gaussian-emission HMM with 10 hidden states. `random_state` pins the random
# parameter initialisation so that fitting gives the same model on every
# Restart & Run All.
model = GaussianHMM(n_components=10, random_state=42)

In [None]:
# Rebuild the training features: first the derived fractional-change frame,
# then its 3-column array form, which is what the model is fitted on.
features_train_data = augment_features(train_data)
features_train = extract_features(features_train_data)
# Fit the HMM on the training feature array.
model.fit(features_train)

VI- Generating possible sequences

In [36]:
import itertools

# Fractional-change features of the held-out test rows, one Series per feature.
test_augmented = augment_features(test_data)
fracocp = test_augmented ['delOpenClose']
frachp = test_augmented ['delHighOpen']
fraclp = test_augmented ['delLowOpen']

# Evenly spaced grid over each feature's observed [min, max] range on the test
# set: 50 points for the open-to-close change, 10 each for the high and low.
sample_space_fracocp = np.linspace(fracocp.min(), fracocp.max(), 50)
sample_space_fraclp = np.linspace(fraclp.min(), fraclp.max(), 10)
sample_space_frachp = np.linspace(frachp.min(), frachp.max(), 10)

# Every combination of the three grids, one candidate per row, ordered
# (delOpenClose, delHighOpen, delLowOpen) like the columns of extract_features.
possible_outcomes = np.array(list(itertools.product(sample_space_fracocp, sample_space_frachp, sample_space_fraclp)))

VII- Checking predictions

In [37]:
# Maximum number of preceding test days fed to the model as context
# (the trailing value is presumably an alternative setting that was tried).
num_latent_days = 10 # 50
# Number of leading test days for which a closing price is predicted
# (the trailing value is presumably an alternative setting that was tried).
num_days_to_predict = 50 # 300

In [None]:
# pip install tqdm
from tqdm import tqdm  # progress bar for the long-running prediction loop

# One predicted closing price per test day, in chronological order.
predicted_close_prices = []

for i in tqdm(range(num_days_to_predict)):
    # Context window: up to `num_latent_days` test days strictly before day i
    # (empty for i == 0), so day i itself is never part of the context.
    previous_data_start_index = max(0, i - num_latent_days)
    previous_data_end_index = max(0, i)

    # Feature rows of the context window.
    previous_data = extract_features(
        augment_features(test_data.iloc[previous_data_start_index:previous_data_end_index])
    )

    # Score the context window extended by each candidate outcome for day i.
    outcome_scores = []
    for outcome in possible_outcomes:
        # np.vstack is the supported spelling of the deprecated np.row_stack.
        total_data = np.vstack((previous_data, outcome))
        outcome_scores.append(model.score(total_data))

    # Choose the best candidate once per day, after ALL candidates have been
    # scored; doing this inside the loop above would append one prediction per
    # candidate instead of one per day.
    most_probable_outcome = possible_outcomes[np.argmax(outcome_scores)]

    # With MultiIndex columns ['Open'] selects a one-column frame, so reduce
    # the day's opening price to a plain float before using it.
    open_price = float(np.ravel(test_data['Open'].iloc[i])[0])
    predicted_close_prices.append(open_price * (1 + most_probable_outcome[0]))


- Displaying the results

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize = (30,10), dpi=80 )
plt.rcParams.update({'font.size' : 18})

# Dates of the predicted test days; `x_axis` is reused by the error plot below.
x_axis = np.array(test_data.index[ 0:num_days_to_predict], dtype ='datetime64[ms]')
plt.plot(x_axis, test_data.iloc[0:num_days_to_predict]['Close'], 'b+-', label ="Actual close prices")
plt.plot(x_axis, predicted_close_prices, 'ro-', label ="Predicted close prices")
# Label the figure so it can be read on its own, like the later MAE plots.
plt.xlabel("Date")
plt.ylabel("Close price")
plt.title("Actual vs. predicted close prices")
plt.legend(prop={'size' : 20})
plt.show()


In [None]:
# Absolute error per predicted day. Both sides are flattened to 1D float arrays
# first: with MultiIndex columns ['Close'] selects a one-column frame, and
# subtracting a plain Python list from it does not align element-wise.
actual_close = np.ravel(test_data['Close'].iloc[0:num_days_to_predict])
predicted_close = np.asarray(predicted_close_prices, dtype=float).ravel()
ae = pd.Series(
    np.abs(actual_close - predicted_close),
    index=test_data.index[0:num_days_to_predict],
)

plt.figure(figsize=(30,10), dpi=80)

plt.plot(x_axis, ae, 'go-', label="Error")
# Label the figure so it can be read on its own, like the later MAE plots.
plt.xlabel("Date")
plt.ylabel("Absolute error")
plt.legend(prop={'size': 20})
plt.show()

In [None]:
# Summary statistics of the absolute prediction error.
print(f"Max error observed = {ae.max()}")
print(f"Min error observed = {ae.min()}")
print(f"Mean error observed = {ae.mean()}")

VIII- Tweaking some hyperparameters


In [1]:
# Candidate values for each hyperparameter, with the baseline value held fixed
# while another hyperparameter is varied.
num_latent_days_values = [10, 20, 30, 40, 50, 60]
baseline_num_latent_days = 50
n_components_values = [4, 6, 8, 10, 12, 14]
# NOTE: the name is misspelled ("componets") but is referenced under this exact
# spelling by the num_steps comparison cell, so it must not be renamed here alone.
baseline_n_componets = 10
num_steps_values = [10, 20, 40, 50]
baseline_num_steps = 50
num_days_to_predict = 100 # We don't need to predict as many days as before

IX- Comparing across different values of n_components

In [None]:
# Mean absolute error of the predicted close price, one entry per value in
# `n_components_values` (same order).
mae_num_components = []

for num_component in n_components_values:
    # Fresh model for this number of hidden states; the fixed seed keeps the
    # comparison reproducible across runs.
    model = GaussianHMM(n_components=num_component, random_state=42)
    model.fit(features_train)

    predicted_close_prices = []
    for i in tqdm(range(num_days_to_predict)):
        # Context window: up to `baseline_num_latent_days` test days strictly
        # before day i (empty for i == 0).
        previous_data_start_index = max(0, i - baseline_num_latent_days)
        previous_data_end_index = max(0, i)

        # Feature rows of the context window.
        previous_data = extract_features(
            augment_features(test_data.iloc[previous_data_start_index:previous_data_end_index])
        )

        # Score the context window extended by each candidate outcome.
        outcome_scores = []
        for outcome in possible_outcomes:
            # np.vstack is the supported spelling of the deprecated np.row_stack.
            total_data = np.vstack((previous_data, outcome))
            outcome_scores.append(model.score(total_data))

        # One prediction per day, chosen after every candidate has been scored.
        most_probable_outcome = possible_outcomes[np.argmax(outcome_scores)]
        # ['Open'] may select a one-column frame (MultiIndex columns), so
        # reduce the opening price to a plain float.
        open_price = float(np.ravel(test_data['Open'].iloc[i])[0])
        predicted_close_prices.append(open_price * (1 + most_probable_outcome[0]))

    # One MAE per model, computed once all days have been predicted.
    actual_close = np.ravel(test_data['Close'].iloc[0:num_days_to_predict])
    predicted_close = np.asarray(predicted_close_prices, dtype=float)
    mae_num_components.append(float(np.mean(np.abs(actual_close - predicted_close))))
   

- Displaying the results

In [None]:
# MAE as a function of the number of hidden states.
fig, ax = plt.subplots(figsize=(30, 10), dpi=80)

ax.plot(n_components_values, mae_num_components, 'go-', label="Error")
ax.set_xlabel("Number of hidden states")
ax.set_ylabel("MAE")
ax.legend(prop={'size': 20})
plt.show()

X- Comparing across different number of intervals for the feature variables

In [None]:
# Mean absolute error of the predicted close price, one entry per value in
# `num_steps_values` (same order).
mae_num_steps = []

# A single baseline model is shared by every grid resolution; the fixed seed
# keeps the fit reproducible across runs.
model = GaussianHMM(n_components=baseline_n_componets, random_state=42)
model.fit(features_train)

for num_step in num_steps_values:
    # Rebuild the candidate grid at this resolution: `num_step` points for the
    # open-to-close change and a fifth as many for the high and low changes.
    # Each grid spans the min/max of its OWN feature.
    sample_space_fracocp = np.linspace(fracocp.min(), fracocp.max(), num_step)
    sample_space_fraclp = np.linspace(fraclp.min(), fraclp.max(), int(num_step / 5))
    sample_space_frachp = np.linspace(frachp.min(), frachp.max(), int(num_step / 5))
    possible_outcomes = np.array(
        list(itertools.product(sample_space_fracocp, sample_space_frachp, sample_space_fraclp))
    )

    predicted_close_prices = []
    for i in tqdm(range(num_days_to_predict)):
        # Context window: up to `baseline_num_latent_days` test days strictly
        # before day i (empty for i == 0).
        previous_data_start_index = max(0, i - baseline_num_latent_days)
        previous_data_end_index = max(0, i)

        # Feature rows of the context window.
        previous_data = extract_features(
            augment_features(test_data.iloc[previous_data_start_index:previous_data_end_index])
        )

        # Score the context window extended by each candidate outcome.
        outcome_scores = []
        for outcome in possible_outcomes:
            # np.vstack is the supported spelling of the deprecated np.row_stack.
            total_data = np.vstack((previous_data, outcome))
            outcome_scores.append(model.score(total_data))

        # One prediction per day, chosen after every candidate has been scored.
        most_probable_outcome = possible_outcomes[np.argmax(outcome_scores)]
        # ['Open'] may select a one-column frame (MultiIndex columns), so
        # reduce the opening price to a plain float.
        open_price = float(np.ravel(test_data['Open'].iloc[i])[0])
        predicted_close_prices.append(open_price * (1 + most_probable_outcome[0]))

    # One MAE per grid resolution, computed once all days have been predicted.
    actual_close = np.ravel(test_data['Close'].iloc[0:num_days_to_predict])
    predicted_close = np.asarray(predicted_close_prices, dtype=float)
    mae_num_steps.append(float(np.mean(np.abs(actual_close - predicted_close))))

- Results

In [None]:
# MAE as a function of the feature-grid resolution.
fig, ax = plt.subplots(figsize=(30, 10), dpi=80)

ax.plot(num_steps_values, mae_num_steps, 'go-', label="Error")
ax.set_xlabel("Number of intervals for features")
ax.set_ylabel("MAE")
ax.legend(prop={'size': 20})
plt.show()