In [None]:
import os
import math
import torch
import torchvision
import cv2
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from torchsummary import summary
from PIL import Image
from tqdm import tqdm

## Setting the device

In [None]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'

In [None]:
# Network specification consumed by DriveNet: [conv layer specs, linear layer sizes].
architecture = [
  # Convolutional part - one tuple per layer: (kernel_size, n_filters, stride).
  [
    (5, 24, 2),
    (5, 36, 2),
    (5, 48, 2),
    (3, 64, 1),
    (3, 64, 1),
  ],
  # Fully connected part - ints are neuron counts. The first entry is the
  # flattened size fed to the first linear layer (1152 = 64 * 1 * 18 for a
  # 3x66x200 input); every later entry is the output width of one linear layer.
  [1152, 100, 50, 10, 1],
]

In [None]:
#torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros')
class ATan(nn.Module):
  def __init__(self):
    super(Atan, self).__init__()

  def forward(self, x):
    return torch.atan(x)

class DriveNet(nn.Module):
  def __init__(self, architecture):
    super(DriveNet, self).__init__()
    self.net = self._create_layers(architecture)

  def forward(self, x):
    return self.net(x)

  def _create_layers(self, architecture, in_channels=3):
    layers = []
    #architecture should have two parts. First part should contain conv layers and second part should contain linear layers.
    for x in architecture[0]:
      if type(x) == tuple:
        layers.append(nn.Conv2d(in_channels, out_channels=x[1], kernel_size=x[0], stride=x[2], padding=0))
        nn.init.trunc_normal_(layers[-1].weight,std=0.1)
        layers.append(nn.ReLU())
        in_channels = x[1]
    
    layers.append(nn.Flatten())
    in_features = architecture[1][0]
    for x in architecture[1][1:]:
      layers.append(nn.Linear(in_features, x))
      nn.init.trunc_normal_(layers[-1].weight,std=0.1)
      layers.append(nn.ReLU())
      in_features = x
    layers = layers[:-1]
    layers.append(Atan())

    return nn.Sequential(*layers)

## Custom Dataset

In [None]:
class Dataset(torch.utils.data.Dataset):
  """Image/angle pairs listed in a whitespace-separated label file.

  Each non-blank line of the label file holds an image path (relative to
  data_dir) followed by an angle, which is treated as degrees.

  Args:
    data_dir: directory containing the label file and the images.
    txt: name of the label file inside data_dir.
    set_type: 'train' keeps the first 80% of the entries, 'test' keeps the
      remaining 20%, any other value keeps every entry.
  """

  def __init__(self, data_dir, txt, set_type):
    self.data_dir = data_dir
    self.data = []
    with open(os.path.join(self.data_dir, txt)) as file:
      for line in file:
        line = line.strip()
        # Blank lines (including an empty file) carry no sample; skip them
        # instead of failing on the two-value unpack below.
        if not line:
          continue
        x, y = line.split()
        self.data.append([x, float(y)])
    # The split follows file order: leading 80% for training, the rest for test.
    split_at = int(0.8*len(self.data))
    if set_type=='train':
      self.data = self.data[:split_at]
    elif set_type=='test':
      self.data = self.data[split_at:]

  def __len__(self):
    """Number of samples kept after the train/test split."""
    return len(self.data)

  def __getitem__(self, idx):
    """Load one sample.

    Returns:
      (name, image, angle): the image path as listed in the label file, a
      float tensor of shape (3, 66, 200) scaled to [0, 1], and a float32
      tensor of shape (1,) holding the angle in radians.
    """
    name, degrees = self.data[idx]
    img_pth = os.path.join(self.data_dir, name)
    # PIL images support neither slicing nor division, so decode into a
    # channel-first uint8 tensor first. The context manager closes the file.
    with Image.open(img_pth) as img:
      pixels = np.array(img.convert('RGB'))
    image = torch.from_numpy(pixels).permute(2, 0, 1).contiguous()
    # Drop the top 150 pixel rows before resizing.
    # NOTE(review): assumes source frames are taller than 150 px - confirm.
    image = image[:,150:,:]
    image = torchvision.transforms.Resize((66,200))(image)
    image = image/255.0
    # Degrees -> radians.
    angle = torch.tensor([degrees*math.pi/180], dtype=torch.float32)

    return name, image, angle

In [None]:
def save_chkpt(model, optim, filename='drive/MyDrive/DriveCNN.pth.tar'):
  """Write the model and optimizer state dicts to filename as one checkpoint.

  The stored object is a dict with the keys 'model' and 'optim'.
  """
  torch.save(
    {
      'model': model.state_dict(),
      'optim': optim.state_dict(),
    },
    filename,
  )

