# Libraries

In [46]:
import numpy as np
import pandas as pd
import sklearn as sklearn
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# parameters
# Seed torch's RNG first so weight initialisation (and therefore training) is
# repeatable under Restart & Run All.
random_seed = 7
torch.manual_seed(random_seed)

input_channels = 5 # open, high, low, close, volume
activation_function = nn.LeakyReLU() # a single instance that is reused wherever the network needs an activation
learning_rate = 0.001 # step size handed to the optimizer
epochs = 1000 # number of optimisation passes over the training tensors
loss_function = nn.MSELoss() # mean squared error between prediction and target
window_size = 15 # 15 day window
scaler = StandardScaler() # fitted later on the five feature columns

# Loading the dataset

In [47]:
df = pd.read_csv("data.csv")
df = df[df["Company"] == "AAPL"] # select Apple's stock price data

# fixing dataset format

def _column_to_floats(column, skip_chars=0):
  """Convert every entry of a column to float, optionally dropping a leading prefix.

  Iterates over the values rather than indexing by position-as-label, so it
  works regardless of which index labels survived the company filter above.

  Args:
    column: iterable of raw cell values (strings when ``skip_chars`` > 0).
    skip_chars: number of leading characters to drop before parsing.

  Returns:
    A list of Python floats in the same order as ``column``.
  """
  if skip_chars:
    return [float(value[skip_chars:]) for value in column]
  return [float(value) for value in column]


# The price columns carry a one-character prefix (presumably a currency
# symbol such as "$" -- confirm against data.csv); volume is parsed as-is.
open_data = _column_to_floats(df["Open"], skip_chars=1)
close_data = _column_to_floats(df["Close/Last"], skip_chars=1)
high_data = _column_to_floats(df["High"], skip_chars=1)
low_data = _column_to_floats(df["Low"], skip_chars=1)
volume_data = _column_to_floats(df["Volume"])

# Flip every series end-to-end.
# NOTE(review): presumably the CSV is ordered newest-first, so this makes the
# series chronological -- confirm against the file.
for series in (open_data, high_data, low_data, close_data, volume_data):
  series.reverse()
# Fractional change of the close from one row to the next, starting with the
# pair (close_data[window_size], close_data[window_size + 1]).
# NOTE(review): `target` is reassigned in the following cell, so this list
# only feeds the printout below.
earlier_closes = close_data[window_size:]
later_closes = close_data[window_size + 1:]
target = [later / earlier - 1 for earlier, later in zip(earlier_closes, later_closes)]

print(target)
print(close_data[window_size:])

[-0.01422445732055444, 0.02840348974763418, 0.04752147812646035, 0.018244626700067323, -0.0011851535644477051, 0.008879516378462071, 0.010768990657956445, -0.013135836237702536, 0.0025704929758487705, 0.0011983457255286822, -0.003857951667047166, 0.003889658872446189, -0.028591787654760892, 0.004722171281863252, 0.0016370066164728136, -0.009116948640999545, 0.0027987540659517585, 0.02069423988354835, -0.006861159771818892, 0.005958774776404718, 0.01595517489434406, -0.02277969852025996, -0.05444448847201078, 0.010650207436586623, -0.016479285384259912, -0.03179430966778285, 0.011551596509016493, 0.020557885544910226, 0.016401839030592225, -0.010356950183484481, 0.0497028730261424, -0.003138749864463053, -0.015479822989597936, 0.009739787759848628, -0.00713504175064783, -0.012429601359541853, 0.023509996006295886, 0.00328224432062707, -0.012565558815623157, -0.0007819468739501678, 0.009767493087397217, -0.013961284989322253, 0.011742856477492403, 0.00627226221508681, 0.00647906170834500

In [48]:
# Stack the five series into an (n_rows, 5) float32 matrix whose columns are
# open, high, low, close, volume.
features = np.array(
  [open_data, high_data, low_data, close_data, volume_data], dtype=np.float32
).T

# first we fit our scaler to our mean and standard deviation
# NOTE(review): the scaler is fitted on every row, including rows that end up
# in the test split, and the split below shuffles overlapping windows. Both
# leak information from the test set into training; consider a chronological
# split with the scaler fitted on the training portion only.
scaled_data = scaler.fit_transform(features)

# this gets the closing price to predict but we don't want the first windows input data
# (column 3 is the close; row i + window_size is the target for window i)
target = torch.tensor(scaled_data[window_size:, 3:4], dtype=torch.float)

# Cut the already-scaled matrix into sliding windows instead of re-scaling
# each window separately: the scaler is a per-column affine map, so the values
# are the same. unfold() appends the window axis last, giving
# (n_windows, channels, window_size) directly.
scaled_tensor = torch.tensor(scaled_data, dtype=torch.float)
num_windows = scaled_tensor.shape[0] - window_size # the final window has no following row to predict
data = scaled_tensor.unfold(0, window_size, 1)[:num_windows].contiguous()

assert len(data) == len(target), "every window needs exactly one target"

train_x, test_x, train_y, test_y = train_test_split(data, target, random_state=7, shuffle=True)


# Building the model
The network outputs a single value per input window: the standardized closing price of the row (day) that immediately follows the `window_size`-day window.

In [49]:
class Net(nn.Module): # the nn.Module is set up as the parent class for the Net Class
    """Small 1-D convolutional regressor producing one value per input window.

    Input is expected as ``(batch, in_channels, window)``; output is
    ``(batch, 1)``. The layer order inside ``self.model`` is fixed, so
    state-dict keys do not depend on the constructor arguments.

    Args:
        in_channels: number of input channels. Defaults to the notebook-level
            ``input_channels``.
        window: length of each input window. Defaults to the notebook-level
            ``window_size``. Used to size the first linear layer.
        activation: activation module inserted after each conv/linear layer.
            Defaults to the notebook-level ``activation_function``.

    Raises:
        ValueError: if ``window`` is too short to survive the two
            convolution + pooling stages.
    """

    def __init__(self, in_channels=None, window=None, activation=None):
        super(Net, self).__init__() # This calls the init method of the nn.Module parent class to ensure that its been initialized

        # Fall back to the notebook-level settings so a bare Net() keeps working.
        in_channels = input_channels if in_channels is None else in_channels
        window = window_size if window is None else window
        activation = activation_function if activation is None else activation

        feature_layers = [
            nn.Conv1d(in_channels=in_channels, out_channels=64, kernel_size=2, stride=1),
            activation,
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(in_channels=64, out_channels=128, kernel_size=2, stride=1),
            activation,
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Flatten(),
        ]

        # Push a dummy window through the feature extractor to learn the
        # flattened width instead of hard-coding it (384 when window == 15).
        try:
            with torch.no_grad():
                probe = torch.zeros(1, in_channels, window)
                for layer in feature_layers:
                    probe = layer(probe)
        except RuntimeError as err:
            raise ValueError(
                f"window={window} is too short for the conv/pool stack"
            ) from err
        flat_features = probe.shape[1]

        self.model = nn.Sequential(
            *feature_layers,
            nn.Linear(in_features=flat_features, out_features=128),
            activation,
            nn.Linear(128, 64),
            activation,
            nn.Linear(64, 1),
        )

    def forward(self, x):
        """Run ``x`` through the sequential stack and return the prediction."""
        return self.model(x)


# Instantiate the network used by the training and evaluation cells below.
net = Net()

# Training the model

In [50]:
def train(model: nn.Module, train_x, train_y, epochs, learning_rate, loss_fn=None):
  """Fit ``model`` to ``(train_x, train_y)`` with Adam, one full-batch step per epoch.

  Args:
    model: the network to optimise; its parameters are updated in place.
    train_x: input tensor passed to ``model`` as a single batch.
    train_y: target tensor compared against the model output.
    epochs: number of optimisation steps to run.
    learning_rate: learning rate for the Adam optimizer.
    loss_fn: optional loss callable ``(prediction, target) -> scalar tensor``.
      Defaults to the notebook-level ``loss_function``.

  Returns:
    None. The loss is printed every 50 epochs.
  """
  # Fall back to the notebook-level loss so existing five-argument calls keep working.
  criterion = loss_function if loss_fn is None else loss_fn

  # Optimise the model that was passed in, not whatever global happens to exist.
  optimizer = optim.Adam(model.parameters(), lr=learning_rate)

  for epoch in range(epochs):
    y_hat = model(train_x)
    loss = criterion(y_hat, train_y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % 50 == 0: # print out the loss as the model is training
      print(f"Epoch: {epoch}, Loss: {loss.item():.10f}")

# Run the optimisation loop on the training split using the settings from the parameters cell.
train(net, train_x, train_y, epochs, learning_rate)


Epoch: 0, Loss: 1.0010013580
Epoch: 50, Loss: 0.0071831835
Epoch: 100, Loss: 0.0044226279
Epoch: 150, Loss: 0.0036448718
Epoch: 200, Loss: 0.0028492857
Epoch: 250, Loss: 0.0021682016
Epoch: 300, Loss: 0.0018919157
Epoch: 350, Loss: 0.0017575617
Epoch: 400, Loss: 0.0016664655
Epoch: 450, Loss: 0.0016381619
Epoch: 500, Loss: 0.0015345756
Epoch: 550, Loss: 0.0014802200
Epoch: 600, Loss: 0.0014705103
Epoch: 650, Loss: 0.0013979365
Epoch: 700, Loss: 0.0013429663
Epoch: 750, Loss: 0.0013598807
Epoch: 800, Loss: 0.0013697359
Epoch: 850, Loss: 0.0012830473
Epoch: 900, Loss: 0.0012218757
Epoch: 950, Loss: 0.0011750527


# Running the model

In [51]:
close_column = 3 # position of the close price in the [open, high, low, close, volume] feature order


def unscale_close(scaled_close):
    """Map standardized close values back to the original price scale.

    The scaler was fitted on all feature columns, so the values are placed in
    the close column of a zero-filled placeholder, inverse-transformed, and
    the close column is read back out.

    Args:
        scaled_close: array-like of standardized close values (any shape;
            it is flattened).

    Returns:
        1-D float array of close values in original units.
    """
    scaled_close = np.asarray(scaled_close, dtype=float).reshape(-1)
    placeholder = np.zeros((scaled_close.shape[0], scaled_data.shape[1]))
    placeholder[:, close_column] = scaled_close
    return scaler.inverse_transform(placeholder)[:, close_column]


# Inference only: switch to eval mode and skip autograd bookkeeping so the
# outputs carry no graph.
net.eval()
with torch.no_grad():
    y_hat = net(test_x)
    print(loss_function(y_hat, test_y))

# Undo the scaling for both the predictions and the held-out targets.
inv_price_prediction = unscale_close(y_hat.cpu().numpy())
inv_actual_price = unscale_close(test_y.cpu().numpy())

# Create a DataFrame to compare predictions and actual prices
comparison_df = pd.DataFrame({
    'Predicted_Close': inv_price_prediction,
    'Actual_Close': inv_actual_price
})
# Show every row for this printout only, without changing the global pandas option.
with pd.option_context('display.max_rows', None):
    print(comparison_df)

tensor(0.0020, grad_fn=<MseLossBackward0>)
     Predicted_Close  Actual_Close
0          19.554394     18.557501
1          52.503046     51.040002
2         169.682645    165.789996
3          38.090353     38.185002
4          24.544564     24.297500
5          31.708762     32.639999
6          32.539106     31.687501
7          36.130379     36.005002
8          18.494783     18.604301
9         123.887380    123.240000
10        118.729116    116.970000
11        106.821943    115.010002
12         28.583110     29.875001
13         21.157577     17.878599
14         32.244399     32.687499
15         28.441908     27.892500
16         24.640883     23.309998
17         47.561559     46.625000
18        138.156285    132.369996
19         78.590057     79.807502
20         25.912272     24.530002
21         18.783603     19.863601
22         20.121881     18.572501
23         38.957245     38.285001
24         19.035465     16.075700
25        172.870721    174.550003
26         1

# Saving the model

In [None]:
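# Uncomment to persist the trained model. torch.save(net, ...) pickles the whole
# module object; saving net.state_dict() instead is the more portable option.
# torch.save(net, "file_name.pt")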