In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Load data
data = pd.read_csv('/path/to/your/data.csv')

# Preprocess data
features = ['v_follow', 'spacing', 'relative_speed']
target = 'x_follow'

# Pairs 1..2500 are used for training further down; everything above 2500 is
# held out for testing. The scalers are fitted on the training rows only so
# that min/max statistics of the held-out pairs do not leak into the
# normalisation, and are then applied to every row. Held-out values may
# therefore fall slightly outside [0, 1], which is expected.
train_rows = data['pair_id'].between(1, 2500)

scaler = MinMaxScaler()
scaler.fit(data.loc[train_rows, features])
data[features] = scaler.transform(data[features])

# A separate scaler for the target so predictions can be de-normalised later.
target_scaler = MinMaxScaler()
target_scaler.fit(data.loc[train_rows, [target]])
data[[target]] = target_scaler.transform(data[[target]])

In [None]:
def create_sequences(data, seq_length, feature_cols=None, target_col=None):
    """Slice a frame into overlapping fixed-length windows for sequence models.

    Window ``i`` covers rows ``i .. i + seq_length - 1`` of ``data``; its label
    is the target value on the *last* row of that window.

    Parameters
    ----------
    data : pandas.DataFrame
        Rows in temporal order, containing the feature and target columns.
    seq_length : int
        Number of consecutive rows per window (must be >= 1).
    feature_cols : list of str, optional
        Feature column names. Defaults to the notebook-level ``features``.
    target_col : str, optional
        Target column name. Defaults to the notebook-level ``target``.

    Returns
    -------
    sequences : numpy.ndarray, shape (n_windows, seq_length, n_features)
    targets : numpy.ndarray, shape (n_windows,)
        ``n_windows`` is ``max(len(data) - seq_length + 1, 0)``; when it is 0
        both arrays are empty but keep their full rank.

    Raises
    ------
    ValueError
        If ``seq_length`` is smaller than 1.
    """
    if feature_cols is None:
        feature_cols = features
    if target_col is None:
        target_col = target
    if seq_length < 1:
        raise ValueError(f"seq_length must be >= 1, got {seq_length}")

    # Pull the columns out once; positional .iloc lookups on a DataFrame inside
    # the loop are orders of magnitude slower than slicing a NumPy array.
    feature_values = data[feature_cols].to_numpy()
    target_values = data[target_col].to_numpy()

    n_windows = len(data) - seq_length + 1
    if n_windows <= 0:
        # Keep the rank so callers can concatenate or build tensors safely.
        empty_sequences = np.empty(
            (0, seq_length, feature_values.shape[1]), dtype=feature_values.dtype
        )
        return empty_sequences, np.empty((0,), dtype=target_values.dtype)

    sequences = np.stack(
        [feature_values[start:start + seq_length] for start in range(n_windows)]
    )
    # The label of window i sits at row i + seq_length - 1.
    targets = target_values[seq_length - 1:].copy()
    return sequences, targets

seq_length = 50

