In [0]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torch.utils.tensorboard as tb


from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torch.utils.tensorboard import SummaryWriter

from torch import save
from torch import load
from os import path
import torchvision.models as models
from torch.utils.data.sampler import SubsetRandomSampler

from torch import save
from torch import load
from os import path

In [0]:
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# set current directory

%cd /content/drive/My Drive/Steering_Datasets_Rev1
!unzip CH2_002_Test.zip

# DataLoader



In [0]:
class SteeringDataset(Dataset):
    def __init__(self, image_path, data_transforms=None):
        """
        Your code here
        Hint: Use the python csv library to parse labels.csv
        """
        # self.name_label_path = name_label_path
        self.image_path = image_path
        self.name = pd.read_csv(self.image_path + 'interpolated.csv', usecols=range(5,6))

        self.labels = pd.read_csv(self.image_path + 'interpolated.csv', usecols=range(6,7))
        self.center_data = pd.concat([self.name, self.labels], axis=1) #combine image name and label dataframes
        self.center_data = self.center_data[self.center_data["filename"].str.contains('center')] # only keep center image names and labels

        self.name = pd.DataFrame(self.center_data[self.center_data.columns[0]]) # center images names
        self.len = self.name.shape[0]
        self.labels = pd.DataFrame(self.center_data[self.center_data.columns[1]]) # center image labels

        # bin angles into 100 classes

        # classes = 100
        # bins = np.linspace(-2.1,2,classes)
        # labels = np.linspace(1,classes-1,classes-1).astype(int)
        # self.labels = pd.cut(self.labels['angle'], bins = bins, labels = labels)

        print(self.labels)

    def __len__(self):
        """
        Your code here
        """
        return self.len

    def __getitem__(self, idx):
        """
        Your code here
        return a tuple: img, label
        """
        # img = Image.open(str(self.name.iloc[idx, 0]))
        img = Image.open(str(self.name.iloc[idx, 0])[7:]) # [7:] removes 'center/' before the image number since the zipped images just have numbers
        img = img.resize((320,240)) #resize image
        transform = transforms.ToTensor()
        img = transform(img)
        label = self.labels.iloc[idx][0]
        return img, label


# Save and Load Models

In [0]:
def load_data(dataset_path, data_transforms, num_workers, batch_size, shuffle):
    dataset = SteeringDataset(dataset_path, data_transforms)
    return DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, shuffle=shuffle)

def train_test_split(args, dataset):
    # Indices of Split
    size = len(dataset)
    indices = list(range(size))
    train_size = int(args.test_fraction * size)
    train_indices, test_indices = indices[train_size:], indices[:train_size]

    # SubsetRandomSamplers
    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(test_indices)

    # DataLoaders
    train = DataLoader(dataset, batch_size=args.train_batch_size, sampler=train_sampler)
    test = DataLoader(dataset, batch_size=args.test_batch_size, sampler=valid_sampler)

    return train, test

def save_model(model, file_name):
    #if isinstance(model, CNNClassifier):
    return save(model.state_dict(), path.join(path.abspath(''), file_name))

    #raise ValueError("model type '%s' not supported!"%str(type(model)))

def load_model(file_name):
    r = CustomModel()
    r.load_state_dict(load(path.join(path.abspath(''), file_name), map_location='cpu'))
    return r

# Loss

In [0]:
class ClassificationLoss(torch.nn.Module):
    def forward(self, input, target):
        """
        Your code here
        Compute mean(-log(softmax(input)_label))
        @input:  torch.Tensor((B,C)), where B = batch size, C = number of classes
        @target: torch.Tensor((B,), dtype=torch.int64)
        @return:  torch.Tensor((,))
        Hint: Don't be too fancy, this is a one-liner
        """
        m = nn.MSELoss()
        return torch.sqrt(m(input.view(input.size(0)),target))

# Model