def load_chkpt(model, optim=None, filename='drive/MyDrive/DriveCNN.pth.tar'):
  """Restore model (and optionally optimizer) state from a checkpoint file.

  Args:
    model: module whose state dict is replaced by the checkpoint's 'model' entry.
    optim: optimizer to restore from the 'optim' entry; left alone when None.
    filename: checkpoint path; expected to hold a dict with 'model' and 'optim'.
  """
  # map_location='cpu' lets a checkpoint saved on a GPU be read on a machine
  # without one; load_state_dict then copies the values onto whatever device
  # the model/optimizer parameters already live on.
  # NOTE: torch.load unpickles the file - only load checkpoints you trust.
  chkpt = torch.load(filename, map_location='cpu')
  model.load_state_dict(chkpt['model'])
  if optim is not None:
    optim.load_state_dict(chkpt['optim'])

In [None]:
# Mean training loss of every completed epoch, appended to by train().
epoch_losses = []
def train(data_dir, txt, model, optim, loss_fn, epochs):
  """Train model on the 'train' split and checkpoint it after every epoch.

  Args:
    data_dir: directory holding the label file and the images.
    txt: label file name inside data_dir.
    model: network to optimise; inputs are moved to the global device.
    optim: optimizer stepping the model's parameters.
    loss_fn: callable taking (prediction, target) and returning a scalar loss.
    epochs: number of passes over the training split.

  Side effects:
    Appends one mean loss per epoch to the module-level epoch_losses list and
    writes a checkpoint through save_chkpt at the end of each epoch.
  """
  train_set = Dataset(data_dir,txt,'train')
  # Shuffle so a batch is not a run of consecutive entries from the label file.
  # Cap the worker count at the number of CPUs actually available.
  train_loader = torch.utils.data.DataLoader(
    train_set, 64, shuffle=True, num_workers=min(16, os.cpu_count() or 1))
  model.train()
  for epoch in range(epochs):
    batch_losses = []
    loop = tqdm(train_loader,  position=0, leave=True)
    for filename, x, y in loop:
      x, y = x.to(device), y.to(device)
      y_hat = model(x)
      loss = loss_fn(y_hat, y)
      optim.zero_grad()
      loss.backward()
      optim.step()
      # Record a plain float: it keeps the autograd graph from being retained
      # and gives the progress bar a number rather than a tensor repr.
      loss_value = loss.item()
      batch_losses.append(loss_value)
      loop.set_postfix(loss=loss_value)

    # An empty loader yields no batches; skip the mean instead of dividing by 0.
    if batch_losses:
      epoch_losses.append(sum(batch_losses)/len(batch_losses))
    # Checkpoint after the epoch's updates so the final epoch is saved as well.
    save_chkpt(model, optim)
  return

In [None]:
# Set to True to resume from the checkpoint file instead of starting fresh.
load_model = False
model = DriveNet(architecture).to(device)
loss = nn.MSELoss().to(device)
optim = torch.optim.Adam(model.parameters(),lr=1e-4,weight_decay=0.001)

if load_model:
  # Restore the optimizer state together with the weights: the checkpoint
  # stores both, and resuming with a fresh Adam state would discard it.
  load_chkpt(model, optim)

In [None]:
# Print the per-layer output shapes and parameter counts for a 3x66x200 input.
summary(model, (3, 66, 200))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 24, 31, 98]           1,824
              ReLU-2           [-1, 24, 31, 98]               0
            Conv2d-3           [-1, 36, 14, 47]          21,636
              ReLU-4           [-1, 36, 14, 47]               0
            Conv2d-5            [-1, 48, 5, 22]          43,248
              ReLU-6            [-1, 48, 5, 22]               0
            Conv2d-7            [-1, 64, 3, 20]          27,712
              ReLU-8            [-1, 64, 3, 20]               0
            Conv2d-9            [-1, 64, 1, 18]          36,928
             ReLU-10            [-1, 64, 1, 18]               0
          Flatten-11                 [-1, 1152]               0
           Linear-12                  [-1, 100]         115,300
             ReLU-13                  [-1, 100]               0
           Linear-14                   

In [None]:
# Run 10 training epochs on the label file stored in the dataset directory.
train(
  data_dir='drive/MyDrive/driving_dataset',
  txt='data.txt',
  model=model,
  optim=optim,
  loss_fn=loss,
  epochs=10,
)