# LSTM Cell Writeup

# LSTM Cell Code

In [None]:
# Standard library
import os

# Data manipulation
import numpy as np
import pandas as pd

# Visualization
import matplotlib.pyplot as plt

# Deep learning
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, Dataset

# Preprocessing
from sklearn.preprocessing import MinMaxScaler

# Select the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Directory that is scanned for the input CSV files
csv_dir = "data/Car_data/car_data"
print(f"Data directory: {csv_dir}")


In [None]:
# List all CSV files. os.listdir returns entries in arbitrary order, so sort
# them to make the train/test split below reproducible across runs/machines.
csv_files = sorted(
    os.path.join(csv_dir, f) for f in os.listdir(csv_dir) if f.endswith('.csv')
)

print(f"Total files: {len(csv_files)}")

# Split into train/test sets (first 80% of the sorted files are for training)
data_80 = int(len(csv_files) * 0.8)
trainset = csv_files[:data_80]
testset = csv_files[data_80:]
print(f"Training files: {len(trainset)}")
print(f"Testing files: {len(testset)}")
print(f"Verification: {(len(trainset) + len(testset)) == len(csv_files)}")

if not trainset:
    raise ValueError(f"No training CSV files found in {csv_dir!r}")

# Read every training file exactly once; the raw arrays are reused both for
# fitting the scaler and for building the scaled sequences.
train_arrays = [pd.read_csv(file_path).values for file_path in trainset]

# Fit the scaler on the training data only (test files are never seen here)
all_train_data = np.vstack(train_arrays)
scaler = MinMaxScaler()
scaler.fit(all_train_data)

# One scaled (time_steps, features) array per training file
all_sequences = [scaler.transform(raw) for raw in train_arrays]

# Sliding-window geometry
input_len = 62                      # time steps fed to the model
pred_len = 5                        # time steps to predict
window_len = input_len + pred_len   # total steps needed per sample

# Create input/output sequences
X = []  # Input: `input_len` time steps, all features
y = []  # Output: next `pred_len` time steps, first two columns only (x, y)

for sequence in all_sequences:
    # Files shorter than one full window yield an empty range and are skipped
    for start in range(len(sequence) - window_len + 1):
        split = start + input_len
        X.append(sequence[start:split, :])
        y.append(sequence[split:start + window_len, :2])

if not X:
    raise ValueError(
        f"No training windows could be built: every file has fewer than "
        f"{window_len} rows"
    )

# Convert to PyTorch tensors
X = torch.tensor(np.array(X), dtype=torch.float32)
y = torch.tensor(np.array(y), dtype=torch.float32)

# Create DataLoader
dataset = TensorDataset(X, y)
batch_size = 32
train_loader = DataLoader(
    dataset,
    batch_size=batch_size,
    # Consecutive windows overlap almost entirely; shuffle so each batch is
    # not a run of near-duplicate samples from a single file.
    shuffle=True,
    # Pinned host memory only helps (and only avoids a warning) with CUDA
    pin_memory=torch.cuda.is_available(),
    num_workers=12,
)

In [None]:
# class LSTM_custom(nn.Module):
#     def __init__(self, input_size=12, hidden_size=64, num_layers=1, sequence_length=10, dropout=0.2):

#         super(LSTM_custom, self).__init__()
#         self.input_size = input_size
#         self.hidden_size = hidden_size
#         self.num_layers = num_layers

#         # Input gate
#         self.W_xi = nn.Linear(input_size, hidden_size)
#         self.W_hi = nn.Linear(hidden_size, hidden_size)

#         # Forget gate
#         self.W_xf = nn.Linear(input_size, hidden_size)
#         self.W_hf = nn.Linear(hidden_size, hidden_size)

#         # Cell gate
#         self.W_xg = nn.Linear(input_size, hidden_size)
#         self.W_hg = nn.Linear(hidden_size, hidden_size)

#         # Output gate
#         self.W_xo = nn.Linear(input_size, hidden_size)
#         self.W_ho = nn.Linear(hidden_size, hidden_size)

#     def lstm_cell(self, x_t, h_prev, c_prev):
#         i_t = torch.sigmoid(self.W_xi(x_t) + self.W_hi(h_prev))
#         f_t = torch.sigmoid(self.W_xf(x_t) + self.W_hf(h_prev))
#         g_t = torch.tanh(self.W_xg(x_t) + self.W_hg(h_prev))
#         o_t = torch.sigmoid(self.W_xo(x_t) + self.W_ho(h_prev))

#         c_t = f_t * c_prev + i_t * g_t
#         h_t = o_t * torch.tanh(c_t)

#         return h_t, c_t