In [0]:
class CNNClassifier(torch.nn.Module):
    def __init__(self):
        """
        Your code here
        """
        super(CNNClassifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 5, kernel_size=3)
        self.conv2 = nn.Conv2d(5, 15, kernel_size=3)
        self.fc1 = nn.Linear(67860, 100)
        self.fc2 = nn.Linear(100, 50)
        self.fc3 = nn.Linear(50,1)


    def forward(self, x):
        """
        Your code here
        @x: torch.Tensor((B,3,64,64))
        @return: torch.Tensor((B,6))
        """
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, kernel_size=2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, kernel_size=2)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        return x

# Parameters

In [0]:
# Parameters

class Args(object):
    def __init__(self, model):

        # load_data
        self.data_path = ''
        self.transforms = None  # not in use
        self.num_workers = 0  # not in use
        self.train_batch_size = 200
        self.test_batch_size = 1  # b/c batches aren't needed for testing
        self.shuffle = False  # not in use

        #train_test_split
        self.test_fraction = 0.2

        # Device
        self.device = torch.device("cuda")

        # train
        self.learning_rate = 0.01 
        self.momentum = 0.9 # not in use 
        self.epochs = 10
        self.optimizer = torch.optim.Adam(model.parameters(), lr=self.learning_rate)
        self.criterion = ClassificationLoss() #nn.CrossEntropyLoss() #RMSELoss().   CrossEntropyLoss: lable.long() conversion needed in train()

        # test
        self.correct_threshold = 0.1  # compared with prediction loss


# Train

In [0]:
def train(args,model,train_data, test_data):
    """
    Your code here
    """
    
    # initialize weights

    for m in model.modules():
        if isinstance(m, torch.nn.Linear):
            torch.nn.init.normal_(m.weight, mean=0, std=0.1)
            torch.nn.init.constant_(m.bias, 0.1)
        if isinstance(m, torch.nn.Conv2d):
            torch.nn.init.normal_(m.weight, mean=0, std=0.1)


    # Model
    model.train()
    model.to(args.device)
    
    # Optimizer & Criterion
    optimizer = args.optimizer
    criterion = args.criterion
    criterion.to(args.device)


    # Book Keeping
    train_losses = []
    train_counter = []
    num_steps_per_epoch = len([i for i in range(len(train_data)) if i%10==0])


    # Epoch Loop
    for epoch in range(1, args.epochs+1):
        stepper = 0
        model.train()
        
        # Batch Loop
        
        
        for batch_idx, (data, label) in enumerate(train_data):
            
            print('epoch: ', epoch, '.  batch_idx: ', batch_idx)


            data, label = data.to(args.device), label.to(args.device)



            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, label)

            

            loss.backward()
            optimizer.step()

            # Printing & Logging
            if batch_idx % 1 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                  epoch, batch_idx * len(data), len(train_data.dataset),
                  100. * batch_idx / len(train_data), loss.item()))                
                train_losses.append(loss.item())
                train_counter.append((batch_idx*64) + ((epoch-1)*len(train_data.dataset)))
                stepper += 1

        # check validation loss every epoch
        
        valid_loss = test(args, model, test_data)
        print("validation loss = ", valid_loss)

    save_model(model,'cnn_akhil')

In [0]:
def test(args, model, test_data):

    # Model
    model.eval()
    model = model.to(args.device)

    # Loss
    criterion = args.criterion
    criterion.to(args.device)

    # Book Keeping
    test_loss = 0
    correct = 0  # Since no classes: correct++ occurs when prediction is sufficiently close to target (within a threshold)
    test_progress = 0

    # Data Loop
    with torch.no_grad():
        for data, target in test_data:

            # Use Model
            data, target = data.to(args.device), target.to(args.device)
            output = model(data)
            loss = criterion(output, target).item()
            test_loss += loss

        test_loss /= len(test_data)

    return test_loss

In [0]:
# Model

model = CNNClassifier()

# Parameters
args = Args(model)

# Load Data & Train/Test Split
print('Loading Data...')
train_data, test_data = train_test_split(args, SteeringDataset(args.data_path, args.transforms))

print('Training...')
train(args, model, train_data, test_data)

# Test Model
test(args, model, test_data) 

