In [None]:
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn

# Load features
camera_features_path = "/path/to/camera/features/folder"
lidar_features_path = "/path/to/lidar/features/folder"
labels_path = "/path/to/labels/excel/file.xlsx"


def _load_feature_dir(folder):
    """Load every entry of ``folder`` with ``np.load``, in sorted-filename order.

    Args:
        folder: Path of a directory whose entries can all be read by ``np.load``.

    Returns:
        A list with one ``np.load`` result per directory entry.
    """
    # NOTE(review): sorted() orders names lexicographically, so the result is
    # chronological only if the timestamps in the filenames are zero-padded
    # (or otherwise sort as strings) — confirm against the real data.
    return [np.load(os.path.join(folder, name)) for name in sorted(os.listdir(folder))]


# Assuming each feature file corresponds to a timestamp
camera_features = _load_feature_dir(camera_features_path)
lidar_features = _load_feature_dir(lidar_features_path)

# The two modalities are paired purely by position. zip() would silently drop
# the unmatched tail if the folders differ in size, hiding a mis-pairing, so
# fail loudly instead.
if len(camera_features) != len(lidar_features):
    raise ValueError(
        f"Camera/LiDAR file count mismatch: {len(camera_features)} camera vs "
        f"{len(lidar_features)} LiDAR feature files"
    )

# Load labels
labels_df = pd.read_excel(labels_path)
steering_angles = labels_df['steering_angle'].values
velocities = labels_df['velocity'].values

# Combine features (camera + LiDAR)
# Concatenating on axis=1 requires each pair of arrays to be at least 2-D with
# matching sizes on every other axis.
combined_features = [np.concatenate((cam, lidar), axis=1) for cam, lidar in zip(camera_features, lidar_features)]


In [None]:
from sklearn.preprocessing import MinMaxScaler

# Normalize combined features and labels
scaler_features = MinMaxScaler()
scaler_labels = MinMaxScaler()

# Flatten the per-file arrays into one 2-D (rows, features) matrix.
# The feature width is read from the stacked array's last axis rather than from
# `combined_features[0].shape[1]`: this cell rebinds `combined_features` to a
# 2-D array, so on a second run the old indexing raised IndexError. With the
# last-axis form a re-run is a no-op reshape (the scaler is then re-fit on the
# already scaled values, which leaves them unchanged up to rounding).
feature_matrix = np.asarray(combined_features)
feature_matrix = feature_matrix.reshape(-1, feature_matrix.shape[-1])

# NOTE(review): both scalers are fit on the full dataset, i.e. before the
# train/validation split further down, so validation min/max values leak into
# the scaling — consider fitting on the training portion only.
combined_features = scaler_features.fit_transform(feature_matrix)

# Two-column target matrix: column 0 = steering angle, column 1 = velocity.
labels = scaler_labels.fit_transform(np.column_stack((steering_angles, velocities)))


In [None]:
def create_sequences(features, labels, seq_length=48):
    """Slice a time series into fixed-length windows with next-step targets.

    Window ``i`` covers ``features[i : i + seq_length]`` and is paired with
    ``labels[i + seq_length]``, i.e. the label of the row immediately after
    the window.

    Args:
        features: Array-like indexed by time step along its first axis.
        labels: Array-like indexed by time step along its first axis.
        seq_length: Number of consecutive time steps per window.

    Returns:
        Tuple ``(X, y)`` where ``X`` has shape
        ``(n_windows, seq_length, *features.shape[1:])`` and ``y`` has shape
        ``(n_windows, *labels.shape[1:])``, with
        ``n_windows = max(len(features) - seq_length, 0)``.

    Raises:
        IndexError: If ``labels`` has fewer rows than the windows require.
    """
    features = np.asarray(features)
    labels = np.asarray(labels)
    n_windows = len(features) - seq_length

    if n_windows <= 0:
        # Not enough rows for a single window. Return empty arrays that still
        # carry the trailing dimensions, so downstream shape lookups such as
        # X.shape[2] keep working instead of hitting a bare shape-(0,) array.
        empty_X = np.empty((0, seq_length) + features.shape[1:], dtype=features.dtype)
        empty_y = np.empty((0,) + labels.shape[1:], dtype=labels.dtype)
        return empty_X, empty_y

    X = [features[start:start + seq_length] for start in range(n_windows)]
    y = [labels[start + seq_length] for start in range(n_windows)]
    return np.array(X), np.array(y)

