In [None]:
import sys
import pandas as pd
import torch
sys.path.append('..')
from data_feature_extraction.CoT_Dissa import extract_data
# Select the compute device once: the CUDA device when torch reports one is
# available, otherwise the CPU. The model and every batch are moved onto it.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

In [None]:
# Run the project's extraction step on the GC=F disaggregated CSV.
# NOTE(review): `finalextracteddatapath` looks like the location where the
# extracted result is written, and nCorrTop / nMICTop look like top-N feature
# limits -- confirm against data_feature_extraction.CoT_Dissa.extract_data.
df = extract_data(
    datasoursepath='../data/GC=F_com_disagg.csv',
    finalextracteddatapath='../data/GC=F_com_disagg_finalextracted.csv',
    nCorrTop=100,
    nMICTop=70,
)

In [None]:
# Names of all object-dtype (string) columns in the extracted frame.
string_cols = df.select_dtypes(include=['object']).columns.tolist()
print(df.columns)

# Build a numeric-only copy of `df` indexed by parsed timestamps.
# The previous bare `df.set_index('date', drop=True)` call was removed: its
# return value was discarded, so it never changed `df`, and `df['date']` must
# remain a regular column for the datetime conversion below.
df_ = df.drop(columns=string_cols)
df_['date'] = pd.to_datetime(df['date'])
df_ = df_.set_index('date', drop=True)
# NOTE(review): `df_` is not referenced by the later cells visible in this
# notebook (feature selection is called on `df`) -- confirm which frame is intended.
df_.columns

In [None]:
from data_feature_selection.cot_dissag import select_feature

# Reduce `df` to the feature subset used for modelling. The result must keep a
# 'Close' column, because the dataset builders read it as the prediction target.
# NOTE(review): 0.2 and 48 are positional tuning arguments whose meaning is
# defined in data_feature_selection.cot_dissag.select_feature -- document them,
# and confirm that the raw `df` (not the numeric-only `df_`) is the intended input.
df_selected = select_feature(df, 0.2, 48)

Unifying (normalizing feature scales)

In [None]:

# Positional 80/20 split point over the rows of df_selected (no shuffling).
n_rows = len(df_selected)
train_size = int(0.8 * n_rows)
test_size = n_rows - train_size

# Training frame: the leading `train_size` rows.
train_df = df_selected.iloc[:train_size]
# Evaluation frame: the complete frame, i.e. the training rows are included.
# NOTE(review): `test_size` is not used by the cells visible in this notebook,
# and metrics computed on `test_df` are not pure hold-out metrics -- confirm
# that evaluating over the whole series is intended.
test_df = df_selected

In [None]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Single MinMaxScaler instance (default settings) that is handed to the
# dataset-building functions when the train and test windows are created.
scaler = MinMaxScaler()

def create_train_dataset(scaler: MinMaxScaler, df: pd.DataFrame, l, pr):
    """Build sliding-window samples for model training.

    The scaler is fitted on ``df`` (``fit_transform``) and the scaled frame is
    cut into overlapping windows that advance one row at a time.

    Args:
        scaler: Scaler that is fitted on ``df`` and used to transform it.
        df: Feature frame; must contain a ``'Close'`` column.
        l: Look-back length, i.e. number of consecutive rows per input window.
        pr: Horizon, i.e. number of rows right after the window whose
            ``'Close'`` values form the target.

    Returns:
        ``(X, Y)`` as NumPy arrays. With ``n = len(df) - l - pr + 1`` windows,
        ``X`` stacks ``n`` blocks of ``l`` rows by all columns and ``Y`` stacks
        ``n`` vectors of ``pr`` scaled ``'Close'`` values.
    """
    scaled = pd.DataFrame(scaler.fit_transform(df), index=df.index, columns=df.columns)
    n_windows = len(scaled) - l - pr + 1
    # Inputs: rows [start, start + l) of every column.
    inputs = [scaled.iloc[start:start + l].values for start in range(n_windows)]
    # Targets: 'Close' on rows [start + l, start + l + pr).
    targets = [scaled.iloc[start + l:start + l + pr]['Close'].values for start in range(n_windows)]
    return np.array(inputs), np.array(targets)

