In [1]:
# %pip install numpy
# %pip install pandas
# %pip install torch
# %pip install scikit-learn

In [2]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import torch.nn.functional as F

from Fourier_Transformer import LinearTransformer, create_inout_sequences
from LSTM import LSTMModel, train_model


### Data Preparation

In [4]:
# Load dataset
df = pd.read_csv('C:/Users/19495/Documents/GitHub/Linear_Transformation/stock_data/TSLA.csv') 

df['log_return'] = np.log(df['Close'] / df['Close'].shift(1))
df.dropna(inplace=True)  # Remove NaNs
scaler = MinMaxScaler(feature_range=(-1, 1))
df['log_return'] = scaler.fit_transform(df['log_return'].values.reshape(-1,1))
data = torch.FloatTensor(df['log_return'].values).view(-1) # Convert the DataFrame to a PyTorch Tensor

seq_length = 20  # Based on how many days you want to use to predict the next day
inout_seq = create_inout_sequences(data, seq_length) # Create sequences


# Split data into train and test sets
train_size = int(len(inout_seq) * 0.80)
train_set = inout_seq[:train_size]
test_set = inout_seq[train_size:]
# Prepare DataLoader
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False)

### Training and Test Process

Initilize the model with 2 layers, 64 dimensions, 

In [5]:
# Instantiate the model, loss function, and optimizer
model = LinearTransformer(feature_size=1, num_layers=2, d_model=64, d_ff = 2048, num_heads=8)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
train_losses = []
test_losses = []
num_epochs = 50 

for epoch in range(num_epochs):
    model.train()
    epoch_train_loss = []
    for seq, labels in train_loader:
        optimizer.zero_grad()
        y_pred = model(seq.unsqueeze(-1))  # Adjust dimensions if necessary
        labels = labels.view(-1)  # Ensure label dimensions match output
        loss = criterion(y_pred[:, -1], labels)  # Assuming using last output for prediction
        loss.backward()
        optimizer.step()
        epoch_train_loss.append(loss.item())
    
    # Calculate and store the average training loss for this epoch
    train_losses.append(np.mean(epoch_train_loss))

    # Validation or Testing phase
    model.eval()
    epoch_test_loss = []
    with torch.no_grad():
        for seq, labels in test_loader:
            y_pred = model(seq.unsqueeze(-1))
            labels = labels.view(-1)
            loss = criterion(y_pred[:, -1], labels)
            epoch_test_loss.append(loss.item())
    
    # Calculate and store the average test loss for this epoch
    test_losses.append(np.mean(epoch_test_loss))
    
    # Optional: print out loss information to monitor progress
    print(f'Epoch {epoch+1}/{num_epochs} - Training Loss: {train_losses[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}')

overall_avg_train_loss = np.mean(train_losses)
overall_avg_test_loss = np.mean(test_losses)

print(f'Overall Average Training Loss: {overall_avg_train_loss:.4f}')
print(f'Overall Average Test Loss: {overall_avg_test_loss:.4f}')

# Assuming 'model' is your model instance and it has been trained
torch.save(model.state_dict(), 'transformer_fourier.pth')



  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1/50 - Training Loss: 2.5865, Test Loss: 0.5822
Epoch 2/50 - Training Loss: 0.2760, Test Loss: 0.4178
Epoch 3/50 - Training Loss: 0.4330, Test Loss: 0.2681
Epoch 4/50 - Training Loss: 0.1820, Test Loss: 0.0868
Epoch 5/50 - Training Loss: 0.0897, Test Loss: 0.2206
Epoch 6/50 - Training Loss: 0.1690, Test Loss: 0.2077
Epoch 7/50 - Training Loss: 0.1255, Test Loss: 0.1119
Epoch 8/50 - Training Loss: 0.0760, Test Loss: 0.0838
Epoch 9/50 - Training Loss: 0.0814, Test Loss: 0.0907
Epoch 10/50 - Training Loss: 0.0910, Test Loss: 0.0888
Epoch 11/50 - Training Loss: 0.0797, Test Loss: 0.0839
Epoch 12/50 - Training Loss: 0.0729, Test Loss: 0.0887
Epoch 13/50 - Training Loss: 0.0715, Test Loss: 0.0984
Epoch 14/50 - Training Loss: 0.0743, Test Loss: 0.1004
Epoch 15/50 - Training Loss: 0.0737, Test Loss: 0.0958
Epoch 16/50 - Training Loss: 0.0717, Test Loss: 0.0879
Epoch 17/50 - Training Loss: 0.0701, Test Loss: 0.0856
Epoch 18/50 - Training Loss: 0.0707, Test Loss: 0.0848
Epoch 19/50 - Train

### Prediction

Load the saved model.

In [7]:
# Instantiate the model
model = LinearTransformer(feature_size=1, num_layers=2, d_model=64, num_heads=8, d_ff=2048, dropout=0.1)

# Load the weights
model.load_state_dict(torch.load('transformer_fourier.pth'))
model.eval()  # Set the model to evaluation mode