# Window length: each sample is 48 consecutive rows, and its target is the
# label of the row that follows the window.
seq_length = 48
X, y = create_sequences(features=combined_features, labels=labels, seq_length=seq_length)


In [None]:
# Positional 80/20 split (no shuffling): the first 80% of the windows are used
# for training and the remaining tail for validation.
train_size = int(0.8 * len(X))
X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:], y[train_size:]


In [None]:
# Two regression targets per sample: steering angle and velocity.
output_dim = 2

# Input width and sequence length are taken from the training tensor itself
# (axis 2 and axis 1 respectively) so the model always matches the data.
model = TimeSeriesTransformer(
    input_dim=X_train.shape[2],
    d_model=128,
    nhead=8,
    num_layers=4,
    output_dim=output_dim,
    seq_length=X_train.shape[1],
)


In [None]:
# Training loop
from torch.utils.data import DataLoader, TensorDataset

# Wrap the numpy splits as tensors (torch.Tensor(...) yields the default
# floating dtype) and pair each input window with its target.
train_dataset = TensorDataset(torch.Tensor(X_train), torch.Tensor(y_train))
val_dataset = TensorDataset(torch.Tensor(X_val), torch.Tensor(y_val))

# Only the training batches are reshuffled each epoch; validation order is fixed.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(val_dataset, batch_size=32)

# Mean-squared error over both targets, optimised with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

epochs = 10  # Adjust based on performance
for epoch in range(epochs):
    # ---- optimisation pass over the training batches ----
    train_loss = 0
    model.train()
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        output = model(X_batch)
        loss = criterion(output, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # ---- evaluation pass: eval mode, no gradient tracking ----
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            output = model(X_batch)
            loss = criterion(output, y_batch)
            val_loss += loss.item()

    # Reported values are the mean of the per-batch losses for the epoch.
    print(f"Epoch {epoch+1}, Train Loss: {train_loss / len(train_loader)}, Val Loss: {val_loss / len(val_loader)}")


In [None]:
# Collect model outputs and ground truth for every validation batch.
model.eval()
predictions, actuals = [], []

with torch.no_grad():
    for X_batch, y_batch in val_loader:
        output = model(X_batch)
        predictions.append(output)
        actuals.append(y_batch)

# Join the per-batch tensors, convert to numpy, and undo the label scaling so
# both arrays are back in the original label units.
predictions = scaler_labels.inverse_transform(torch.cat(predictions).numpy())
actuals = scaler_labels.inverse_transform(torch.cat(actuals).numpy())


In [None]:
import matplotlib.pyplot as plt

def _plot_actual_vs_predicted(actual, predicted, column, name, n_points=100):
    """Overlay actual and predicted values of one target on a single figure.

    Args:
        actual: 2-D array of ground-truth targets, one column per target.
        predicted: 2-D array of model outputs with the same column layout.
        column: Index of the target column to plot.
        name: Target name used in the legend entries, y-axis label and title.
        n_points: Number of leading rows to plot.
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.plot(actual[:n_points, column], label=f"Actual {name}")
    ax.plot(predicted[:n_points, column], label=f"Predicted {name}")
    # Axis labels let the figure stand alone when the notebook is skimmed.
    ax.set(title=f"{name} Prediction", xlabel="Sample index", ylabel=name)
    ax.legend()
    plt.show()


# Plot Steering Angle
_plot_actual_vs_predicted(actuals, predictions, column=0, name="Steering Angle")

# Plot Velocity
_plot_actual_vs_predicted(actuals, predictions, column=1, name="Velocity")
