# Pytorch

## Preprocessing the data

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
from torch.utils.data import Dataset, DataLoader

In [None]:
# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
df = pd.read_csv('cleaned_data/aalborg_data.csv')
df.head()

In [None]:
# Select relevant columns for features and target
features = df[['angle', 'rpm', 'speed_x', 'speed_y', 'track_position']]
targets = df[['acceleration', 'brake', 'steer']]

# Normalize features
scaler = StandardScaler()
features = scaler.fit_transform(features)

# Split the dataset
X_train, X_test, y_train, y_test = train_test_split(features, targets, test_size=0.2, random_state=42)

# Convert to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train.values, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test.values, dtype=torch.float32).to(device)


## Create the model

In [None]:
import torch.nn as nn

class CarControlModel(nn.Module):
    def __init__(self):
        super(CarControlModel, self).__init__()
        self.fc1 = nn.Linear(5, 64)  # 5 input features
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 3)  # 3 output actions

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = CarControlModel().to(device)


In [None]:
import torch.optim as optim
from tqdm.auto import tqdm

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 100
batch_size = 10

# Create DataLoader
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

losses = []
for epoch in tqdm(range(num_epochs)):
    for i, (inputs, targets) in enumerate(train_loader):
        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        


        # Backward pass and optimize
        loss.backward()
        optimizer.step()
    losses.append(loss.item())

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


Cost graph

In [None]:
import matplotlib.pyplot as plt

plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss')
plt.show()

## Calculate the accuracy

In [None]:
import torch
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def evaluate_model(model, X_test, y_test):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Disable gradient computation
        predictions = model(X_test).cpu().numpy()  # Make predictions and move to CPU
        targets = y_test.cpu().numpy()  # Move targets to CPU
    
    # Calculate evaluation metrics
    mse = mean_squared_error(targets, predictions)
    mae = mean_absolute_error(targets, predictions)
    r2 = r2_score(targets, predictions)
    
    print(f'Mean Squared Error (MSE): {mse:.4f}')
    print(f'Mean Absolute Error (MAE): {mae:.4f}')
    print(f'R-squared (R²): {r2:.4f}')
    
    return mse, mae, r2

mse, mae, r2 = evaluate_model(model, X_test, y_test)


## Save the model

In [None]:
# Save the model
torch.save(model.state_dict(), 'models/car_control_model.pth')

In [None]:
# load the model
model_loaded = CarControlModel().to(device)
model_loaded.load_state_dict(torch.load('models/car_control_model.pth'))


In [None]:
# Save scaler
import pickle

with open('models/scaler.pkl', 'wb') as f:
    pickle.dump(scaler, f)
    

## Make predictions

In [None]:
# Make predictions

def predict(model, scaler, X):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Disable gradient computation
        X = torch.tensor(scaler.transform(X), dtype=torch.float32).to(device)
        predictions = model(X).cpu().numpy()  # Make predictions and move to CPU
    return predictions

# Example input from the test set
X_example = X_test[:5]
y_example = y_test[:5]

# Make predictions
predictions = predict(model_loaded, scaler, X_example)
print('Predictions:')
print(predictions)
print('\nTrue values:')
print(y_example)