#     def forward(self, x):
#         batch_size, seq_len, _ = x.size()
#         h_t = torch.zeros(batch_size, self.hidden_size, device=x.device)
#         c_t = torch.zeros(batch_size, self.hidden_size, device=x.device)
        
#         # Process the input sequence once
#         for t in range(seq_len):
#             h_t, c_t = self.lstm_cell(x[:, t, :], h_t, c_t)
        
#         # Predict next 5 timesteps
#         predictions = []
#         current_x = x[:, -1, :]  # Start with the last input
        
#         for i in range(5):
#             h_t, c_t = self.lstm_cell(current_x, h_t, c_t)
#             predictions.append(h_t)
#             # You would need a projection layer to convert h_t to the right dimension
#             # current_x = projection_layer(h_t)
        
#         return predictions


In [None]:
class LSTM_custom(nn.Module):
    """Single-layer LSTM written out gate by gate, with a multi-step decoder.

    The input sequence is consumed one time step at a time by ``lstm_cell``.
    Afterwards the cell is rolled forward ``pred_steps`` more times; at each
    of those steps the hidden state is projected to ``output_size`` values,
    which are the prediction for that step and are also fed back as the
    leading ``output_size`` features of the next cell input.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the hidden and cell states.
        num_layers: Stored for interface compatibility only; the model is
            always a single layer.
        sequence_length: Stored for interface compatibility only; ``forward``
            takes the sequence length from its input.
        dropout: Stored for interface compatibility only; no dropout is
            applied.
        output_size: Number of values predicted per future step. Must be in
            ``1..input_size`` because predictions are written back into the
            first ``output_size`` input features.
        pred_steps: Number of future steps returned by ``forward``.

    Raises:
        ValueError: If ``output_size`` or ``pred_steps`` is out of range.
    """

    def __init__(
        self,
        input_size=12,
        hidden_size=64,
        num_layers=1,
        sequence_length=10,
        dropout=0.2,
        output_size=2,
        pred_steps=5,
    ):
        super().__init__()
        if not 0 < output_size <= input_size:
            raise ValueError(
                f"output_size must be in 1..input_size ({input_size}), "
                f"got {output_size}"
            )
        if pred_steps < 1:
            raise ValueError(f"pred_steps must be >= 1, got {pred_steps}")

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.sequence_length = sequence_length
        self.dropout = dropout
        self.output_size = output_size
        self.pred_steps = pred_steps

        def weight(in_features):
            # Stored as (hidden_size, in_features); applied as ``x @ W.T``
            return nn.Parameter(torch.empty(hidden_size, in_features))

        def bias():
            return nn.Parameter(torch.empty(hidden_size))

        # Input gate: weights/bias for the input and for the hidden state
        self.W_xi, self.b_xi = weight(input_size), bias()
        self.W_hi, self.b_hi = weight(hidden_size), bias()

        # Forget gate
        self.W_xf, self.b_xf = weight(input_size), bias()
        self.W_hf, self.b_hf = weight(hidden_size), bias()

        # Cell (candidate) gate
        self.W_xg, self.b_xg = weight(input_size), bias()
        self.W_hg, self.b_hg = weight(hidden_size), bias()

        # Output gate
        self.W_xo, self.b_xo = weight(input_size), bias()
        self.W_ho, self.b_ho = weight(hidden_size), bias()

        # Maps a hidden state to the predicted values for one time step
        self.output_layer = nn.Linear(hidden_size, output_size)

        self.reset_parameters()

    def reset_parameters(self):
        """Initialise all gate parameters uniformly in +-1/sqrt(hidden_size)."""
        bound = self.hidden_size ** -0.5
        # recurse=False: only the gate parameters registered directly on this
        # module; ``output_layer`` keeps nn.Linear's own initialisation.
        for _, param in self.named_parameters(recurse=False):
            nn.init.uniform_(param, -bound, bound)

    @staticmethod
    def _pre_activation(x_t, h_prev, W_x, b_x, W_h, b_h):
        """Return the summed affine maps of the input and the hidden state.

        ``x_t`` is (batch, input_size) and ``h_prev`` is (batch, hidden_size);
        the result is (batch, hidden_size).
        """
        return (x_t @ W_x.T + b_x) + (h_prev @ W_h.T + b_h)

    def input_gate(self, x_t, h_prev):
        """Return the input gate activation, values in (0, 1)."""
        return torch.sigmoid(
            self._pre_activation(
                x_t, h_prev, self.W_xi, self.b_xi, self.W_hi, self.b_hi
            )
        )

    def forget_gate(self, x_t, h_prev):
        """Return the forget gate activation, values in (0, 1)."""
        return torch.sigmoid(
            self._pre_activation(
                x_t, h_prev, self.W_xf, self.b_xf, self.W_hf, self.b_hf
            )
        )

    def cell_gate(self, x_t, h_prev):
        """Return the candidate cell update, values in (-1, 1)."""
        return torch.tanh(
            self._pre_activation(
                x_t, h_prev, self.W_xg, self.b_xg, self.W_hg, self.b_hg
            )
        )

    def output_gate(self, x_t, h_prev):
        """Return the output gate activation, values in (0, 1)."""
        return torch.sigmoid(
            self._pre_activation(
                x_t, h_prev, self.W_xo, self.b_xo, self.W_ho, self.b_ho
            )
        )

    def lstm_cell(self, x_t, h_prev, c_prev):
        """Advance the LSTM by one time step.

        Args:
            x_t: Input for this step, shape (batch, input_size).
            h_prev: Previous hidden state, shape (batch, hidden_size).
            c_prev: Previous cell state, shape (batch, hidden_size).

        Returns:
            Tuple ``(h_t, c_t)`` of the new hidden and cell states.
        """
        i_t = self.input_gate(x_t, h_prev)
        f_t = self.forget_gate(x_t, h_prev)
        g_t = self.cell_gate(x_t, h_prev)
        o_t = self.output_gate(x_t, h_prev)

        c_t = f_t * c_prev + i_t * g_t
        h_t = o_t * torch.tanh(c_t)

        return h_t, c_t

    def forward(self, x):
        """Encode ``x`` and predict the next ``pred_steps`` time steps.

        Args:
            x: Tensor of shape (batch, seq_len, input_size), seq_len >= 1.

        Returns:
            Tensor of shape (batch, pred_steps, output_size).

        Raises:
            ValueError: If ``x`` is not 3-dimensional or has no time steps.
        """
        if x.dim() != 3:
            raise ValueError(
                f"expected input of shape (batch, seq_len, features), "
                f"got {tuple(x.shape)}"
            )
        batch_size, seq_len, _ = x.size()
        if seq_len == 0:
            raise ValueError("input sequence must contain at least one time step")

        # new_zeros keeps the states on x's device and in x's dtype
        h_t = x.new_zeros(batch_size, self.hidden_size)
        c_t = x.new_zeros(batch_size, self.hidden_size)

        # Process the input sequence once
        for t in range(seq_len):
            h_t, c_t = self.lstm_cell(x[:, t, :], h_t, c_t)

        # Roll the cell forward, starting from the last observed input
        predictions = []
        current_x = x[:, -1, :]

        for _ in range(self.pred_steps):
            h_t, c_t = self.lstm_cell(current_x, h_t, c_t)
            step_pred = self.output_layer(h_t)
            predictions.append(step_pred)
            # Feed the prediction back as the leading features of the next
            # input; the remaining features are held at their current values.
            # NOTE: assumes the predicted quantities are the first
            # ``output_size`` input features - confirm against the dataset.
            current_x = torch.cat(
                [step_pred, current_x[:, self.output_size:]], dim=1
            )

        return torch.stack(predictions, dim=1)

