In [2]:
from read_data import getData
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import os
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler



In [3]:
# Load the training and test sets through getData("data"), then display both shapes.
traindata, testData = getData("data")
(traindata.shape, testData.shape)

Training Data's shape is (10000, 50, 110, 6) and Test Data's is (10000, 50, 110, 6)


((10000, 50, 110, 6), (2100, 50, 50, 6))

In [4]:
def createDataset(data, windowSize = 40):
    """Cut `data` into sliding windows along axis 2, each paired with a next-step target.

    Args:
        data: 4-D numpy array indexed as data[:, :, t, :]; axis 2 is the axis
            being windowed. The target reads index 0 on axis 1 and the first
            two entries of the last axis.
        windowSize: number of consecutive axis-2 steps in each input window.

    Returns:
        Tuple (X, Y) of numpy arrays stacked along a new leading window axis:
        X[i] is data[:, :, i:i+windowSize, :] and Y[i] is
        data[:, 0, i+windowSize, :2], for i in range(data.shape[2] - windowSize).
    """
    # Take the number of steps from the array itself rather than assuming 110,
    # so inputs with a different axis-2 length neither index out of bounds
    # nor silently drop their trailing windows.
    numSteps = data.shape[2]
    X = []
    Y = []
    for i in range(numSteps - windowSize):
        X.append(data[:, :, i:i+windowSize, :])
        # The target is the step immediately after the window.
        Y.append(data[:, 0, i+windowSize, :2])
    return np.array(X), np.array(Y)


# Build the windowed inputs and their next-step targets from the training set.
X, Y = createDataset(traindata, windowSize=40)


In [5]:
# Shapes of the stacked arrays; axis 0 is the window offset added by createDataset.
(X.shape, Y.shape)

((70, 10000, 50, 40, 6), (70, 10000, 2))

In [6]:
# Shapes of the inputs and targets belonging to the first window offset.
(X[0].shape, Y[0].shape)

((10000, 50, 40, 6), (10000, 2))

In [7]:
device = torch.device("cpu")

class SmallNetwork(nn.Module):
    """Stacked LSTM, temporal average pooling, batch norm and a per-step MLP head.

    forward() takes a tensor whose first axis is the batch and whose last axis
    holds 6 features. Every axis in between is flattened into one sequence
    axis of length seq_len. The result has shape [batch, seq_len // 4, 2]:
    one 2-value prediction per pooled step, not one per sample. A caller that
    needs a single [batch, 2] prediction has to reduce axis 1 itself.
    seq_len must be at least 4, the pooling kernel size.
    """

    def __init__(self):
        super().__init__()
        self.lstm = nn.LSTM(input_size = 6, hidden_size = 64, num_layers = 5, batch_first = True)
        self.batch_norm1 = nn.BatchNorm1d(64)
        self.pool = nn.AvgPool1d(kernel_size=4, stride=4)
        self.linear1 = nn.Linear(64, 32)
        self.linear2 = nn.Linear(32, 16)
        self.linear3 = nn.Linear(16, 2)

    def forward(self, x):
        """Return per-pooled-step predictions of shape [batch, seq_len // 4, 2]."""
        # Collapse all middle axes into the sequence axis: [batch, seq_len, 6].
        # reshape (rather than view) also accepts non-contiguous inputs and is
        # still a zero-copy view when the input is contiguous.
        x = x.reshape(x.size(0), -1, x.size(-1))

        # Only the per-step outputs are used; the final (h, c) state is discarded.
        x, _ = self.lstm(x)          # [batch, seq_len, 64]
        x = x.permute(0, 2, 1)       # [batch, 64, seq_len], channels first for pooling

        x = self.pool(x)             # [batch, 64, seq_len // 4]
        x = self.batch_norm1(x)      # normalises each of the 64 channels
        x = x.permute(0, 2, 1)       # [batch, seq_len // 4, 64]

        # The linear layers act on the last axis, so the head runs per pooled step.
        x = self.linear1(x)
        x = torch.nn.functional.leaky_relu(x, negative_slope=0.01)
        x = self.linear2(x)
        x = torch.nn.functional.leaky_relu(x, negative_slope=0.01)
        x = self.linear3(x)
        return x

# Instantiate the network; it is moved to `device` later, in the training cell.
model = SmallNetwork()
# model.to(device)

# Disabled manual smoke test: push a tiny random batch through the model and print the output shape.
# test = torch.randn(2, 2, 2, 6)
# out = model(test)
# print(out.shape)

In [8]:
# Create the checkpoint directory up front; otherwise the first torch.save
# below fails only after a whole epoch has already been trained.
os.makedirs('./models', exist_ok = True)

model.to(device)

# createDataset stacks its windows on a new axis 0, so X is [windows, N, ...]
# and Y is [windows, N, 2]. Merge those two leading axes so that one dataset
# item is one (window, target) pair. Without this, TensorDataset would see
# only `windows` items, each containing all N entries at once.
# The full tensors stay on the host; each batch is moved to `device` in the loop.
X_tensor = torch.tensor(X, dtype = torch.float32).reshape(-1, *X.shape[2:])
y_tensor = torch.tensor(Y, dtype = torch.float32).reshape(-1, Y.shape[-1])

print(X_tensor.shape, y_tensor.shape)

trainDataset = TensorDataset(X_tensor, y_tensor)
trainDataLoader = DataLoader(trainDataset, batch_size = 256, shuffle = True)

epochs = 10
lossFn = nn.MSELoss()
optimizer = optim.Adam(params = model.parameters(), lr = 1e-4)

for each_epoch in range(epochs):
    model.train()
    runningLoss = 0.0
    loop = tqdm(trainDataLoader, desc=f"Epoch [{each_epoch+1}/{epochs}]")

    for batchX, batchY in loop:
        batchX, batchY = batchX.to(device), batchY.to(device)
        optimizer.zero_grad()
        output = model(batchX)
        # The model returns one prediction per pooled step ([batch, steps, 2])
        # while the target is a single [batch, 2] vector. Score the final
        # step, which is produced after the whole input sequence was read.
        # Inference code must apply the same reduction.
        if output.dim() == 3:
            output = output[:, -1, :]
        loss = lossFn(output, batchY)
        loss.backward()
        optimizer.step()
        runningLoss += loss.item()
    avgLoss = runningLoss/len(trainDataLoader)

    # Checkpoints are written when each_epoch is 0, 5, ...; with epochs = 10
    # the state after the last epoch is therefore not saved.
    # NOTE: this pickles the whole module, so loading requires the same
    # SmallNetwork class definition to be importable.
    if each_epoch % 5 == 0:
        torch.save(model, f'./models/small_model_{each_epoch}_.pth')

    print(f" For epoch {each_epoch + 1}, training loss is {avgLoss}")


: 