LinearTransformer(
  (fourier_transform): FourierTransform()
  (positional_encoder): Embedding(1000, 64)
  (encoder_layers): ModuleList(
    (0-1): 2 x TransformerEncoderLayer(
      (fourier_transform): FourierTransform()
      (feed_forward): PositionwiseFeedforward(
        (linear1): Linear(in_features=64, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=2048, out_features=64, bias=True)
      )
      (layernorm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (layernorm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
  )
  (decoder_layers): ModuleList(
    (0-1): 2 x TransformerDecoderLayer(
      (self_attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
      )
      (multihead_attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=64,

Using the stock data of Apple.

In [8]:
# Assuming you have new input data for prediction
df = pd.read_csv('C:/Users/19495/Documents/GitHub/Linear_Transformation/stock_data/AAPL.csv') 

df['log_return'] = np.log(df['Close'] / df['Close'].shift(1))
df.dropna(inplace=True)  # Remove NaNs
scaler = MinMaxScaler(feature_range=(-1, 1))
df['log_return'] = scaler.fit_transform(df['log_return'].values.reshape(-1,1))
new_input_data = df['log_return']  # This should be your new input data

# Scale and preprocess your new data
new_input_scaled = scaler.transform(np.array(new_input_data).reshape(-1, 1))

# Convert to Tensor
new_input_tensor = torch.FloatTensor(new_input_scaled).view(-1)

# Create sequences (same sequence length as used during training)
new_sequences = create_inout_sequences(new_input_tensor, seq_length)

# Extract just the sequence part (ignoring labels if it's purely for prediction)
new_sequences = [seq[0] for seq in new_sequences]  # Assuming you only need the sequences

# Optionally, convert to a DataLoader if dealing with many sequences
predict_loader = DataLoader(new_sequences, batch_size=64, shuffle=False)


In [9]:
predictions = []
with torch.no_grad():
    for sequences in predict_loader:
        sequences = sequences.view(sequences.shape[0], seq_length, 1)  # Reshape if necessary
        output = model(sequences)
        predicted_values = output[:, -1]  # If you're predicting the last value
        predictions.extend(predicted_values.numpy())

# Inverse transform the predictions if necessary
predictions = scaler.inverse_transform(np.array(predictions).reshape(-1, 1))

In [10]:
last_known_price = df['Close'].iloc[-1]
predicted_prices = [last_known_price]


predicted_price = predicted_prices[-1] * np.exp(predictions[0])
predicted_prices.append(predicted_price)

# The predicted price for the next time step after the last sequence in your test data
next_predicted_price = predicted_prices[-1]
print(f"Predicted price for the next time step: {next_predicted_price}")

Predicted price for the next time step: [169.67157]


### LSTM

In [12]:
class TimeSeriesDataset(Dataset):
    def __init__(self, sequences):
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, index):
        sequence, label = self.sequences[index]
        # Convert numpy arrays to torch tensors and ensure they are of type float32
        sequence_tensor = torch.tensor(sequence, dtype=torch.float32)
        label_tensor = torch.tensor(label, dtype=torch.float32)
        return sequence_tensor, label_tensor

In [13]:
# Load your dataset
data = pd.read_csv('C:/Users/19495/Documents/GitHub/Linear_Transformation/stock_data/TSLA.csv')

# Normalize data 
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(data['Close'].values.reshape(-1, 1))

# Define window size
window_size = 10

# Create sequences
inout_seq = create_inout_sequences(data_normalized, window_size)

# Split data into train and test
split_ratio = 0.8
split_index = int(len(inout_seq) * split_ratio)
train_seq = inout_seq[:split_index]
test_seq = inout_seq[split_index:]


train_dataset = TimeSeriesDataset(train_seq)
test_dataset = TimeSeriesDataset(test_seq)

train_data = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_data = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [14]:
def evaluate_model(model, test_loader):
    # Extract the device from the model's parameters
    device = next(model.parameters()).device
    model.eval()
    total_loss = 0
    criterion = nn.MSELoss()
    predictions = []
    labels = []

    with torch.no_grad():
        for data, target in test_loader:
            # Ensure data and target tensors are moved to the same device as the model
            data, target = data.to(device), target.to(device)
            output = model(data)
            predictions.append(output.cpu().numpy())
            labels.append(target.cpu().numpy())
            loss = criterion(output, target)
            total_loss += loss.item()

    average_loss = total_loss / len(test_loader)
    print(f'Average Loss: {average_loss}')

In [15]:
lstm_model = LSTMModel(input_dim=1, hidden_dim=50, num_layers=1, output_dim=1)
lstm_optimizer = torch.optim.Adam(lstm_model.parameters(), lr=0.001)
train_model(lstm_model, train_data, nn.MSELoss(), lstm_optimizer, num_epochs=10)
evaluate_model(lstm_model, test_data)

Epoch 1 Loss: 0.27838587760925293
Epoch 2 Loss: 0.2244166135787964
Epoch 3 Loss: 0.2143065482378006
Epoch 4 Loss: 0.2776557207107544
Epoch 5 Loss: 0.27410751581192017
Epoch 6 Loss: 0.24769581854343414
Epoch 7 Loss: 0.20124225318431854
Epoch 8 Loss: 0.22182869911193848
Epoch 9 Loss: 0.22796595096588135
Epoch 10 Loss: 0.18641243875026703
Average Loss: 0.5255939960479736


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)