In [None]:
def rmse_loss(y_pred, y_true):
    """Return the root-mean-square error between two tensors.

    The squared difference is averaged over every element before the square
    root is taken, so the result is a scalar tensor.
    """
    residual = y_pred - y_true
    mean_squared = residual.pow(2).mean()
    return mean_squared.sqrt()

In [None]:
# Hyperparameters for the network
input_size = 12  # Number of input features (still use all features as input)
hidden_size = 128
num_layers = 1

# Build the network, then move its parameters onto the selected device
model = LSTM_custom(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
)
model = model.to(device)

# Training objective (RMSE) and the optimizer that updates the model weights
criterion = rmse_loss
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Training loop
num_epochs = 100

if len(train_loader) == 0:
    raise RuntimeError("train_loader yields no batches; nothing to train on")

model.train()
for epoch in range(num_epochs):
    running_loss = 0.0
    num_batches = 0

    for inputs, targets in train_loader:
        inputs = inputs.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True)

        # Forward pass
        outputs = model(inputs)
        # Accept a model that returns one tensor per predicted step by
        # stacking the steps into a single (batch, steps, features) tensor.
        if isinstance(outputs, (list, tuple)):
            outputs = torch.stack(outputs, dim=1)

        # Guard against silent broadcasting inside the loss
        if outputs.shape != targets.shape:
            raise ValueError(
                f"Model output shape {tuple(outputs.shape)} does not match "
                f"target shape {tuple(targets.shape)}"
            )

        # Compute loss over every predicted step, not just the last one
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # Report the mean batch loss of the epoch every 10 epochs
    if (epoch + 1) % 10 == 0:
        epoch_loss = running_loss / num_batches
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")