In [None]:
# Build both datasets with the same settings. stride == seq_length, i.e. the
# window start advances by a full window length each time (presumably giving
# non-overlapping windows -- confirm in CarSequenceDataset).
train_dataset, test_dataset = (
    CarSequenceDataset(frame, state, control, seq_length, stride=seq_length)
    for frame in (df_train, df_test)
)

# shuffle=True only changes the order in which dataset items are drawn into
# batches; it does not touch the contents of an individual item.
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=True,
)
# Keep the test set in dataset order so evaluation is reproducible.
test_loader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
)


# Model class: declares the LSTM-based RNN and defines its forward pass.


class RNN(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step fed to the LSTM.
        hidden_size: Size of the LSTM hidden and cell state vectors.
        num_layers: Number of stacked LSTM layers.
        seq_length: Window length; stored on the instance, not used by
            ``forward``.
        output_size: Number of values predicted per input sequence. When
            omitted, the module-level ``output_size`` is used, which keeps
            existing call sites that relied on that global working.

    Raises:
        ValueError: If ``output_size`` is omitted and no module-level
            ``output_size`` is defined.
    """

    def __init__(self, input_size, hidden_size, num_layers, seq_length, output_size=None):
        super().__init__()
        if output_size is None:
            # The parameter shadows the global of the same name, so the
            # fallback has to go through globals() explicitly.
            try:
                output_size = globals()["output_size"]
            except KeyError:
                raise ValueError(
                    "output_size was not passed and no module-level "
                    "'output_size' is defined"
                ) from None

        self.hidden_size = hidden_size  # dimension of the LSTM memory vectors
        self.num_layers = num_layers  # number of stacked LSTM layers
        self.seq_length = seq_length  # window length, kept for reference only

        # batch_first=True: inputs are indexed as (batch, time, feature).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Maps the final hidden vector to the predicted outputs.
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the LSTM over ``x`` and decode the output at the last time step.

        Args:
            x: Batch-first input tensor, indexed (batch, time, feature).

        Returns:
            Tensor with one row per batch element and ``output_size`` columns.
        """
        # No initial (h_0, c_0) is passed: nn.LSTM then starts from zero
        # states on the same device as x, so explicit zero tensors (and a
        # dependency on a global device) are unnecessary.
        out, _ = self.lstm(x)
        # out[:, -1, :] selects the top-layer hidden vector at the final step.
        return self.fc(out[:, -1, :])


# Hyperparameters of the LSTM stack.
hidden_size = 64
num_layers = 2

# Pass the hyperparameters declared above instead of repeating the literals,
# so the two cannot drift apart.
model = RNN(
    input_size=10,
    hidden_size=hidden_size,
    num_layers=num_layers,
    seq_length=10,
).to(device)

# Regression task: mean squared error between predictions and targets.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 10
step = 0  # global batch counter across all epochs
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    epoch_loss = 0
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        step += 1
        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        print(f"Epoch {epoch + 1}, Step {step}, Loss: {loss.item():.4f}")

    # Mean of the per-batch losses (not weighted by batch size).
    avg_loss = epoch_loss / len(train_loader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_loss:.4f}")

    # ---- evaluation pass on the test set ----
    model.eval()
    test_loss = 0
    with torch.no_grad():  # no gradients needed for evaluation
        for x_batch, y_batch in test_loader:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            test_loss += loss.item()
    avg_test_loss = test_loss / len(test_loader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Test Loss: {avg_test_loss:.4f}")

# torch.save needs the object to serialise AND a destination; state_dict must
# be called to obtain the parameter mapping rather than the bound method.
torch.save(model.state_dict(), "rnn_state_dict.pt")

In [None]:
# Sanity check: run the trained model on one batch built from final_df_car2
# and plot predictions against targets in the original (unscaled) units.
df_check = final_df_car2.copy()
df_check[state + control] = scaler_in.transform(df_check[state + control])
# NOTE(review): the state columns were already transformed by scaler_in on the
# line above and are transformed again here by scaler_out -- confirm this
# matches how the training/test frames were prepared.
df_check[state] = scaler_out.transform(df_check[state])
#%%
check_data = CarSequenceDataset(df_check, state, control, seq_length, stride=seq_length)
# Iterate over check_data so the frame prepared above is what gets evaluated
# (wrapping train_dataset here would leave check_data unused).
loader = DataLoader(check_data, batch_size=64, shuffle=False)
model.eval()  # switch to evaluation mode

# Only the first batch is inspected.
check_x, check_y = next(iter(loader))
check_x = check_x.to(device)
check_y = check_y.to(device)
with torch.no_grad():
    output_check = model(check_x)
    out_np = output_check.cpu().numpy()
    y_np = check_y.cpu().numpy()
    # Inverse scaling back to the original units.
    out_pred = scaler_out.inverse_transform(out_np)
    y_actual = scaler_out.inverse_transform(y_np)
#%%

import matplotlib.pyplot as plt

# One plot per output column; the first 6 columns are plotted
# (assumes the model predicts at least 6 outputs -- TODO confirm).
for i in range(6):
    # plt.show() is called every iteration, so the figure is created every
    # iteration as well; a single figure created before the loop would only
    # give the requested size to the first plot.
    plt.figure(figsize=(10, 4))
    plt.plot(y_actual[:, i], label="True Speed")
    plt.plot(out_pred[:, i], label="Predicted Speed", alpha=0.7)
    plt.xlabel("Sequence Index")
    plt.ylabel("Speed (units)")
    plt.legend()
    plt.title("Predicted vs True Speed")
    plt.show()
#%%