See (these references describe MobileNetV2; note that the code below loads MobileNetV3-Small):
https://github.com/pytorch/vision/blob/main/torchvision/models/mobilenetv2.py and 
https://pytorch.org/hub/pytorch_vision_mobilenet_v2/

Datasets

- data1: poor-quality run. The test set is a random subset of the frames.
- data2: better-quality run. The test set consists of the last frames.

In [None]:
import torch
import torchvision
import numpy as np
from glob import glob
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from os import path
from PIL import Image
from torchvision.models import mobilenet_v3_small, MobileNet_V3_Small_Weights

# Pretrained MobileNetV3-Small backbone. An explicit enum member is passed
# (not the enum class itself) so the weights are selected through the
# supported multi-weight API; DEFAULT is the alias for the recommended weights.
mobilenet = mobilenet_v3_small(weights=MobileNet_V3_Small_Weights.DEFAULT)

# Keep every child module except the last one (in torchvision's MobileNetV3
# the last child is the stock classification head), so the remaining stack
# acts as a feature extractor. eval() fixes batch-norm/dropout behaviour.
embedding_model = torch.nn.Sequential(*list(mobilenet.children()))[:-1]
embedding_model.eval()

# Trainable head: 576-dimensional embedding -> 3 output logits
# (one per `simple_steering` class produced by the dataset).
classifier = torch.nn.Sequential(
    torch.nn.Linear(in_features=576, out_features=1024, bias=True),
    torch.nn.Hardswish(),
    torch.nn.Dropout(p=0.2, inplace=True),
    torch.nn.Linear(in_features=1024, out_features=3, bias=True)
)


In [None]:
class SteerDataSet(Dataset):
    """Image dataset whose steering label is encoded in each file name.

    Every file matching ``<root_folder>/*<img_ext>`` becomes one sample. The
    label is parsed from the file name: the extension is removed, the first
    six characters of the remaining stem are skipped (presumably a frame
    counter -- confirm against the recording script) and the rest is read as
    a float.

    Each sample is a dict with the keys:
        "image":           the transformed image,
        "steering":        the parsed value as ``np.float32``,
        "simple_steering": 0 if steering > 0, 1 if steering < 0, else 2.
    """

    def __init__(self, root_folder, img_ext=".jpg", transform=None):
        """Collect the image files found directly inside ``root_folder``.

        Args:
            root_folder: directory that contains the images.
            img_ext: file extension (including the dot) used for globbing
                and for stripping the suffix from the file name.
            transform: optional callable applied to the PIL image; when
                ``None`` the image is only converted with ``ToTensor``.
        """
        self.root_folder = root_folder
        self.transform = transform
        self.img_ext = img_ext
        # glob() returns files in arbitrary order; sorting makes the
        # index -> file mapping reproducible across runs and machines.
        self.filenames = sorted(glob(path.join(self.root_folder, "*" + self.img_ext)))
        self.totensor = transforms.ToTensor()

    def __len__(self):
        """Return the number of image files found."""
        return len(self.filenames)

    @staticmethod
    def _parse_steering(filename, img_ext):
        """Extract the steering value encoded in ``filename`` as np.float32."""
        # basename() instead of split("/") so Windows-style paths work too.
        stem = path.basename(filename).split(img_ext)[0]
        steering = stem[6:]
        # NOTE(review): when the remainder contains '-', only the text after
        # the last '.' is kept, so e.g. "-0.50" is parsed as 50.0 (positive).
        # Behaviour preserved from the original; confirm it matches the
        # file naming scheme of the recorded data.
        if '-' in steering:
            steering = steering.split('.')[-1]
        return np.float32(steering)

    @staticmethod
    def _steering_class(steering):
        """Map a steering value to its class: 0 (> 0), 1 (< 0) or 2 (otherwise)."""
        if steering > 0:
            return 0
        if steering < 0:
            return 1
        return 2

    def __getitem__(self, idx):
        """Load the image at ``idx`` and return it together with its labels."""
        f = self.filenames[idx]
        img = Image.open(f)

        if self.transform is None:
            img = self.totensor(img)
        else:
            img = self.transform(img)

        steering = self._parse_steering(f, self.img_ext)
        simple_steering = self._steering_class(steering)

        sample = {"image": img, "steering": steering, "simple_steering": simple_steering}

        return sample

In [None]:
# Data locations and mini-batch size used by both loaders.
BATCH_SIZE = 16
TRAIN_DIR = 'data2_train'
TEST_DIR = 'data2_test'

