In [None]:
# Required libraries
libraries = ['numpy', 'torch', 'torchvision', 'opencv-python', 'matplotlib', 'tqdm']

# Function to install missing libraries
def install_missing_libraries(libs):
    import subprocess
    for lib in libs:
        try:
            import lib
        except ImportError:
            print(f"Installing missing library: {lib}")
            subprocess.call(['pip', 'install', lib])


# Check and install missing libraries
install_missing_libraries(libraries)

In [3]:
import numpy as np
import torch.nn.functional as F
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import os
import cv2
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
import numpy as np
import random
from random import randint
from tqdm.notebook import tqdm
    

download the dataset from https://www.kaggle.com/asrsaiteja/car-steering-angle-prediction 
Use Api/Direct download for this purpose.

In [4]:
'''dataset function returns an appended list of elements i.e[image_tensor ,angle in radians]
   angles we converted to radians, similar to what nvidia did , this reduces the range of the target values
   Change the path according to your own file system.
   Dataset is not included due to size limitation'''

def dataset():
    folder = '../input/car-steering-angle-prediction/driving_dataset/'
    data_list=[]
    with open("../input/car-steering-angle-prediction/driving_dataset/angles.txt") as angle_file:
        for line in angle_file:
            line_values=line.split()
            image = cv2.imread(folder+line_values[0])
            resize_image = cv2.resize(image, (200,66))
            data_list.append([torch.from_numpy(resize_image.transpose()).float(),float(line_values[1]) * np.pi / 180])
    return data_list  

In [5]:
%%time
print('Loading dataset')
data = dataset()

In [6]:
sample_imag, sample_angle = data[randint(0, len(data))]
plt.title("Sample Image")
plt.imshow(np.transpose(sample_imag.cpu().numpy().astype(int), (2,1,0)))
print("angle:",sample_angle)

In [7]:
#25k images for training 
#10k for validation
#10k for testing
data_size=len(data)
train_data = data[0:data_size-20000]
valid_data = data[data_size-20000:data_size-10000]
test_data = data[data_size-10000:data_size]


In [13]:
#Model
class ConvNet(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.normal = nn.LayerNorm([200,66]) 
        self.conv1 = nn.Conv2d(3, 24, kernel_size=5, stride=2, padding=0)
        self.conv2 = nn.Conv2d(24, 36, kernel_size=5, stride=2, padding=0) 
        self.conv3 = nn.Conv2d(36, 48, kernel_size=5, stride=2, padding=0) 
        self.conv4 = nn.Conv2d(48, 64, kernel_size=3, stride=1, padding=0) 
        self.conv5 = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=0) 
        self.lin1 = nn.Linear(1152, 100)
        self.lin2 = nn.Linear(100, 50)
        self.lin3 = nn.Linear(50, 10)
        self.lin4 = nn.Linear(10, 1)


    def forward(self,x):
        x = F.relu(self.conv1(self.normal(x)))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        x = x.view(x.shape[0],-1)
        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        x = F.relu(self.lin3(x))
        x = self.lin4(x)
        return x


In [18]:

def model_train(train, valid, test, network, optimizer, criterion, epochs, batch):
    #list to store train and val loss over epochs
    loss_list=[]
    for e in tqdm(range(epochs)):
        print("------------Epoch:",e,"------------")
        print("Started")
        print("Training for Epoch:",e)
        #train data loading and training starts
        train_loader = torch.utils.data.DataLoader(train, batch_size=batch, shuffle=True)
        loss_train = 0
        for i,sample_batched in enumerate(train_loader):
            optimizer.zero_grad()
            label = torch.reshape(sample_batched[1], (sample_batched[1].shape[0],1)).type(torch.float32)
            predicted = net(sample_batched[0])
            loss = criterion(predicted,label)
            loss_train += loss.item()
            loss.backward()
            optimizer.step() 
               
        print("Training ended with loss : " ,np.sqrt(loss_train/len(train)))
        print("Validation for Epoch:",e)
        #validation data loading and validation starts
        validation_loader = torch.utils.data.DataLoader(valid, batch_size=batch, shuffle=True)
        loss_sum = 0
        for i,sample_batched in enumerate(validation_loader):
            target = torch.reshape(sample_batched[1], (sample_batched[1].shape[0],1)).type(torch.float32)
            predicted = net(sample_batched[0])
            loss_val = criterion(predicted,target)
            loss_sum += loss_val.item()
        print("Validaiton ended with loss : ",np.sqrt(loss_sum/len(valid)))
        loss_list.append([np.sqrt(loss_train/len(train)),np.sqrt(loss_sum/len(valid))])
    #testing after model trained    
    test_loader = torch.utils.data.DataLoader(test, batch_size=batch, shuffle=True)
    loss_sum_test = 0
    for i,sample_batched in enumerate(test_loader):
        target = torch.reshape(sample_batched[1], (sample_batched[1].shape[0],1)).type(torch.float32)
        predicted = net(sample_batched[0])
        loss_test = criterion(predicted, target)
        loss_sum_test += loss_test.item()

    return loss_list,np.sqrt(loss_sum_test/len(test))

In [19]:
net = ConvNet()
epochs = 10
batch_size = 512
#learning rate=0.001
optimizer = torch.optim.Adam(net.parameters(),lr=0.001)
criterion = torch.nn.MSELoss(reduction='mean')
loss,loss_test = model_train(train_data, valid_data, test_data, net, optimizer, criterion,epochs , batch_size)


In [34]:
val_loss= [ item[1] for item in loss ]
train_loss=[ item[0] for item in loss ]

In [40]:
#Plotting results
plt.plot(np.arange(len(train_loss)), train_loss, label='Training loss')
plt.plot(np.arange(len(val_loss)), val_loss, label='Validation loss')
plt.title('Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()
print("\tTest loss : ", loss_test)
