In [1]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
import torch
import torch.nn as nn

class SteeringAngleCNN(nn.Module):
    """
    CNN model for predicting steering angles from dashcam images.
    Based on NVIDIA's PilotNet architecture with some simplifications.

    Args:
        input_shape: ``(channels, height, width)`` of the images the model
            will be fed. Defaults to ``(3, 66, 200)``, which yields 1152
            flattened features after the convolutional stack.

    Raises:
        ValueError: if ``input_shape`` does not have exactly three entries,
            or if the image is too small to pass through the (unpadded)
            convolutional stack.
    """
    def __init__(self, input_shape=(3, 66, 200)):
        super().__init__()

        if len(input_shape) != 3:
            raise ValueError(
                f"input_shape must be (channels, height, width), got {input_shape!r}"
            )
        self.input_shape = tuple(int(dim) for dim in input_shape)
        in_channels = self.input_shape[0]

        # Convolutional layers
        self.conv_layers = nn.Sequential(
            # Layer 1: 24 filters of 5x5, stride 2
            nn.Conv2d(in_channels, 24, kernel_size=5, stride=2),
            nn.ReLU(),

            # Layer 2: 36 filters of 5x5, stride 2
            nn.Conv2d(24, 36, kernel_size=5, stride=2),
            nn.ReLU(),

            # Layer 3: 48 filters of 5x5, stride 2
            nn.Conv2d(36, 48, kernel_size=5, stride=2),
            nn.ReLU(),

            # Layer 4: 64 filters of 3x3, stride 1
            nn.Conv2d(48, 64, kernel_size=3, stride=1),
            nn.ReLU(),

            # Layer 5: 64 filters of 3x3, stride 1
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )

        # Measure the conv output first so the first Linear layer is built
        # once with the right size (no placeholder layer to throw away).
        flattened_size = self._calculate_conv_output_size()

        # Fully connected layers
        self.fc_layers = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(flattened_size, 100),
            nn.ReLU(),

            nn.Dropout(0.5),
            nn.Linear(100, 50),
            nn.ReLU(),

            nn.Linear(50, 10),
            nn.ReLU(),

            nn.Linear(10, 1)  # Output: steering angle
        )

    def _calculate_conv_output_size(self):
        """
        Return the number of features per sample after the conv layers.

        Pushes a single all-zero image of ``self.input_shape`` through
        ``self.conv_layers`` and counts the resulting elements. The result is
        printed (as in the original notebook output) and returned; the module
        itself is not modified.
        """
        dummy = torch.zeros(1, *self.input_shape)

        # Shape probing only: no need to record an autograd graph.
        with torch.no_grad():
            try:
                features = self.conv_layers(dummy)
            except RuntimeError as exc:
                # Conv2d raises RuntimeError when a kernel is larger than its input.
                raise ValueError(
                    f"input_shape {self.input_shape} is too small for the "
                    "convolutional stack"
                ) from exc

        flattened_size = features.numel() // features.size(0)

        print(f"Calculated convolutional output size: {features.size()}")
        print(f"Flattened features size: {flattened_size}")

        return flattened_size

    def forward(self, x):
        """Forward pass: conv stack -> flatten -> fully connected head.

        Returns a tensor with one predicted value per sample, shape
        ``(batch, 1)``.
        """
        x = self.conv_layers(x)
        # flatten() also handles non-contiguous activations (e.g. channels_last),
        # where .view() would raise.
        x = torch.flatten(x, start_dim=1)
        x = self.fc_layers(x)
        return x

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return the per-sample average loss.

    When ``optimizer`` is given the model is put in train mode and updated
    after every batch; otherwise the model is put in eval mode and gradients
    are disabled. Returns NaN if the loader yields no samples.
    """
    is_training = optimizer is not None
    model.train(is_training)

    total_loss = 0.0
    samples_seen = 0

    with torch.set_grad_enabled(is_training):
        for inputs, angles in loader:
            inputs = inputs.to(device)
            angles = angles.to(device).float().view(-1, 1)  # Ensure correct shape

            if is_training:
                optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, angles)

            if is_training:
                loss.backward()
                optimizer.step()

            # Weight by batch size so a smaller final batch is averaged
            # correctly (assumes the criterion returns a batch mean).
            batch_size = inputs.size(0)
            total_loss += loss.item() * batch_size
            samples_seen += batch_size

    # Divide by the samples actually processed, not len(loader.dataset):
    # the two differ with drop_last=True or a subsampling sampler.
    return total_loss / samples_seen if samples_seen else float("nan")


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """
    Train the model and return trained model along with training history.

    Args:
        model: network to train; moved to CUDA if available, else CPU.
        train_loader: iterable of ``(inputs, angles)`` training batches.
        val_loader: iterable of ``(inputs, angles)`` validation batches.
        criterion: loss callable taking ``(outputs, targets)``.
        optimizer: optimizer over ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.

    Returns:
        Tuple ``(model, train_losses, val_losses)`` where the two lists hold
        the per-sample average loss of each epoch.

    Side effects:
        Prints per-epoch progress, saves the loss curves to
        ``training_curves.png`` and shows the figure.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    print(f"Training on {device}")

    # Track losses
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        epoch_train_loss = _run_epoch(model, train_loader, criterion, device, optimizer)
        train_losses.append(epoch_train_loss)

        epoch_val_loss = _run_epoch(model, val_loader, criterion, device)
        val_losses.append(epoch_val_loss)

        # Print progress
        print(f'Epoch {epoch+1}/{num_epochs}: '
              f'Train Loss: {epoch_train_loss:.4f}, '
              f'Val Loss: {epoch_val_loss:.4f}')

    # Plot training curves (x-axis is 1-based to match the printed epochs)
    epochs = range(1, len(train_losses) + 1)
    plt.figure(figsize=(10, 5))
    plt.plot(epochs, train_losses, label='Training Loss')
    plt.plot(epochs, val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)
    plt.savefig('training_curves.png')
    plt.show()

    return model, train_losses, val_losses

In [5]:
def test_model(model, test_loader):
    """
    Test the model and visualize some predictions
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for inputs, angles in test_loader:
            inputs = inputs.to(device)
            angles = angles.to(device).float().view(-1, 1)
            
            outputs = model(inputs)
            
            all_preds.extend(outputs.cpu().numpy())
            all_targets.extend(angles.cpu().numpy())
            
            # Break after one batch for visualization
            break
    
    # Visualize a few predictions
    plt.figure(figsize=(15, 10))
    for i in range(min(5, len(all_preds))):
        plt.subplot(1, 5, i+1)
        
        # Get the image
        img = inputs[i].cpu().numpy().transpose(1, 2, 0)
        
        # If normalized, denormalize
        # This assumes images were normalized with ImageNet stats
        mean = [0.485, 0.456, 0.406]
        std = [0.229, 0.224, 0.225]
        img = img * std + mean
        img = np.clip(img, 0, 1)
        
        plt.imshow(img)
        plt.title(f"Pred: {all_preds[i][0]:.2f}\nTrue: {all_targets[i][0]:.2f}")
        plt.axis('off')
    
    plt.savefig('sample_predictions.png')
    plt.show()
    
    # Calculate mean absolute error
    mse = np.mean((np.array(all_preds) - np.array(all_targets))**2)
    mae = np.mean(np.abs(np.array(all_preds) - np.array(all_targets)))
    print(f"Test MSE: {mse:.4f}")
    print(f"Test MAE: {mae:.4f}")