## Bev Classifier Model
Import needed libraries

In [None]:
import json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import DataParallel
from torch.optim import AdamW
from torch.utils.data import DataLoader
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data.distributed import DistributedSampler
import torchvision
from PIL import Image
import torchvision.models as models
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torchvision.transforms.functional import crop
from torchvision.transforms import Compose, Resize, Lambda, ToTensor
import warnings
# Suppress the specific UserWarning
warnings.filterwarnings("ignore", message="The default value of the antialias parameter.*", category=UserWarning)
# Prefer the second GPU, but fall back to the first one on single-GPU hosts
# (where "cuda:1" would be an invalid ordinal) and to the CPU when CUDA is absent.
if torch.cuda.is_available():
    device = torch.device("cuda:1" if torch.cuda.device_count() > 1 else "cuda:0")
else:
    device = torch.device("cpu")

In [None]:
# Define the train and test datasets
#train and test datasets are split across a few folders but contain test or train in their name
class BevDataset(torch.utils.data.Dataset):
    """Image dataset read from numbered shard folders under ``root``.

    Files are collected recursively from ``<root>/<split>_<i>`` for every
    shard index of the chosen split, and each file is labelled from the name
    of its immediate parent directory (the "fine" class name).

    ``catagories.json`` (read from the current working directory) maps each
    coarse category key to the list of fine class names that belong to it.

    Two labelling modes exist:

    * ``step_one=True``: the label is the coarse category key whose list
      contains the fine class name; items are returned with a one-hot target
      of width ``NUM_STEP_ONE_CLASSES``.
    * ``step_one=False``: ``subset`` must name a coarse category; the label is
      the position of the fine class name inside that category's list and the
      one-hot target has one column per entry of that list.

    Args:
        root: Directory containing the ``train_<i>`` / ``test_<i>`` folders.
        split: ``'train'`` or ``'test'``.
        transform: Optional callable applied to the RGB PIL image.
        subset: Optional coarse category key; when given, only files whose
            parent directory is listed under that key are kept.
        step_one: Select coarse (``True``) or fine (``False``) labels.

    Raises:
        ValueError: If ``split`` is not supported, or if ``step_one`` is
            false and no ``subset`` is given (fine labels are only defined
            relative to a category).
    """

    # Number of shard folders per split: train_0..train_69 and test_0..test_14.
    SPLIT_FOLDER_COUNTS = {'train': 70, 'test': 15}
    # Width of the one-hot target produced in step-one mode.
    NUM_STEP_ONE_CLASSES = 17
    # Mapping of coarse category key -> list of fine class directory names.
    CATEGORIES_FILE = 'catagories.json'

    def __init__(self, root='/mnt/pccfs2/backed_up/cambirrell/bev_classification/images', split='train', transform=None, subset=None, step_one=False):
        if split not in self.SPLIT_FOLDER_COUNTS:
            raise ValueError(
                f"split must be one of {sorted(self.SPLIT_FOLDER_COUNTS)}, got {split!r}"
            )
        if not step_one and not subset:
            raise ValueError("subset is required when step_one is False")

        self.root = root
        self.transform = transform
        self.split = split
        self.subset = subset
        self.step_one = step_one

        # Load the category mapping once instead of re-opening it per label/item.
        with open(self.CATEGORIES_FILE) as fh:
            self.cat = json.load(fh)

        # Accumulate files from every shard folder; assigning inside the loop
        # would keep only the last shard.
        self.files = []
        for i in range(self.SPLIT_FOLDER_COUNTS[split]):
            shard_dir = root + '/' + split + '_' + str(i)
            for dp, _dirnames, filenames in os.walk(shard_dir):
                self.files.extend(os.path.join(dp, f) for f in filenames)
        # Remove any files that are not .jpg
        self.files = [f for f in self.files if '.jpg' in f]

        # If a subset is specified, keep only the class directories listed under it.
        if self.subset:
            allowed = set(self.cat[self.subset])
            self.files = [f for f in self.files if os.path.basename(os.path.dirname(f)) in allowed]

        # The fine class name is the name of the directory holding the file.
        fine_names = [os.path.basename(os.path.dirname(f)) for f in self.files]
        if step_one:
            self.labels = [self.get_label(name) for name in fine_names]
        else:
            self.labels = [self.get_label_step_two(name) for name in fine_names]

    def get_label(self, f):
        """Return the first category key whose list contains ``f``, else ``None``."""
        for key, members in self.cat.items():
            if f in members:
                return key
        return None

    def get_label_step_two(self, f):
        """Return the position of ``f`` inside the list stored under ``self.subset``."""
        return self.cat[self.subset].index(f)

    def __getitem__(self, idx):
        """Return ``(image, one_hot_target)`` for the file at ``idx``."""
        # Close the underlying file as soon as the pixel data has been converted.
        with Image.open(self.files[idx]) as raw:
            img = raw.convert('RGB')
        if self.transform:
            img = self.transform(img)

        # Convert only the requested label; step-one labels are category keys
        # and are presumably numeric strings - TODO confirm against catagories.json.
        label = int(self.labels[idx])
        if self.step_one:
            num_classes = self.NUM_STEP_ONE_CLASSES
        else:
            num_classes = len(self.cat[self.subset])
        return img, F.one_hot(torch.tensor(label), num_classes).float()

    def __len__(self):
        """Return the number of image files kept after filtering."""
        return len(self.files)
        
    

