In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim
import PIL.Image as Image
from PIL import Image

In [2]:
class CustomDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        self.data = pd.read_csv(csv_file)
        self.transform = transform
    
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.data.iloc[idx, 0]
        img_path = "IMG/" + img_path.split("\\")[-1]   
        image = Image.open(img_path)
        steering = float(self.data.iloc[idx, 3])
        throttle = float(self.data.iloc[idx, 4])


        if self.transform:
            image = self.transform(image)

        return image, (steering, throttle)

In [3]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]) ])

In [4]:
dataset = CustomDataset('driving_log.csv', transform=transform)

In [5]:
train_size = int(0.8 * len(dataset))
valid_size = len(dataset) - train_size
train_dataset, valid_dataset = torch.utils.data.random_split(dataset, [train_size, valid_size])


In [6]:
train_size = int(0.8 * len(dataset))
valid_size = len(dataset) - train_size
train_dataset, valid_dataset = torch.utils.data.random_split(dataset, [train_size, valid_size])

In [7]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

In [8]:
class DLModel(nn.Module):
    def __init__(self):
        super(DLModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 24, kernel_size=5, stride=2)
        self.conv2 = nn.Conv2d(24, 36, kernel_size=5, stride=2)
        self.conv3 = nn.Conv2d(36, 48, kernel_size=5, stride=2)
        self.conv4 = nn.Conv2d(48, 64, kernel_size=3)
        self.conv5 = nn.Conv2d(64, 64, kernel_size=3)
        
        # Calculate the input size for the fully connected layers dynamically
        self.fc_input_size = self.calculate_fc_input_size()
        
        self.fc1 = nn.Linear(self.fc_input_size, 100)
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50, 10)
        self.steering_output = nn.Linear(10, 1)
        self.throttle_output = nn.Linear(10, 1)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        
        x = x.view(-1, self.fc_input_size)
        
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        
        steering_output = self.steering_output(x)
        throttle_output = self.throttle_output(x)
        
        return steering_output, throttle_output

    # Function to calculate the input size for the fully connected layers
    def calculate_fc_input_size(self):
        x = torch.randn(1, 3, 160, 320)  
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        return x.view(1, -1).size(1)



In [9]:
model = DLModel()

In [10]:
criterion_steer = nn.MSELoss()
criterion_throttle = nn.MSELoss()

In [11]:
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [12]:
num_epochs = 15
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, (steering_labels, throttle_labels) in train_loader:
        optimizer.zero_grad()
        images = images.float()
        steering_labels = steering_labels.float()
        throttle_labels = throttle_labels.float()
        outputs = model(images)
        loss_steer = criterion_steer(outputs[0], steering_labels)
        loss_throttle = criterion_throttle(outputs[1], throttle_labels)
        loss = loss_steer + loss_throttle
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss / len(train_loader)}")


  return F.mse_loss(input, target, reduction=self.reduction)
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1/15, Loss: 0.07391588863045458
Epoch 2/15, Loss: 0.021030050914569366
Epoch 3/15, Loss: 0.02113353375405554
Epoch 4/15, Loss: 0.020544775551365267
Epoch 5/15, Loss: 0.020140140512657743
Epoch 6/15, Loss: 0.021024659441243255
Epoch 7/15, Loss: 0.020058995743672692
Epoch 8/15, Loss: 0.02028159201595812
Epoch 9/15, Loss: 0.020632867205647692
Epoch 10/15, Loss: 0.020511112115796534
Epoch 11/15, Loss: 0.020187460456884677
Epoch 12/15, Loss: 0.020475543074069485
Epoch 13/15, Loss: 0.02083931459234126
Epoch 14/15, Loss: 0.020548553206026554
Epoch 15/15, Loss: 0.020396846019843172


In [13]:
model.eval()
valid_loss = 0.0
with torch.no_grad():
    for images, (steering_labels, throttle_labels) in valid_loader:
        outputs = model(images)
        loss_steer = criterion_steer(outputs[0], steering_labels)
        loss_throttle = criterion_throttle(outputs[1], throttle_labels)
        loss = loss_steer + loss_throttle
        valid_loss += loss.item()

print(f"Validation Loss: {valid_loss / len(valid_loader)}")

Validation Loss: 0.018015888857087742


  return F.mse_loss(input, target, reduction=self.reduction)


In [14]:
torch.save(model.state_dict(), 'PTmodel.pth')
