In [None]:
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from torchvision.io import read_image
import os
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

import base64
import socketio
import eventlet
import eventlet.wsgi
from PIL import Image
from flask import Flask
from io import BytesIO

In [None]:
# Select the compute device: the first CUDA GPU when one is available, else the CPU.
# NOTE(review): nothing in the visible cells moves the model or the batches to
# this device, so as written the notebook computes on the CPU - confirm intent.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [None]:
class ImageDataset(Dataset):
    """Dataset of images paired with two regression targets read from a CSV log.

    The CSV is read without a header row. Column 0 supplies the image path
    (joined onto ``img_dir``) and columns 3 and 4 supply the two label values.
    NOTE(review): presumably these are steering angle and throttle from the
    simulator's driving log - confirm against the recorded CSV layout.

    Args:
        annotations_file: Path to the header-less CSV file.
        img_dir: Directory that the image paths in column 0 are joined onto.
        transform: Optional callable applied to each decoded image.
    """

    def __init__(self, annotations_file, img_dir, transform=None):
        self.transform = transform
        self.img_dir = img_dir
        self.img_labels = pd.read_csv(annotations_file, header=None)

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return self.img_labels.shape[0]

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``; ``label`` is a float32 tensor."""
        # Targets: columns 3 and 4, forced to numeric before building the tensor.
        target_values = self.img_labels.iloc[idx, 3:5].values.astype(float)
        label = torch.tensor(target_values).to(torch.float32)

        # Image: decode the file named in column 0, then apply the optional transform.
        file_name = self.img_labels.iloc[idx, 0]
        image = read_image(os.path.join(self.img_dir, file_name))
        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:

# Locations of the recorded driving logs (CSV) and their image folders.
# NOTE(review): these are absolute, machine-specific paths; edit them (or move
# them to a config cell) before running on another machine.
train_annotations = r'C:\Users\iitz3bsmd\Desktop\beta_simulator_windows\data\driving_log.csv'
train_img = r'C:\Users\iitz3bsmd\Desktop\beta_simulator_windows\data\IMG'
test_annotations = r'C:\Users\iitz3bsmd\Desktop\beta_simulator_windows\test\driving_log.csv'
test_img = r'C:\Users\iitz3bsmd\Desktop\beta_simulator_windows\test\IMG'

# The dataset hands over decoded image tensors, so convert to PIL first, then
# back to a float tensor, then normalize each channel with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [transforms.ToPILImage(),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

training_data = ImageDataset(train_annotations, train_img, transform=transform)
validation_data = ImageDataset(test_annotations, test_img, transform=transform)

batch_size = 128
# Shuffle only the training stream; a fixed order keeps the per-batch
# validation losses reproducible from run to run.
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
validation_dataloader = DataLoader(validation_data, batch_size=batch_size, shuffle=False)


In [None]:
class CNN(nn.Module):
    """Convolutional regressor mapping a 3-channel image to two output values.

    Five convolutions (ReLU after each, one max-pool after the third) feed
    four fully connected layers. ``fc1`` expects 14*4*64 input features, which
    per the shape notes below corresponds to a 320x160 (width x height) input.

    NOTE(review): no activation is applied between fc1..fc4, so together they
    compute a single affine map - confirm this is intended.
    """

    def __init__(self):
        super().__init__()
        # Shape notes are width*height*channels, starting from a 320*160*3 input.
        self.conv1 = nn.Conv2d(3, 24, kernel_size=5, stride=2)   # -> 158*78*24
        self.conv2 = nn.Conv2d(24, 36, kernel_size=5, stride=2)  # -> 77*37*36
        self.conv3 = nn.Conv2d(36, 48, kernel_size=5, stride=2)  # -> 37*17*48
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)        # -> 18*8*48
        self.conv4 = nn.Conv2d(48, 64, kernel_size=3)            # -> 16*6*64
        self.conv5 = nn.Conv2d(64, 64, kernel_size=3)            # -> 14*4*64
        self.fc1 = nn.Linear(14 * 4 * 64, 100)
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50, 10)
        self.fc4 = nn.Linear(10, 2)

    def forward(self, x):
        """Run the network on a batch ``x`` and return one 2-value row per sample."""
        # Strided convolutions, each followed by ReLU.
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))
        x = self.pool(x)
        # Unstrided 3x3 convolutions, each followed by ReLU.
        for conv in (self.conv4, self.conv5):
            x = F.relu(conv(x))
        # Keep the batch dimension, flatten everything else.
        x = x.flatten(start_dim=1)
        # Fully connected head, applied back to back without activations.
        for fc in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = fc(x)
        return x

In [None]:
# Mean-squared-error objective between the network's two outputs and the labels.
criterion = nn.MSELoss()

# Freshly initialised network, optimised with Adam at a fixed learning rate.
cnn = CNN()
optimizer = optim.Adam(params=cnn.parameters(), lr=1e-3)

In [None]:
## training loop

# One entry per optimisation step: the MSE loss of that batch.
loss_list = list()
for epoch in range(10):
    for i, (inputs, labels) in enumerate(train_dataloader):
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        outputs = cnn(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Log and record the loss of this single batch.
        batch_loss = loss.item()
        print(f'[{epoch + 1}, {i + 1:5d}] loss: {batch_loss:.3f}')
        loss_list.append(batch_loss)

print('Finished Training')
plt.plot(loss_list)
plt.xlabel('training step (batch)')
plt.ylabel('MSE loss')
plt.title('Training loss per batch')
plt.show()

In [None]:
## validation loop

# Gradients are not needed for evaluation.
with torch.no_grad():
    correct = 0        # batches whose MSE loss is at or below the 0.01 threshold
    total = 0          # images evaluated
    num_batches = 0
    # A single pass is enough: the weights do not change during evaluation,
    # so repeating the pass only repeats the same measurements.
    for images, labels in validation_dataloader:
        outputs = cnn(images)
        loss = criterion(outputs, labels)
        print(loss)
        total += labels.size(0)
        num_batches += 1
        if loss.item() <= 0.01:
            correct += 1

    print('test images: {}'.format(total))
    print('batches with loss <= 0.01: {}/{}'.format(correct, num_batches))

In [None]:
# Persist the whole module object (not just its state_dict) to disk; loading it
# back therefore needs the CNN class definition to be importable.
torch.save(obj=cnn, f="model.pth")

In [None]:
# "model.pth" holds a fully pickled nn.Module, not a state_dict. PyTorch 2.6+
# defaults torch.load to weights_only=True, which refuses such files, so opt
# out explicitly (the weights_only argument exists since PyTorch 1.13).
# SECURITY: unpickling can execute arbitrary code - only load files you created.
cnn = torch.load("model.pth", weights_only=False)

In [None]:
# Network used by the telemetry handler; stays None until the
# autonomous-mode cell assigns the trained model to it.
model = None

# Flask application that the Socket.IO server is later mounted onto.
app = Flask(__name__)

# Socket.IO server that exchanges events with the simulator.
sio = socketio.Server()

In [None]:
@sio.on('connect')
def connect(sid, environ):
    """Handle a new client connection: log its session id and send zeroed controls."""
    print("connect ", sid)
    send_control(steering_angle=0, throttle=0)

def send_control(steering_angle, throttle):
    """Emit a "steer" event carrying both control values as strings.

    Args:
        steering_angle: Steering value to send; converted with ``str``.
        throttle: Throttle value to send; converted with ``str``.
    """
    payload = {
        'steering_angle': str(steering_angle),
        'throttle': str(throttle),
    }
    sio.emit("steer", data=payload, skip_sid=True)
    
@sio.on('telemetry')
def telemetry(sid, data):
    """Handle a telemetry event: predict controls from the frame and send them.

    Args:
        sid: Session id of the client that sent the event.
        data: Event payload. When truthy, ``data["image"]`` must hold a
            base64-encoded image; when falsy, a 'manual' event is emitted.
    """
    if data:
        # The payload carries the frame itself (base64), not a file path.
        encoded_frame = data["image"]
        image = Image.open(BytesIO(base64.b64decode(encoded_frame)))
        # Preprocess and add the batch dimension the model expects.
        batch = transform(image).unsqueeze(0)
        # Inference only: skip autograd so no graph is built for every frame.
        with torch.no_grad():
            steering_angle, throttle = model(batch)[0]
        send_control(steering_angle.item(), throttle.item())

    else:
        # NOTE: DON'T EDIT THIS.
        sio.emit('manual', data={}, skip_sid=True)


In [None]:
## simulator autonomous mode

# Telemetry frames are opened as PIL images, so unlike the training pipeline
# no ToPILImage step is needed before ToTensor / Normalize.
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# Hand the trained network to the telemetry handler.
model = cnn

# Bind the wrapped app to its own name: rebinding `app` would nest another
# middleware layer around it every time this cell is re-run.
wsgi_app = socketio.Middleware(sio, app)

# Serve on port 4567 on all interfaces; this call blocks until interrupted.
eventlet.wsgi.server(eventlet.listen(('', 4567)), wsgi_app)