In [None]:
# Shared preprocessing: resize every image to 224x224 and convert it to a tensor.
transform = Compose([
    Resize((224, 224)),
    ToTensor(),
])
# Step-one (coarse label) view of the training split.
train_dataset = BevDataset(split='train', step_one=True, transform=transform)

In [None]:
# Loop through the training data with the transforms applied to make sure
# loading, transforming and batching all work.
transform = Compose([Resize((224, 224)), ToTensor()])
train_dataset = BevDataset(transform=transform, split='train', step_one=True)
# test_dataset = BevDataset(transform=transform, split='test')
data = DataLoader(train_dataset, batch_size=32, shuffle=True)
# Iterate the DataLoader (not the raw dataset) so batching is exercised too.
for x, y in data:
    print(x.shape, y.shape)
    

In [None]:
#make a resnet that takes in 3 channel images then outputs 17 classes
class ResNetStepOne(nn.Module):
    """Step-one classifier: ImageNet-pretrained ResNet-18 with a new output layer.

    Args:
        num_classes: Number of output logits (17 by default).
    """

    def __init__(self, num_classes=17):
        super().__init__()
        # `weights=` is the replacement for the deprecated `pretrained=True`;
        # IMAGENET1K_V1 selects the same checkpoint.
        self.resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        # NOTE(review): this installs a freshly initialised stem convolution, so the
        # pretrained weights of the first layer are not used - confirm this is intended.
        self.resnet.conv1 = nn.Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        # Size the new head from the backbone's own feature width.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for ``x``."""
        return self.resnet(x)

In [None]:
#make a GoogLeNet that takes in 3 channel images then outputs 17 classes
class GoogleNetStepOne(nn.Module):
    """Step-one classifier: ImageNet-pretrained GoogLeNet with a new output layer.

    Args:
        num_classes: Number of output logits (17 by default).
    """

    def __init__(self, num_classes=17):
        super().__init__()
        # `weights=` is the replacement for the deprecated `pretrained=True`;
        # IMAGENET1K_V1 selects the same checkpoint.
        self.googlenet = models.googlenet(weights=models.GoogLeNet_Weights.IMAGENET1K_V1)
        # NOTE(review): torchvision's GoogLeNet stem is a conv + batch-norm + ReLU
        # block; replacing it with a bare, freshly initialised Conv2d drops the
        # normalisation/activation and the pretrained stem weights - confirm intended.
        self.googlenet.conv1 = nn.Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        # Size the new head from the backbone's own feature width.
        self.googlenet.fc = nn.Linear(self.googlenet.fc.in_features, num_classes)

    def forward(self, x):
        """Return the output of the wrapped GoogLeNet for ``x``."""
        return self.googlenet(x)


In [None]:
# Smoke test: push one all-ones 3x224x224 image (as a batch of one) through the
# step-one GoogLeNet and display the shape of what comes back.
test_model = GoogleNetStepOne()
a = torch.ones(3, 224, 224)
test_model(a[None, ...]).shape

In [None]:
#make 16 resnets that take in 3 channel images then the amount of output classes are the amount of element in the items for each key in the categories.json file
class ResNetStepTwo(nn.Module):
    """Step-two expert: ImageNet-pretrained ResNet-18 with ``output_classes`` logits.

    Args:
        output_classes: Number of output logits for this expert.
    """

    def __init__(self, output_classes):
        super().__init__()
        # `weights=` is the replacement for the deprecated `pretrained=True`;
        # IMAGENET1K_V1 selects the same checkpoint.
        self.resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        # NOTE(review): this installs a freshly initialised stem convolution, so the
        # pretrained weights of the first layer are not used - confirm this is intended.
        self.resnet.conv1 = nn.Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        # Size the new head from the backbone's own feature width.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, output_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for ``x``."""
        return self.resnet(x)
    