def create_test_dataset(scaler: MinMaxScaler, df: pd.DataFrame, l, pr):
    """Build sliding-window samples for evaluation.

    The windows use the same look-back length and horizon as the training
    windows and may span the whole dataset, but the normalisation must not be
    derived from the evaluation data. An already fitted scaler is therefore
    only *applied* (``transform``), so its training statistics are neither
    overwritten nor mixed with future data. Scaled values can consequently
    fall outside the range seen during training.

    For backward compatibility an unfitted scaler (no ``scale_`` attribute) is
    still fitted on ``df``, which is what this function always did before.
    A rolling min-max per window remains a possible refinement.

    Args:
        scaler: Scaler to apply; expected to have been fitted on training data.
        df: Feature frame; must contain a ``'Close'`` column.
        l: Look-back length, i.e. number of consecutive rows per input window.
        pr: Horizon, i.e. number of rows right after the window whose
            ``'Close'`` values form the target.

    Returns:
        ``(X, Y)`` as NumPy arrays. With ``n = len(df) - l - pr + 1`` windows,
        ``X`` stacks ``n`` blocks of ``l`` rows by all columns and ``Y`` stacks
        ``n`` vectors of ``pr`` scaled ``'Close'`` values.
    """
    if hasattr(scaler, "scale_"):
        # Fitted scaler: reuse its statistics instead of refitting on test data.
        scaled_values = scaler.transform(df)
    else:
        # Unfitted scaler: keep the historical fit-on-this-frame behaviour.
        scaled_values = scaler.fit_transform(df)
    df = pd.DataFrame(scaled_values, columns=df.columns, index=df.index)
    X, Y = [], []
    for i in range(len(df) - l - pr + 1):
        X.append(df.iloc[i:i + l].values)  # all columns for the l look-back rows
        Y.append(df.iloc[i + l:i + l + pr]['Close'].values)  # 'Close' for the next pr rows
    return np.array(X), np.array(Y)




In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Mini-batch size and window geometry.
batch_size = 64
l = 84   # look-back length: rows per input window
pr = 1   # horizon: number of future 'Close' values per target

# Sliding-window arrays for both splits; the same scaler object is passed to each builder.
train_X, train_Y = create_train_dataset(scaler, train_df, l, pr)
test_X, test_Y = create_test_dataset(scaler, test_df, l, pr)

# Pair inputs with targets as float32 tensors.
train_data = TensorDataset(torch.from_numpy(train_X).float(), torch.from_numpy(train_Y).float())
test_data = TensorDataset(torch.from_numpy(test_X).float(), torch.from_numpy(test_Y).float())

# Batches are served in window order (no shuffling) for both splits.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)



In [None]:
import torch.nn as nn

# Define LSTM network
class StockPredictor(nn.Module):
    """LSTM followed by a linear head applied to the last time step.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of values predicted per sequence.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super().__init__()
        self.hidden_size = hidden_dim
        self.num_layers = num_layers

        # nn.LSTM takes (input_size, hidden_size, num_layers) in that order.
        # hidden_dim was previously omitted, so num_layers was used as the
        # hidden size, which only worked when hidden_dim == num_layers == 1.
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)

        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Predict from a batch-first input of shape (batch, seq_len, input_dim).

        Returns:
            Tensor of shape (batch, output_dim).
        """
        batch = x.size(0)
        # Fresh zero initial states on the input's device; they carry no gradient.
        h0 = torch.zeros(self.num_layers, batch, self.hidden_size, device=x.device, dtype=x.dtype)
        c0 = torch.zeros(self.num_layers, batch, self.hidden_size, device=x.device, dtype=x.dtype)
        out, _ = self.lstm(x, (h0, c0))
        # Only the output of the final time step feeds the linear head.
        return self.fc(out[:, -1, :])


Train

In [None]:
import torch.optim as optim

# Model sized to the number of feature columns in the training frame.
model = StockPredictor(input_dim=train_df.shape[1], hidden_dim=1, output_dim=1, num_layers=1)
model = model.to(device)

criterion = torch.nn.MSELoss(reduction='mean')
optimiser = torch.optim.Adam(model.parameters(), lr=0.0005)

num_epochs = 120

# hist[t] holds the mean training MSE of epoch t, weighted by batch size.
hist = np.zeros(num_epochs)

for t in range(num_epochs):
    # Make sure training mode is active even if an evaluation cell ran before.
    model.train()
    epoch_loss_sum = 0.0
    epoch_samples = 0
    for seq, labels in train_loader:
        seq, labels = seq.to(device), labels.to(device)
        y_train_pred = model(seq)
        loss = criterion(y_train_pred, labels)
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
        # The loss is a per-batch mean; weight by batch size so a shorter final
        # batch does not distort the epoch average.
        epoch_loss_sum += loss.item() * seq.size(0)
        epoch_samples += seq.size(0)
    # Record and report once per epoch, not once per batch (which previously
    # left only the last batch's loss in hist[t]).
    hist[t] = epoch_loss_sum / max(epoch_samples, 1)
    print("Epoch ", t, "MSE: ", hist[t])


