# Environment

---



## Prepare the data

---



In [None]:
# Open Colab's upload widget so a file can be provided interactively.
# The next cell copies `kaggle.json` from the working directory, so that is
# the file expected here.
from google.colab import files
files.upload()

In [None]:
# Install the Kaggle API credentials where the `kaggle` CLI looks for them.
# -p: do not fail if the directory already exists.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
# Restrict the credential file to owner read/write only.
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the dataset archive into the current directory (the next cell
# unzips `selfdriving-car-simulator.zip`).
!kaggle datasets download -d zaynena/selfdriving-car-simulator

In [None]:
# Extract the archive into ./data (paths used later start with /content/data/).
!unzip selfdriving-car-simulator.zip -d data

## Prepare the libraries

---



In [None]:
import os
import random
import warnings
from typing import Tuple
from tqdm import tqdm

import cv2
import ntpath
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import timm

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import albumentations as A
from albumentations.pytorch import ToTensorV2

import torchvision
import torchvision.transforms as transforms
import torchvision.models as models

# Suppress warnings
# NOTE: "ignore" with no category silences every warning for the rest of the
# session, including deprecation and shape-mismatch warnings from libraries.
warnings.filterwarnings("ignore")


# Data understanding

---

### 📦 **About Dataset**

#### **Context**
This is an image dataset generated by the **Udacity Self-Driving Car Simulator**.  
The dataset contains driving images from cameras mounted on a virtual vehicle and corresponding driving parameters such as steering angle, throttle, brake, and speed.

---

### 📁 **Dataset Structure**

| Folder | Description | Size
|--------|-------------|------
| **track1data/** | Contains images and CSV log file collected only from Track 1 | 31,845
| **track2data/** | Contains images and CSV log file collected only from Track 2 | 65,484
| **dataset/** | Contains all images from both **track1data/** and **track2data/** combined | 97,329

---

### 📄 **CSV File Columns**

| Column | Meaning |
|--------|---------|
| **Center** | Center camera image path |
| **Left** | Left camera image path |
| **Right** | Right camera image path |
| **Steering** | Steering wheel angle |
| **Throttle** | Throttle value (acceleration) |
| **Brake** | Brake value |
| **Speed** | Vehicle speed |

---

|index|Steering|Throttle|Brake|Speed|
|---|---|---|---|---|
|count|21828\.0|21828\.0|21828\.0|21828\.0|
|mean|-0\.01318031883818948|0\.20500802925959638|0\.028569632292197635|8\.782329734606764|
|std|0\.46189122305571556|0\.28688580517629336|0\.1198930944174388|2\.7422589671476296|
|min|-1\.0|0\.0|0\.0|1\.835208e-05|
|25%|-0\.1|0\.0|0\.0|7\.1821675|
|50%|0\.0|0\.0|0\.0|8\.93185|
|75%|0\.05|0\.3538586|0\.0|10\.4863525|
|max|1\.0|1\.0|1\.0|19\.29511|

---

| Steering Value      | Direction     |
|---------------------|---------------|
| Less than 0 (`< 0`) | Turn **Left** |
| Equal to 0 (`= 0`) | Go **Straight**|
| Greater than 0 (`> 0`) | Turn **Right** |


# Data Processing

---



## Data loading

---



In [None]:
# Path to driving log file (inside the directory the archive was unzipped to)
CSV_PATH = "/content/data/dataset/dataset/driving_log.csv"


In [None]:
# Driving log columns, in the order they appear in each CSV row
COLUMNS = ['Center', 'Left', 'Right', 'Steering', 'Throttle', 'Brake', 'Speed']

# Load dataset; passing `names=` makes pandas treat the file as having no
# header row, so the first line is read as data.
data = pd.read_csv(CSV_PATH, names=COLUMNS)

# Preview data: 100 randomly chosen rows
data.sample(100)


In [None]:
# Show basic information about the dataset
# Column dtypes, non-null counts and memory usage
data.info()

In [None]:
# Show basic statistical summary of the data
# Count / mean / std / min / quartiles / max of the numeric columns
data.describe()

In [None]:
# Make path separators uniform (Windows backslashes become forward slashes)
def normalize_path(p: str) -> str:
    """Return *p* with every backslash replaced by a forward slash."""
    return "/".join(p.split("\\"))

# Rewrite the three camera-path columns in place with normalized separators
for col in ("Center", "Left", "Right"):
    data[col] = data[col].map(normalize_path)


In [None]:
def get_filename(path: str) -> str:
    """Return the final component of *path*, dropping any directory part.

    Uses Windows path rules, so both forward slashes and backslashes are
    treated as separators.
    """
    _, tail = ntpath.split(path)
    return tail

# Keep only the file name of each camera image
for col in ("Center", "Left", "Right"):
    data[col] = data[col].map(get_filename)

# Show 100 randomly chosen rows to check the result
data.sample(100)

## Data balancing

---



In [None]:
# Steering distribution parameters
NUM_BINS = 25  # number of equal-width steering bins
SAMPLES_PER_BIN = 1600  # Hyperparameter: cap on the samples kept per bin when balancing

# Steering histogram: `hist` holds the per-bin counts, `bins` the NUM_BINS + 1 bin edges
hist, bins = np.histogram(data["Steering"], bins=NUM_BINS)
print("Bin edges:", bins)


In [None]:
# Midpoint of every histogram bin; used as the x position of each bar
centers = 0.5 * (bins[:-1] + bins[1:])

# Horizontal extent of the "target per bin" reference line
steer_lo = data["Steering"].min()
steer_hi = data["Steering"].max()

# Bars show the raw counts, the dashed red line marks the per-bin cap
plt.bar(centers, hist, width=0.05, edgecolor='k')
plt.plot(
    [steer_lo, steer_hi],
    [SAMPLES_PER_BIN] * 2,
    "r--",
    linewidth=2,
    label=f"Target per bin ({SAMPLES_PER_BIN})",
)

plt.title("Steering Angle Distribution")
plt.xlabel("Steering Angle")
plt.ylabel("Sample Count")
plt.legend()
plt.show()

In [None]:
# Number of log rows before balancing
print(f"Total samples: {len(data)}")

In [None]:
# Collect the index labels of the rows to discard so that no steering bin
# keeps more than SAMPLES_PER_BIN samples.
# A seeded generator makes the balancing reproducible (same seed as the
# train/test split further down).
rng = np.random.default_rng(42)
remove_list = []
steering_vals = data["Steering"].to_numpy()

for i in range(NUM_BINS):
    lower, upper = bins[i], bins[i + 1]
    # np.histogram bins are half-open [lower, upper) except the last one,
    # which also includes its right edge.
    if i < NUM_BINS - 1:
        mask = (steering_vals >= lower) & (steering_vals < upper)
    else:
        mask = (steering_vals >= lower) & (steering_vals <= upper)

    indices = np.flatnonzero(mask)
    rng.shuffle(indices)  # pick the survivors of this bin at random
    # `DataFrame.drop` works on index labels, so translate the row positions
    # into labels instead of relying on the index being 0..n-1.
    remove_list.extend(data.index[indices[SAMPLES_PER_BIN:]])


In [None]:
# Balance dataset by dropping excess samples
data_balanced = data.drop(remove_list)
print(f"Samples retained: {len(data_balanced)}")

In [None]:
plt.figure(figsize=(12, 6))

# One panel per dataset: original on the left, balanced on the right
for position, (frame, heading) in enumerate(
    [
        (data, "Original Steering Distribution"),
        (data_balanced, "Balanced Steering Distribution"),
    ],
    start=1,
):
    plt.subplot(1, 2, position)
    plt.hist(frame["Steering"], bins=NUM_BINS, edgecolor="k")
    plt.title(heading)
    plt.xlabel("Steering Angle")
    plt.ylabel("Sample Count")

plt.tight_layout()
plt.show()

## Data splitting

---



In [None]:
# Directory holding the camera images referenced by the driving log
DATA_DIR = '/content/data/dataset/dataset/IMG'

In [None]:
def load_images_and_steering(
    data_dir: str, df: pd.DataFrame, correction: float = 0.15
) -> tuple[np.ndarray, np.ndarray]:
    """Build flat arrays of image paths and steering targets for all cameras.

    Every row of *df* contributes three consecutive entries, in the order
    center, left, right. The center image keeps the logged steering angle;
    the left image gets ``steering + correction`` and the right image
    ``steering - correction``.

    Args:
        data_dir: Directory that is joined in front of every file name.
        df: Frame with ``Center``, ``Left``, ``Right`` (file names) and
            ``Steering`` (value convertible to float) columns.
        correction: Offset applied to the side-camera steering angles.

    Returns:
        ``(image_paths, steerings)``: two 1-D arrays of length ``3 * len(df)``.
    """
    steer = df['Steering'].to_numpy(dtype=float)

    # Column order (center, left, right) plus a row-major ravel gives the
    # per-row interleaving center, left, right, center, left, right, ...
    steerings = np.column_stack(
        (steer, steer + correction, steer - correction)
    ).ravel()

    filenames = df[['Center', 'Left', 'Right']].to_numpy().ravel()
    image_paths = np.array([os.path.join(data_dir, name) for name in filenames])

    return image_paths, steerings

# Usage: expand the balanced log into one (path, steering) entry per camera image.
# The trailing '/' is harmless because the paths are built with os.path.join.
image_paths, steerings = load_images_and_steering(DATA_DIR + '/', data_balanced)

In [None]:
# Number of (image, steering) pairs: three per retained log row
print(f"Total images: {len(image_paths)}")

In [None]:
def fix_path(path: str) -> str:
    """Return *path* with backslashes turned into forward slashes."""
    return path.translate(str.maketrans("\\", "/"))

# Replace backslashes in the three camera columns of `data`.
# NOTE(review): by this point the columns already hold bare file names
# (ntpath.basename was applied earlier), so this pass should change nothing;
# `image_paths` was also built before it runs. Confirm and consider removing.
for col in ["Center", "Left", "Right"]:
    data[col] = data[col].apply(fix_path)

In [None]:
# Preview data: 100 randomly chosen rows of the (unbalanced) log
data.sample(100)

In [None]:
# Each log row occupies three consecutive entries (center, left, right) in
# `image_paths` / `steerings`, so row k starts at position 3 * k.
sample_idx = 10
start_idx = sample_idx * 3

camera_names = ['Center', 'Left', 'Right']

plt.figure(figsize=(18, 6))

for offset, camera in enumerate(camera_names):
    position = start_idx + offset
    img = Image.open(image_paths[position])

    plt.subplot(1, 3, offset + 1)
    plt.imshow(img)
    plt.title(f'{camera} Camera\nSteering: {steerings[position]:.2f}')
    plt.axis('off')

plt.tight_layout()
plt.show()


In [None]:
# First split: hold out 15% of all samples as the test set (85% remain).
# Second split: hold out 15% of that remainder as the validation set.
# Net proportions: ~72.25% train, ~12.75% validation, 15% test.
# NOTE(review): entries are shuffled individually, so the center/left/right
# images of one log row can end up in different splits — confirm this is intended.
X_train, X_test, y_train, y_test = train_test_split(
    image_paths, steerings, test_size=0.15, random_state=42
)

X_train, X_val, y_train, y_val=train_test_split(
   X_train, y_train, test_size=0.15, random_state=42
)


print(f"Training samples: {len(X_train)}")
print(f"Validation samples: {len(X_val)}")
print(f"Test samples: {len(X_test)}")


In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# (axis, steering values, bar colour, panel title) for each split shown
panels = [
    (axes[0], y_train, 'blue', 'Training Dataset'),
    (axes[1], y_val, 'red', 'Validation Dataset'),
]

for ax, values, colour, heading in panels:
    ax.hist(values, bins=NUM_BINS, width=0.05, color=colour, edgecolor='black')
    ax.set(title=heading, xlabel='Steering Angle', ylabel='Sample Count')

plt.tight_layout()
plt.show()

## Data augmentation

---



In [None]:
def normalize_nvidia(image, **kwargs):
    """Return *image* as float32 divided by 255.

    Extra keyword arguments are accepted and ignored so the function can be
    used as an ``A.Lambda`` image callback.
    """
    as_float = image.astype(np.float32)
    return np.divide(as_float, 255.0)

def _rgb_to_yuv(image, **kwargs):
    """Convert an RGB image to YUV.

    Defined at module level (instead of as an inline lambda) so the
    ``A.Lambda`` transform that wraps it can be pickled for DataLoader workers.
    """
    return cv2.cvtColor(image, cv2.COLOR_RGB2YUV)


def get_transforms_list(Nv):
    """Return the deterministic preprocessing steps shared by all splits.

    Args:
        Nv: If truthy, build the pipeline for the Nvidia model (YUV colour
            space, 66x200, pixel values divided by 255). Otherwise build the
            pipeline used for the ViT model (RGB, 64x192, ImageNet mean/std
            normalisation).

    Returns:
        A list of albumentations transforms (not yet wrapped in ``A.Compose``
        and without the tensor conversion).
    """
    # Named `steps` rather than `transforms` to avoid shadowing the imported
    # torchvision.transforms module inside this function.
    steps = [
        # Keep rows 60..135 and columns 0..320 of the frame
        A.Crop(x_min=0, y_min=60, x_max=320, y_max=135, p=1.0),
        A.GaussianBlur(blur_limit=3, p=1.0),
    ]

    if Nv:
        steps.extend([
            A.Lambda(image=_rgb_to_yuv, p=1.0),
            A.Resize(height=66, width=200, interpolation=cv2.INTER_AREA, p=1.0),
            A.Lambda(image=normalize_nvidia, p=1.0),
        ])
    else:
        steps.extend([
            A.Resize(height=64, width=192, interpolation=cv2.INTER_AREA, p=1.0),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, p=1.0),
        ])

    return steps

def get_train_transforms(Nv):
    """Compose the training pipeline: random augmentation, then preprocessing.

    Two augmentations are each applied with probability 0.5 (zoom-in via
    ``A.Affine`` and a brightness change), followed by the steps from
    ``get_transforms_list(Nv)`` and the conversion to a tensor.
    """
    random_augs = [
        A.Affine(scale=(1.0, 1.4), mode=cv2.BORDER_REPLICATE, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=(-0.8, 0.2), contrast_limit=0, p=0.5),
    ]

    return A.Compose([*random_augs, *get_transforms_list(Nv), ToTensorV2(p=1.0)])

def get_val_transforms(Nv):
    """Compose the validation pipeline: preprocessing and tensor conversion only."""
    steps = list(get_transforms_list(Nv))
    steps.append(ToTensorV2(p=1.0))
    return A.Compose(steps)

def get_test_transforms(Nv):
    """Compose the test pipeline: preprocessing and tensor conversion only."""
    return A.Compose([*get_transforms_list(Nv), ToTensorV2(p=1.0)])



# Dataset & DataLoader

---



In [None]:
class DrivingDataset(Dataset):
    """Map-style dataset yielding ``(image, steering)`` pairs read from disk.

    Args:
        image_paths: Indexable collection of image file paths.
        steerings: Indexable collection of steering targets, aligned with
            ``image_paths``.
        transform: Optional callable invoked as ``transform(image=img)``; its
            ``'image'`` entry replaces the image.
        is_train: When true, each sample is mirrored horizontally with
            probability 0.5 and its steering sign is inverted accordingly.
    """

    def __init__(self, image_paths, steerings, transform=None, is_train=False):
        self.image_paths = image_paths
        self.steerings = steerings
        self.transform = transform
        self.is_train = is_train

    def __len__(self):
        return len(self.image_paths)

    def _random_flip(self, img, steering):
        """Mirror the image left-right half of the time, negating the target."""
        keep_original = not (random.random() < 0.5)
        if keep_original:
            return img, steering
        return cv2.flip(img, 1), -steering

    def __getitem__(self, idx):
        path, target = self.image_paths[idx], self.steerings[idx]

        # Force three RGB channels regardless of the file's stored mode
        frame = np.array(Image.open(path).convert('RGB'))

        if self.is_train:
            frame, target = self._random_flip(frame, target)

        if self.transform:
            frame = self.transform(image=frame)['image']

        return frame, torch.tensor(target, dtype=torch.float32)


In [None]:
# Datasets for the Nvidia pipeline (transforms built with Nv=True).
# Only the training set gets random augmentation and flipping.
nvidia_train_tf = get_train_transforms(True)
train_dataset = DrivingDataset(X_train, y_train, transform=nvidia_train_tf, is_train=True)
val_dataset = DrivingDataset(X_val, y_val, transform=get_val_transforms(True))
test_dataset = DrivingDataset(X_test, y_test, transform=get_val_transforms(True))

print(f"Training dataset: {len(train_dataset)} samples (with augmentation)")
print(f"Validation dataset: {len(val_dataset)} samples (no augmentation)")

batch_size = 64
print(f"Batch size: {batch_size}")

# Settings shared by the two evaluation loaders (fixed order, no shuffling)
eval_loader_kwargs = dict(
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)

# Training batches are reshuffled every epoch
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True
)
print(f"Train loader: {len(train_loader)} batches per epoch")

val_loader = DataLoader(val_dataset, **eval_loader_kwargs)
test_loader = DataLoader(test_dataset, **eval_loader_kwargs)
print(f"Validation loader: {len(val_loader)} batches per epoch")

In [None]:
# Show the first few augmented training batches as image grids.
num_batches = 3
images_per_batch = 20
cols = 5
rows = (images_per_batch + cols - 1) // cols  # ceiling division

# The loop variables are deliberately NOT called `steerings` / `batch_size`:
# at notebook scope those names hold the full label array and the loader
# batch size, and rebinding them here would silently corrupt later re-runs.
for batch_idx, (batch_images, batch_steerings) in enumerate(train_loader):
    if batch_idx >= num_batches:
        break

    n_show = min(images_per_batch, batch_images.size(0))

    # squeeze=False keeps `axs` two-dimensional even for a single row/column
    fig, axs = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3), squeeze=False)
    fig.suptitle(f'Batch {batch_idx + 1}', fontsize=16)
    fig.tight_layout(pad=3.0)
    flat_axes = axs.ravel()

    for i in range(n_show):
        # Tensor is channel-first; matplotlib wants height x width x channel
        img_yuv = batch_images[i].permute(1, 2, 0).cpu().numpy()
        # Undo the /255 scaling and the RGB->YUV conversion of the Nvidia pipeline
        img_rgb = cv2.cvtColor((img_yuv * 255).astype(np.uint8), cv2.COLOR_YUV2RGB)

        flat_axes[i].imshow(img_rgb)
        flat_axes[i].set_title(f'Steering: {batch_steerings[i].item():.3f}')
        flat_axes[i].axis('off')

    # Remove unused subplots
    for unused_ax in flat_axes[n_show:]:
        fig.delaxes(unused_ax)

    plt.show()


# Modeling

---



## Nvidia model

---



### Model architecture (Nvidia)

---



In [None]:
class NvidiaModel(nn.Module):
    """Five-convolution, four-linear-layer network with a single output.

    The first linear layer expects ``64 * 1 * 18`` features, which is what the
    convolution stack produces for a 3 x 66 x 200 input.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, kernel_size, stride) per convolution
        conv_specs = [
            (3, 24, 5, 2),
            (24, 36, 5, 2),
            (36, 48, 5, 2),
            (48, 64, 3, 1),
            (64, 64, 3, 1),
        ]
        conv_stack = []
        for in_ch, out_ch, kernel, stride in conv_specs:
            conv_stack.append(nn.Conv2d(in_ch, out_ch, kernel_size=kernel, stride=stride))
            conv_stack.append(nn.ELU())
        conv_stack.append(nn.Dropout(0.5))
        self.conv_layers = nn.Sequential(*conv_stack)

        # Layer widths of the fully connected part, input to output
        fc_widths = [64 * 1 * 18, 100, 50, 10, 1]
        fc_stack = []
        for in_features, out_features in zip(fc_widths[:-1], fc_widths[1:]):
            fc_stack.append(nn.Linear(in_features, out_features))
            fc_stack.append(nn.ELU())
        fc_stack.pop()  # the final output layer has no activation
        self.fc_layers = nn.Sequential(*fc_stack)

    def forward(self, x):
        feature_maps = self.conv_layers(x)
        batch = feature_maps.size(0)
        flat = feature_maps.view(batch, -1)  # one feature vector per sample
        return self.fc_layers(flat)

print("NvidiaModel class defined.")

### Model train (Nvidia)

---



In [None]:
# Use the GPU when one is visible, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Network, regression loss and optimizer for the Nvidia experiment
model = NvidiaModel()
model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

print(f"Model ready on {device}. Loss: MSE. Optimizer: Adam.")


In [None]:
def train(model, loader, criterion, optimizer, device, epoch):
    """Run one training epoch and return ``(mean_loss, mean_mae)``.

    Args:
        model: Network to optimise; its output is flattened to one value per sample.
        loader: Iterable of ``(images, steerings)`` batches that supports ``len()``.
        criterion: Loss callable. The reported loss is weighted by batch size,
            which assumes the criterion returns a per-batch mean (true for the
            default ``nn.MSELoss()``).
        optimizer: Optimizer stepping the model parameters.
        device: Device the batches are moved to.
        epoch: Epoch number shown in the progress-bar description.

    Returns:
        Tuple of per-sample average loss and per-sample mean absolute error.
        Both are 0.0 if the loader yields no samples.
    """
    model.train()
    loss_sum, abs_err_sum, seen = 0.0, 0.0, 0
    pbar = tqdm(enumerate(loader), total=len(loader), desc=f"Epoch {epoch} Training", ncols=120)

    for i, (images, steerings) in pbar:
        images, steerings = images.to(device), steerings.to(device)
        optimizer.zero_grad()
        # reshape(-1) instead of squeeze(): keeps the batch axis when the
        # batch holds a single sample, so prediction and target shapes match.
        outputs = model(images).reshape(-1)
        loss = criterion(outputs, steerings)
        loss.backward()
        optimizer.step()

        n = steerings.size(0)
        # Weight by batch size so a short final batch is not over-represented
        loss_sum += loss.item() * n
        abs_err_sum += F.l1_loss(outputs.detach(), steerings, reduction='sum').item()
        seen += n

        pbar.set_postfix(loss=f"{loss_sum/seen:.4f}", MAE=f"{abs_err_sum/seen:.4f}")

    denom = max(seen, 1)  # avoid ZeroDivisionError on an empty loader
    return loss_sum / denom, abs_err_sum / denom

def validate(model, loader, criterion, device, epoch):
    """Evaluate the model on *loader* and return ``(mean_loss, mean_mae)``.

    Runs in eval mode without gradient tracking; no parameters are updated.

    Args:
        model: Network to evaluate; its output is flattened to one value per sample.
        loader: Iterable of ``(images, steerings)`` batches that supports ``len()``.
        criterion: Loss callable. The reported loss is weighted by batch size,
            which assumes the criterion returns a per-batch mean (true for the
            default ``nn.MSELoss()``).
        device: Device the batches are moved to.
        epoch: Epoch number shown in the progress-bar description.

    Returns:
        Tuple of per-sample average loss and per-sample mean absolute error.
        Both are 0.0 if the loader yields no samples.
    """
    model.eval()
    loss_sum, abs_err_sum, seen = 0.0, 0.0, 0
    pbar = tqdm(enumerate(loader), total=len(loader), desc=f"Epoch {epoch} Validation", ncols=120)

    with torch.no_grad():
        for i, (images, steerings) in pbar:
            images, steerings = images.to(device), steerings.to(device)
            # reshape(-1) instead of squeeze(): keeps the batch axis when the
            # batch holds a single sample, so prediction and target shapes match.
            outputs = model(images).reshape(-1)
            loss = criterion(outputs, steerings)

            n = steerings.size(0)
            # Weight by batch size so a short final batch is not over-represented
            loss_sum += loss.item() * n
            abs_err_sum += F.l1_loss(outputs, steerings, reduction='sum').item()
            seen += n

            pbar.set_postfix(loss=f"{loss_sum/seen:.4f}", MAE=f"{abs_err_sum/seen:.4f}")

    denom = max(seen, 1)  # avoid ZeroDivisionError on an empty loader
    return loss_sum / denom, abs_err_sum / denom


In [None]:
# Train the Nvidia model, checkpointing whenever validation loss improves.
best_val_loss = float('inf')
num_epochs = 25

for epoch in range(num_epochs):
    # 1-based number so the progress bars agree with the summary line below
    display_epoch = epoch + 1

    train_loss, train_mae = train(model, train_loader, criterion, optimizer, device, display_epoch)
    val_loss, val_mae = validate(model, val_loader, criterion, device, display_epoch)

    print(f"Epoch {display_epoch}/{num_epochs} | "
          f"Train: Loss={train_loss:.4f}, MAE={train_mae:.4f} | "
          f"Val: Loss={val_loss:.4f}, MAE={val_mae:.4f}")

    # Keep only the weights with the lowest validation loss seen so far
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "nvidia_model.pth")
        print(f"Saved best model at epoch {display_epoch} with val loss: {val_loss:.4f}")

print("Training complete.")


In [None]:
import torch
from tqdm import tqdm

# Rebuild the network and restore the best checkpoint written during training.
# map_location="cpu" lets the file load regardless of where it was saved.
model = NvidiaModel()
model.load_state_dict(torch.load("nvidia_model.pth", map_location="cpu"))

print("Loaded model.")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
model.eval()

# Sums over all test samples; divided by the sample count at the end
test_loss = 0.0
test_mae = 0.0
total_samples = 0

criterion = torch.nn.MSELoss()

with torch.no_grad():
    loop_test = tqdm(test_loader, desc="[Testing]", leave=True)

    for imgs, y in loop_test:
        imgs = imgs.to(device)
        y = y.to(device)

        # reshape(-1) instead of squeeze(): keeps the batch axis when the
        # final batch holds a single sample.
        preds = model(imgs).reshape(-1)
        loss = criterion(preds, y)

        # The criterion returns a batch mean; scale back to a per-batch sum
        test_loss += loss.item() * imgs.size(0)
        test_mae += torch.abs(preds - y).sum().item()
        total_samples += imgs.size(0)

# max(..., 1) avoids a ZeroDivisionError if the test loader is empty
final_test_loss = test_loss / max(total_samples, 1)
final_test_mae = test_mae / max(total_samples, 1)

print("=" * 60)
print("FINAL TEST RESULTS")
print(f"Total Samples: {total_samples}")
print(f"Final MSE Loss: {final_test_loss:.4f}")
print(f"Final MAE: {final_test_mae:.4f}")
print("=" * 60)


## ViT Model

---



In [None]:
# Datasets for the ViT pipeline (transforms built with Nv=False).
# Only the training set gets random augmentation and flipping.
train_dataset = DrivingDataset(
    X_train, y_train, transform=get_train_transforms(False), is_train=True
)
val_dataset = DrivingDataset(X_val, y_val, transform=get_val_transforms(False))
test_dataset = DrivingDataset(X_test, y_test, transform=get_test_transforms(False))

# Loaders: shuffled training batches, fixed-order evaluation batches
train_loader = DataLoader(
    train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True
)
val_loader = DataLoader(
    val_dataset, batch_size=32, shuffle=False, num_workers=2, pin_memory=True
)
test_loader = DataLoader(
    test_dataset, batch_size=32, shuffle=False, num_workers=2, pin_memory=True
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class ViTRegression(nn.Module):
    """timm backbone followed by a small MLP head that outputs one value.

    Args:
        model_name: timm model identifier used for the backbone.
        pretrained: Whether timm should load pretrained backbone weights.
        drop_rate: Dropout probability inside the regression head.
        img_size: ``(height, width)`` passed to the backbone constructor.
    """

    def __init__(self,
                 model_name='vit_tiny_patch16_224',
                 pretrained=True,
                 drop_rate=0.1,
                 img_size=(64, 192)):
        super().__init__()

        # num_classes=0 removes the classifier, leaving pooled features
        backbone_kwargs = dict(
            pretrained=pretrained,
            num_classes=0,
            global_pool='avg',
            img_size=img_size,
            dynamic_img_size=True,
        )
        self.backbone = timm.create_model(model_name, **backbone_kwargs)

        embed_dim = self.backbone.num_features

        head_layers = [
            nn.Linear(embed_dim, 256),
            nn.GELU(),
            nn.Dropout(drop_rate),
            nn.Linear(256, 64),
            nn.GELU(),
            nn.Linear(64, 1),
        ]
        self.head = nn.Sequential(*head_layers)

    def forward(self, x):
        # Head output is (batch, 1); drop the trailing axis to get (batch,)
        return self.head(self.backbone(x)).squeeze(1)


In [None]:
model = ViTRegression().to(device)

# "freeze": stop gradient tracking for every backbone parameter so that only
# the regression head is trained at first
model.backbone.requires_grad_(False)


In [None]:
criterion = nn.MSELoss()

# Hand the optimizer only the parameters that are currently trainable
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.Adam(trainable_params, lr=1e-4, weight_decay=1e-5)


In [None]:
import torch
import torch.optim as optim
from tqdm import tqdm
from sklearn.metrics import r2_score

# Train the ViT regressor: head only at first, whole network after the
# fourth epoch; checkpoint whenever validation loss improves.
best_val_loss = float('inf')
num_epochs = 20

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss, running_mae = 0.0, 0.0
    train_preds, train_targets = [], []

    loop = tqdm(train_loader, desc=f"Epoch {epoch+1} [Train]", leave=False)

    for imgs, y in loop:
        imgs, y = imgs.to(device), y.to(device)

        optimizer.zero_grad()
        # reshape(-1) instead of squeeze(): the model already returns shape
        # (batch,), and squeeze() would turn a 1-sample batch into a 0-d
        # tensor, which breaks the `.extend(...)` calls below.
        preds = model(imgs).reshape(-1)
        loss = criterion(preds, y)
        loss.backward()
        optimizer.step()

        # The criterion returns a batch mean; scale back to a per-batch sum
        running_loss += loss.item() * imgs.size(0)
        running_mae += torch.abs(preds - y).sum().item()

        train_preds.extend(preds.detach().cpu().numpy())
        train_targets.extend(y.detach().cpu().numpy())

    train_loss = running_loss / len(train_loader.dataset)
    train_mae = running_mae / len(train_loader.dataset)

    # ---- validation pass ----
    model.eval()
    running_loss, running_mae = 0.0, 0.0
    val_preds, val_targets = [], []

    with torch.no_grad():
        loop_val = tqdm(val_loader, desc=f"Epoch {epoch+1} [Val]", leave=False)
        for imgs, y in loop_val:
            imgs, y = imgs.to(device), y.to(device)
            preds = model(imgs).reshape(-1)
            loss = criterion(preds, y)

            running_loss += loss.item() * imgs.size(0)
            running_mae += torch.abs(preds - y).sum().item()

            val_preds.extend(preds.cpu().numpy())
            val_targets.extend(y.cpu().numpy())

    val_loss = running_loss / len(val_loader.dataset)
    val_mae = running_mae / len(val_loader.dataset)

    print(f"Epoch {epoch+1}/{num_epochs} | "
          f"Train: Loss={train_loss:.4f}, MAE={train_mae:.4f}| "
          f"Val: Loss={val_loss:.4f}, MAE={val_mae:.4f}")


    # After the fourth epoch (index 3) unfreeze the backbone and continue
    # with a fresh optimizer over all parameters at a lower learning rate.
    if epoch == 3:
        for param in model.backbone.parameters():
            param.requires_grad = True
        optimizer = optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-6)

    # Keep only the weights with the lowest validation loss seen so far
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), "vit_best_steering.pth")
        print(f"Saved best model (Loss: {val_loss:.4f})")

print("Training Done.")

In [None]:
# Restore the best ViT checkpoint and evaluate it on the held-out test set.
try:
    # map_location=device lets a checkpoint saved on GPU load on a CPU-only runtime
    best_state = torch.load("vit_best_steering.pth", map_location=device)
except FileNotFoundError:
    print("‚ö†Ô∏è Error: vit_best_steering.pth not found. Ensure training completed and saved the file.")
    exit()
else:
    model.load_state_dict(best_state)
    print("‚úÖ Loaded best model weights successfully.")

model.eval()
# Sums over all test samples; divided by the sample count at the end
test_loss = 0.0
test_mae = 0.0
total_samples = 0

print("\n‚ñ∂Ô∏è Starting Test Set Evaluation...")
with torch.no_grad():
    loop_test = tqdm(test_loader, desc="[Testing]", leave=True)
    for imgs, y in loop_test:
        imgs, y = imgs.to(device), y.to(device)

        # reshape(-1) instead of squeeze(): keeps the batch axis when the
        # final batch holds a single sample.
        preds = model(imgs).reshape(-1)
        loss = criterion(preds, y)

        # The criterion returns a batch mean; scale back to a per-batch sum
        test_loss += loss.item() * imgs.size(0)
        test_mae += torch.abs(preds - y).sum().item()
        total_samples += imgs.size(0)

# max(..., 1) avoids a ZeroDivisionError if the test loader is empty
final_test_loss = test_loss / max(total_samples, 1)
final_test_mae = test_mae / max(total_samples, 1)

print("=" * 50)
print("üèÅ Final Test Results")
print(f"| Test Samples: {total_samples}")
print(f"| Final Test MSE Loss: {final_test_loss:.4f}")
print(f"| Final Test MAE: {final_test_mae:.4f}")
print("=" * 50)