In [None]:
#make a vgg that takes in 3 channel images then outputs `output_classes` classes
class VGGStepTwo(nn.Module):
    """Step-two expert: ImageNet-pretrained VGG-16 with ``output_classes`` logits.

    Args:
        output_classes: Number of output logits for this expert.
    """

    def __init__(self, output_classes):
        super().__init__()
        # `weights=` is the replacement for the deprecated `pretrained=True`;
        # IMAGENET1K_V1 selects the same checkpoint.
        self.vgg = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
        # NOTE(review): this installs a freshly initialised first convolution, so the
        # pretrained weights of that layer are not used - confirm this is intended.
        self.vgg.features[0] = nn.Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        # Replace the last classifier layer, keeping its input width.
        self.vgg.classifier[6] = nn.Linear(self.vgg.classifier[6].in_features, output_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped VGG for ``x``."""
        return self.vgg(x)
    

In [None]:
#Step one training
def train_step_one(model, train_loader, val_loader, epochs=10, lr=1e-3):
    """Train ``model`` with AdamW and cross-entropy, validating after every epoch.

    Targets are expected to be one-hot (or per-class score) rows: validation
    takes ``argmax`` over dimension 1 of ``y`` to recover the class index.

    Args:
        model: Module to train; it is moved to the module-level ``device``.
        train_loader: Iterable of ``(x, y)`` training batches (must support ``len``).
        val_loader: Iterable of ``(x, y)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        lr: AdamW learning rate.

    Returns:
        The trained model (the same object that was passed in, left on ``device``).
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=lr)
    model.to(device)
    # The context manager guarantees the progress bar is closed, even on error.
    with tqdm(total=len(train_loader) * epochs, position=0, leave=False) as loop:
        for epoch in range(epochs):
            model.train()
            for x, y in train_loader:
                x, y = x.to(device), y.to(device)
                optimizer.zero_grad()
                y_hat = model(x)
                loss = criterion(y_hat, y)
                loss.backward()
                optimizer.step()
                loop.set_description(f"Epoch [{epoch}/{epochs}]")
                loop.set_postfix(loss=loss.item())
                loop.update(1)

            model.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for x, y in val_loader:
                    x, y = x.to(device), y.to(device)
                    y_hat = model(x)
                    predicted = torch.argmax(y_hat, 1)
                    actual = torch.argmax(y, 1)
                    total += y.size(0)
                    correct += (predicted == actual).sum().item()
            # An empty validation loader yields NaN instead of a ZeroDivisionError.
            accuracy = correct / total if total else float('nan')
            print(f'Epoch {epoch}, Validation Accuracy: {accuracy}')
    return model
    # return model

In [None]:
# Step one training test
# Step-one trial run: random 80/20 train/validation split of the training
# images, then three epochs of GoogLeNet training.
dataset = BevDataset(split='train', step_one=True, transform=transform)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, lengths=[train_size, val_size]
)
data = DataLoader(train_dataset, shuffle=True, batch_size=32)
val_data = DataLoader(val_dataset, shuffle=False, batch_size=32)
model = GoogleNetStepOne()
train_step_one(model, train_loader=data, val_loader=val_data, epochs=3)

In [None]:
#Step two training - train the expert models
#train all 16 models at once by spreading the data across the gpus
def train_step_two(model, train_loader, val_loader, epochs=10, lr=1e-3):
    """Train one expert model with AdamW and cross-entropy, validating each epoch.

    Targets are expected to be one-hot (or per-class score) rows: validation
    takes ``argmax`` over dimension 1 of ``y`` to recover the class index.

    Args:
        model: Module to train; it is moved to the module-level ``device``
            for training.
        train_loader: Iterable of ``(x, y)`` training batches.
        val_loader: Iterable of ``(x, y)`` validation batches.
        epochs: Number of passes over ``train_loader``.
        lr: AdamW learning rate.

    Returns:
        The trained model, moved back to the CPU.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=lr)
    model.to(device)
    for epoch in range(epochs):
        model.train()
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            y_hat = model(x)
            loss = criterion(y_hat, y)
            loss.backward()
            optimizer.step()

        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for x, y in val_loader:
                x, y = x.to(device), y.to(device)
                y_hat = model(x)
                predicted = torch.argmax(y_hat, 1)
                actual = torch.argmax(y, 1)
                total += y.size(0)
                correct += (predicted == actual).sum().item()
        # Report inside the loop so every epoch is logged and epochs=0 cannot
        # reach undefined counters; an empty validation loader yields NaN.
        accuracy = correct / total if total else float('nan')
        print(f'Epoch {epoch}, Validation Accuracy: {accuracy}')
    # Move the finished expert back to the CPU before handing it back.
    model.to('cpu')
    return model

In [None]:
# Build one VGG expert per category (one output per class listed under that
# category) and train each expert on that category's images only.
with open('catagories.json') as categories_file:
    catagories = json.load(categories_file)
expert_models = {k: VGGStepTwo(len(v)) for k, v in catagories.items()}
# Iterate over a snapshot of the keys because the dict values are reassigned below.
for k in list(expert_models):
    print("Catagory:", k)
    dataset = BevDataset(transform=transform, split='train', subset=k, step_one=False)
    # Random 85/15 train/validation split.
    train_size = int(0.85 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    data = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_data = DataLoader(val_dataset, batch_size=32, shuffle=False)
    expert_models[k] = train_step_two(expert_models[k], data, val_data, epochs=10)

In [None]:
#evaluation, use the trained models to predict the labels of the test data
#run through the step one model and then, depending on the output, run through the corresponding step two model
#return top 1 and top 5 accuracy and put all the results in a dataframe and then save it to a txt file
def _forward_on_module_device(module, x):
    """Run ``module`` on whatever device its parameters live on; return the output on ``x``'s device."""
    first_param = next(module.parameters(), None)
    if first_param is None:
        return module(x)
    return module(x.to(first_param.device)).to(x.device)