In [None]:
# Build LSTM model
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear head applied to the last time step.

    Parameters
    ----------
    input_size : int
        Number of features per time step.
    hidden_layer_size : int
        Width of each LSTM layer's hidden state.
    num_layers : int
        Number of stacked LSTM layers.
    output_size : int
        Width of the linear head's output.
    """

    def __init__(self, input_size, hidden_layer_size, num_layers, output_size):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.num_layers = num_layers
        # batch_first=True: inputs are (batch, seq_len, input_size).
        self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_layer_size, output_size)

    def forward(self, input_seq):
        """Map a batch of sequences to one prediction per sequence.

        ``input_seq`` has shape (batch, seq_len, input_size); the result has
        shape (batch, output_size).
        """
        # With no initial state supplied, nn.LSTM starts from zero hidden and
        # cell states allocated on the input's own device and dtype. Building
        # them by hand with torch.zeros pinned them to CPU float32, which fails
        # as soon as the model or the batch lives elsewhere.
        out, _ = self.lstm(input_seq)
        # Only the hidden state of the final time step feeds the head.
        return self.linear(out[:, -1, :])

In [None]:
# Seed PyTorch before the model is built so that weight initialisation and
# DataLoader shuffling are reproducible under Restart & Run All.
SEED = 42
torch.manual_seed(SEED)

# One input channel per feature column; derived from `features` so the model
# stays in sync if the feature list changes.
input_size = len(features)
hidden_layer_size = 100
num_layers = 2
output_size = 1

model = LSTM(input_size, hidden_layer_size, num_layers, output_size)

# Mean squared error on the (scaled) target, optimised with Adam.
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train the model on each of the first 2500 vehicle pairs
epochs = 150  # passes over one pair's windows before moving on to the next pair

# The evaluation cell switches the model to eval mode; switch back explicitly
# so re-running this cell always trains in training mode.
model.train()

for pair_id in range(1, 2501):
    pair_rows = data[data['pair_id'] == pair_id]
    # A pair with fewer rows than one window produces no sequences at all.
    # Skip it instead of building an empty DataLoader and later reporting a
    # loss value that belongs to some earlier pair (or does not exist yet).
    if len(pair_rows) < seq_length:
        continue

    X_train, y_train = create_sequences(pair_rows, seq_length)
    X_train = torch.tensor(X_train, dtype=torch.float32)
    y_train = torch.tensor(y_train, dtype=torch.float32)
    train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=False)

    for _ in range(epochs):
        for seq, labels in train_loader:
            optimizer.zero_grad()
            y_pred = model(seq)
            # Flatten (batch, 1) predictions to match the (batch,) labels.
            single_loss = loss_function(y_pred.view(-1), labels)
            single_loss.backward()
            optimizer.step()

    # Progress report: loss of the final mini-batch of this pair.
    if pair_id % 100 == 0:
        print(f'Pair {pair_id} loss: {single_loss.item():10.8f}')

In [None]:
# Test the model on the remaining 500 vehicle pairs
test_data = data[data['pair_id'] > 2500]

# Window every pair on its own, exactly as the training cell does. Windowing
# the concatenated frame would create sequences that start in one vehicle
# pair and end in the next, which are not real trajectories.
X_parts, y_parts = [], []
for _, pair_rows in test_data.groupby('pair_id', sort=False):
    if len(pair_rows) < seq_length:
        continue  # too short to form a single window
    X_pair, y_pair = create_sequences(pair_rows, seq_length)
    X_parts.append(X_pair)
    y_parts.append(y_pair)

if not X_parts:
    raise ValueError(
        f'No test pair has at least {seq_length} rows; cannot build test sequences.'
    )

X_test = torch.tensor(np.concatenate(X_parts), dtype=torch.float32)
y_test = torch.tensor(np.concatenate(y_parts), dtype=torch.float32)

In [None]:
# Evaluate the model
model.eval()
with torch.no_grad():
    y_pred = model(X_test.view(-1, seq_length, input_size))

# Denormalize the data
# scikit-learn works on NumPy arrays, so convert the tensors explicitly
# (detached and on CPU) rather than relying on implicit conversion.
y_test_np = y_test.detach().cpu().numpy().reshape(-1, 1)
y_pred_np = y_pred.detach().cpu().numpy().reshape(-1, 1)
y_test_denorm = target_scaler.inverse_transform(y_test_np).reshape(-1)
y_pred_denorm = target_scaler.inverse_transform(y_pred_np).reshape(-1)

# Plot actual vs predicted values
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(y_test_denorm, label='True')
ax.plot(y_pred_denorm, label='Predicted')
ax.set_xlabel('Test sequence index')
ax.set_ylabel(target)
ax.set_title(f'True vs predicted {target} on held-out vehicle pairs')
ax.legend()
plt.show()