Test

In [None]:
# Evaluate the trained model on the test loader without tracking gradients.
model.eval()
test_losses = []        # mean MSE of each batch
test_batch_sizes = []   # number of samples in each batch

with torch.no_grad():
    for seq, labels in test_loader:
        seq, labels = seq.to(device), labels.to(device)
        y_test_pred = model(seq)
        test_loss = criterion(y_test_pred, labels)
        test_losses.append(test_loss.item())
        test_batch_sizes.append(seq.size(0))

# Report a single figure after the loop; the print used to sit inside the loop
# and emitted running partial means. Batch means are weighted by batch size so
# the result is the true per-sample MSE.
if test_losses:
    print("Test MSE: ", np.average(test_losses, weights=test_batch_sizes))
else:
    print("Test MSE: ", float('nan'))


Try forecasting using rise and fall

In [None]:
import matplotlib.pyplot as plt

def _collect_predictions(loader):
    """Run the global `model` over `loader` and gather per-sample results.

    Returns:
        (predictions, true_labels): two flat Python lists holding one NumPy
        array per sample, in the order the loader yields them.
    """
    predictions, true_labels = [], []
    with torch.no_grad():
        for seq, labels in loader:
            seq, labels = seq.to(device), labels.to(device)
            batch_pred = model(seq)
            # extend() walks the batch dimension, so every sample becomes its
            # own list entry (same result as flattening a list of batches).
            predictions.extend(batch_pred.detach().cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
    return predictions, true_labels


def _plot_predictions(true_values, predicted_values, split_name, title):
    """Plot true and predicted normalized prices of one split in one figure."""
    plt.figure(figsize=(14, 8))
    plt.plot(true_values, label=f'{split_name} True values')
    plt.plot(predicted_values, label=f'{split_name} Predicted values')
    plt.title(title)
    plt.xlabel('Time')
    plt.ylabel('Normalized Price')
    plt.legend()
    plt.show()


# Inference only: switch to evaluation mode before collecting predictions.
model.eval()

# Both loaders are built with shuffle=False, so list order follows window order.
train_predictions, train_true_labels = _collect_predictions(train_loader)
test_predictions, test_true_labels = _collect_predictions(test_loader)

# Plot true labels and predictions for training data
_plot_predictions(
    train_true_labels,
    train_predictions,
    'Train',
    'Training Data: Stock Closing Price Predictions vs True Values',
)

# Plot true labels and predictions for test data
_plot_predictions(
    test_true_labels,
    test_predictions,
    'Test',
    'Test Data: gold Closing Price Predictions vs True Values',
)


Accuracy

In [None]:
def _first_step_series(values) -> pd.Series:
    """Return the first forecast step of every sample as a 1-D float Series.

    Accepts a list of per-sample arrays, a 2-D array or an already flat
    sequence, so the cell can be re-run on its own output.
    """
    arr = np.asarray(values, dtype=float)
    if arr.size == 0:
        return pd.Series(dtype=float)
    return pd.Series(arr.reshape(len(arr), -1)[:, 0])


def _direction_accuracy(true_changes, predicted_changes) -> float:
    """Fraction of steps on which both sign sequences agree (NaN if empty)."""
    if len(true_changes) == 0:
        return float('nan')
    return float(np.mean(true_changes == predicted_changes))


# Flatten to numeric 1-D series; the collected values are per-sample arrays.
train_true_labels = _first_step_series(train_true_labels)
test_true_labels = _first_step_series(test_true_labels)
train_predictions = _first_step_series(train_predictions)
test_predictions = _first_step_series(test_predictions)

# Direction of each step-to-step move: +1 rise, -1 fall, 0 unchanged.
train_true_changes, train_predicted_changes = np.sign(np.diff(train_true_labels)), np.sign(np.diff(train_predictions))
test_true_changes, test_predicted_changes = np.sign(np.diff(test_true_labels)), np.sign(np.diff(test_predictions))

# np.diff yields N-1 changes for N values, so accuracy is averaged over the
# number of changes; dividing by the number of labels under-reported it.
train_accuracy = _direction_accuracy(train_true_changes, train_predicted_changes)
test_accuracy = _direction_accuracy(test_true_changes, test_predicted_changes)

print(f"train accuracy:  {train_accuracy * 100:.2f}%")
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

# Plot true labels and predictions for training data

