In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.optim as optim
import PIL.Image as Image

In [2]:
from PIL import Image

class CustomDataset(Dataset):
    def __init__(self, csv_file, transform=None):
        self.data = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path = self.data.iloc[idx, 0]
        image = Image.open(img_path)
        steering = float(self.data.iloc[idx, 3])
        throttle = float(self.data.iloc[idx, 4])

        if self.transform:
            image = self.transform(image)

        return image, (steering, throttle)


In [3]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Example normalization
])


In [4]:
dataset = CustomDataset('driving_log.csv', transform=transform)

In [5]:
train_size = int(0.8 * len(dataset))
valid_size = len(dataset) - train_size
train_dataset, valid_dataset = torch.utils.data.random_split(dataset, [train_size, valid_size])


In [6]:
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

In [7]:
class DLModel(nn.Module):
    def __init__(self):
        super(DLModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 24, kernel_size=5, stride=2)
        self.conv2 = nn.Conv2d(24, 36, kernel_size=5, stride=2)
        self.conv3 = nn.Conv2d(36, 48, kernel_size=5, stride=2)
        self.conv4 = nn.Conv2d(48, 64, kernel_size=3, stride=1)
        self.fc_input_size = self.calculate_fc_input_size()
        self.fc1 = nn.Linear(self.fc_input_size, 100)
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50, 10)
        self.steering_output = nn.Linear(10, 1)
        self.throttle_output = nn.Linear(10, 1)

    def forward(self, x):
        # Define the forward pass here
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        
        
        # Flatten the tensor based on the calculated input size
        x = x.view(-1, self.fc_input_size)
        
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        
        # Concatenate steering and throttle outputs along a new dimension
        combined_output = torch.cat((self.steering_output(x), self.throttle_output(x)), dim=1)
        
        return combined_output

    
    def calculate_fc_input_size(self):
        # Helper function to calculate the input size for the fully connected layers
        # This function should be called only once after the convolutional layers have been defined
        x = torch.randn(1, 3, 160, 320)  # Create a random input tensor with the same shape as your data
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        
        return x.view(1, -1).size(1)  # Return the flattened size



In [8]:
model = DLModel()

In [9]:
criterion_steer = nn.MSELoss()
criterion_throttle = nn.MSELoss()

In [10]:
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [11]:
num_epochs = 10
train_losses = []
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, (steering_labels, throttle_labels) in train_loader:
        optimizer.zero_grad()
        
        # Set the data type of the input data to float
        images = images.float()
        steering_labels = steering_labels.float()
        throttle_labels = throttle_labels.float()

        outputs = model(images)
        
        # Split the outputs into steering and throttle predictions
        steering_predictions = outputs[:, 0]
        throttle_predictions = outputs[:, 1]

        loss_steer = criterion_steer(steering_predictions, steering_labels)
        loss_throttle = criterion_throttle(throttle_predictions, throttle_labels)
        loss = loss_steer + loss_throttle
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    train_losses.append(running_loss / len(train_loader))
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss / len(train_loader)}")


Epoch 1/10, Loss: 0.2550914931982275
Epoch 2/10, Loss: 0.020881363761521155
Epoch 3/10, Loss: 0.016972980732398647
Epoch 4/10, Loss: 0.015540739934470865
Epoch 5/10, Loss: 0.014967500352330746
Epoch 6/10, Loss: 0.013883625517689412
Epoch 7/10, Loss: 0.013033673655421983
Epoch 8/10, Loss: 0.012284387143388871
Epoch 9/10, Loss: 0.011404487582284116
Epoch 10/10, Loss: 0.010619177490532879


In [12]:
model.eval()
valid_loss = 0.0
with torch.no_grad():
    for images, (steering_labels, throttle_labels) in valid_loader:
        images = images.float()
        steering_labels = steering_labels.float()
        throttle_labels = throttle_labels.float()

        outputs = model(images)
        loss_steer = criterion_steer(outputs[:, 0], steering_labels)
        loss_throttle = criterion_throttle(outputs[:, 1], throttle_labels)
        loss = loss_steer + loss_throttle
        valid_loss += loss.item()

print(f"Validation Loss: {valid_loss / len(valid_loader)}")


Validation Loss: 0.014046360738575459


In [13]:
torch.save(model, 'PTmodel.pth')