def evaluate(model, test_loader, expert):
    """Score the step-one model and the concatenated expert outputs on ``test_loader``.

    Top-1 accuracy is computed from ``model``'s logits. Top-5 accuracy is
    computed from the logits of every expert concatenated along dimension 1
    (in the iteration order of ``expert``): a sample counts as correct when
    its target index is among the five highest-scoring columns.

    NOTE(review): both metrics are compared against the same ``y``, so ``y``
    would have to be valid in both the step-one label space and the
    concatenated expert column space - confirm which labelling the test
    loader is meant to provide.

    Args:
        model: Step-one module; it is moved to the module-level ``device``.
        test_loader: Iterable of ``(x, y)`` batches; ``y`` may hold class
            indices or one-hot rows.
        expert: Mapping of category key to expert module.

    Returns:
        A one-row ``pandas.DataFrame`` with the model class name and both
        accuracies. The same row is appended to ``results.txt``.
    """
    model.to(device)
    model.eval()
    experts = list(expert.values())
    for module in experts:
        module.eval()

    correct = 0
    total = 0
    top_5_correct = 0
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            # Reduce one-hot rows to class indices so they can be compared
            # element-wise with predictions.
            if y.dim() > 1:
                y = y.argmax(dim=1)

            predicted = model(x).argmax(dim=1)
            total += y.size(0)
            correct += (predicted == y).sum().item()

            if experts:
                # Experts may live on another device than `device` (e.g. the CPU).
                expert_logits = torch.cat(
                    [_forward_on_module_device(module, x) for module in experts], dim=1
                )
                k = min(5, expert_logits.size(1))
                top_k = expert_logits.topk(k, dim=1).indices
                top_5_correct += (top_k == y.unsqueeze(1)).any(dim=1).sum().item()

    # An empty loader yields NaN instead of a ZeroDivisionError.
    top_1_accuracy = correct / total if total else float('nan')
    top_5_accuracy = top_5_correct / total if total else float('nan')
    results = pd.DataFrame({'Model': [model.__class__.__name__], 'Top 1 Accuracy': [top_1_accuracy], 'Top 5 Accuracy': [top_5_accuracy]})
    results.to_csv('results.txt', mode='a', header=False, index=False)
    return results

In [None]:
# Full pipeline: train the step-one model, train every expert, then evaluate.
pipeline_transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

# Only 'train' and 'test' folders are read by BevDataset, so the validation set
# is carved out of the training split.
step_one_dataset = BevDataset(split='train', transform=pipeline_transform, step_one=True)
step_one_train_size = int(0.8 * len(step_one_dataset))
step_one_train, step_one_val = torch.utils.data.random_split(
    step_one_dataset, [step_one_train_size, len(step_one_dataset) - step_one_train_size]
)
train_loader = DataLoader(step_one_train, batch_size=64, shuffle=True)
val_loader = DataLoader(step_one_val, batch_size=64, shuffle=False)
test_loader = DataLoader(BevDataset(split='test', transform=pipeline_transform, step_one=True), batch_size=64, shuffle=False)

# Keep a reference to the model so the trained weights (updated in place) are
# the ones that get evaluated.
step_one_model = ResNetStepOne()
train_step_one(step_one_model, train_loader, val_loader, epochs=10, lr=1e-3)

# train_step_two trains a single module, so each expert is trained separately
# on the images of its own category.
for k in list(expert_models):
    expert_dataset = BevDataset(split='train', transform=pipeline_transform, subset=k, step_one=False)
    expert_train_size = int(0.85 * len(expert_dataset))
    expert_train, expert_val = torch.utils.data.random_split(
        expert_dataset, [expert_train_size, len(expert_dataset) - expert_train_size]
    )
    expert_models[k] = train_step_two(
        expert_models[k],
        DataLoader(expert_train, batch_size=64, shuffle=True),
        DataLoader(expert_val, batch_size=64, shuffle=False),
        epochs=10,
        lr=1e-3,
    )

evaluate(step_one_model, test_loader, expert_models)