# Resize / centre-crop to 224x224 and normalise with the per-channel mean and
# standard deviation given below.
preprocess = transforms.Compose([
    transforms.Resize(256), # TODO downsizing
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

trainset = SteerDataSet(TRAIN_DIR, ".jpg", preprocess)
testset = SteerDataSet(TEST_DIR, ".jpg", preprocess)
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation does not benefit from shuffling; a fixed order keeps test runs
# deterministic and comparable between epochs.
testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False)

# Only the classifier head is optimised; the embedding model is not passed
# to the optimizer.
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(classifier.parameters(), lr=0.001, momentum=0.9)


In [None]:
def get_acc(preds, label, bs=None):
    """Return the fraction of predictions that match their labels.

    Args:
        preds: iterable of scalar tensors (e.g. a 1-D tensor) with the
            predicted class per sample.
        label: iterable of scalar tensors with the true class per sample.
        bs: denominator of the fraction; defaults to ``len(label)`` so that
            a short final batch is scored correctly.

    Returns:
        Number of matching (pred, label) pairs divided by ``bs``, or 0.0
        when ``bs`` is zero.
    """
    if bs is None:
        bs = len(label)
    if bs == 0:
        return 0.0
    # Use the `preds` argument; the previous body read a global named `pred`.
    return sum(a.item() == b.item() for (a, b) in zip(preds, label)) / bs

In [None]:
# History of the test accuracy, one entry appended per evaluated epoch.
test_accuracies = []

# Training statistics are printed once every PRINT_FREQ mini-batches.
PRINT_FREQ = 20

# Put the classifier head into training mode before the epoch loop starts.
classifier.train()


In [None]:

def _embed(images):
    """Run the embedding model without gradients; return one flat vector per image."""
    with torch.no_grad():
        features = embedding_model(images)
        # Pool the spatial dimensions to 1x1 (a no-op when the features are
        # already 1x1) and flatten to shape (batch, channels).
        pooled = torch.nn.functional.adaptive_avg_pool2d(features, (1, 1))
        return torch.flatten(pooled, 1)


# NOTE(review): epochs are numbered 10..19 -- presumably this cell continues
# an earlier run covering epochs 0..9; the number is used in the file name of
# the saved parameters.
for epoch in range(10, 20):
    # The test loop below switches to eval mode, so re-enable training mode
    # (active Dropout) at the start of every epoch.
    classifier.train()
    running_loss = 0.0
    running_acc = 0.0
    for i, s in enumerate(trainloader):
        # bring data in right format
        data = s['image']
        label = s['simple_steering']

        # apply embedding model (no gradients flow into it)
        embeddings = _embed(data)

        # backprop classifier
        optimizer.zero_grad()
        output = classifier(embeddings)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        # update train-accuracy; divide by the real batch length so a short
        # final batch is not under-counted
        pred = torch.argmax(output, dim=1)
        this_acc = get_acc(pred, label, len(label))

        running_loss = running_loss + loss.item()
        running_acc = running_acc + this_acc

        if i % PRINT_FREQ == PRINT_FREQ-1:
            # loss is the sum, acc the mean, over the last PRINT_FREQ batches
            print(f"epoch: {epoch},\t item: {i+1},\t loss:{running_loss:.3f},\t acc: {running_acc/PRINT_FREQ:.2f}")
            running_loss = 0.0
            running_acc = 0.0

    # TEST LOOP
    print('test loop')
    # Disable Dropout for evaluation; otherwise test accuracy is measured on
    # randomly perturbed activations.
    classifier.eval()
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for s in testloader:
            data = s['image']
            label = s['simple_steering']

            # apply the model
            output = classifier(_embed(data))
            pred = torch.argmax(output, dim=1)

            # Count samples instead of averaging per-batch fractions so that
            # a short final batch is weighted correctly.
            n_correct += (pred == label).sum().item()
            n_seen += len(label)

    # Guard against an empty test set.
    test_acc = n_correct / n_seen if n_seen else 0.0
    print(f"test acc: {test_acc:.2f}")
    test_accuracies.append(test_acc)

    # Checkpoint the classifier head after every epoch.
    torch.save(classifier.state_dict(), f'class_params_e{epoch}.pt')

In [None]:
import matplotlib.pyplot as plt

# Draw the recorded test accuracies (x axis: position in the list) on the
# current axes and display the figure.
accuracy_axes = plt.gca()
accuracy_axes.plot(test_accuracies)
plt.show()