In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import cv2

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision import models

In [None]:
from data_utils import get_abs_path
from data_utils import parse_annotations_xml, create_annotations_list
from image_utils import get_bounding_box, create_mask, resize_img_and_bb

In [None]:
# Project root as reported by the data_utils helper; every data folder hangs off it.
root_dir = get_abs_path(1)

# Folder handed to create_annotations_list, and the folder of the source images.
annotations_dir = root_dir.joinpath('data', 'annotations')
images_dir = root_dir.joinpath('data', 'images')

# Destination for the resized copies; created (including parents) when missing.
resized_images_dir = root_dir.joinpath('data', 'resized_images')
resized_images_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# One DataFrame row per entry returned by create_annotations_list.
annotations_list = create_annotations_list(annotations_dir)
df = pd.DataFrame(annotations_list)

# Shuffle with a fixed seed so the row order (and with it the previewed samples)
# is identical after every Restart & Run All.
df = shuffle(df, random_state=42)

# Placeholder columns that are filled in further down.
# NOTE(review): the insert positions assume the column layout produced by
# create_annotations_list (at least 10 columns) - confirm against data_utils.
df.insert(3, 'resized_img_filename', '')
df.insert(7, 'class_label', '')
df.insert(12, 'bounding_box', '')

# Integer id per class name; an unknown class name raises KeyError on purpose.
class_idx = {'speedlimit': 0, 'stop': 1, 'crosswalk': 2, 'trafficlight': 3}
df['class_label'] = df['class'].apply(lambda i: class_idx[i])

print(df['class'].value_counts())
df.tail()

In [None]:
# Target size of every resized image (passed to resize_img_and_bb in this order).
img_width = 300
img_height = 400

for idx, row in df.iterrows():
    img_path = row['img_filename']
    bounding_box = get_bounding_box(row)

    resized_img, resized_bounding_box = resize_img_and_bb(img_path, bounding_box, img_width, img_height)

    resized_img_filename = str(resized_images_dir / (row['name'] + '.png'))
    # Write the resized copy only when it is NOT on disk yet. The previous check
    # was inverted, so on a fresh run no file was ever written.
    # NOTE: files written with a different img_width/img_height are not refreshed;
    # clear the folder after changing the target size.
    if not os.path.isfile(resized_img_filename):
        # cv2.imwrite reports failure through its return value instead of raising.
        if not cv2.imwrite(resized_img_filename, resized_img):
            raise IOError('Could not write resized image: ' + resized_img_filename)

    df.at[idx, 'resized_img_filename'] = resized_img_filename
    # Store the first four box values as one numpy array in the (object) cell.
    df.at[idx, 'bounding_box'] = np.array([resized_bounding_box[k] for k in range(4)])

df.tail()

In [None]:
def plot_img_with_mask(df_row):
    """Draw a resized image with its bounding-box mask overlaid on the current axes.

    Args:
        df_row: mapping with the keys 'resized_img_filename', 'width', 'height',
            'bounding_box' and 'class'.

    Raises:
        FileNotFoundError: if the resized image cannot be read from disk.
    """
    img_filename = df_row['resized_img_filename']
    img = cv2.imread(img_filename)
    # cv2.imread returns None instead of raising when the file is missing or
    # unreadable; fail here with a message that names the file.
    if img is None:
        raise FileNotFoundError('Could not read image: ' + str(img_filename))
    # OpenCV loads BGR; matplotlib expects RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # NOTE(review): 'width'/'height' come from the annotation row while the box was
    # rescaled together with the image - confirm that create_mask should not be
    # given the resized dimensions instead.
    mask = create_mask(df_row['width'], df_row['height'], df_row['bounding_box'])
    plt.title(df_row['class'])
    plt.imshow(img)
    plt.imshow(mask, alpha=0.6)

In [None]:
# Preview the first three samples, one figure per sample.
for i in range(3):
    plot_img_with_mask(df.iloc[i].to_dict())
    # Flush the current figure so the next sample starts on fresh axes instead of
    # being painted over the previous one.
    plt.show()

In [None]:
# Fraction of the data held out from training; the hold-out is then cut in half
# into a validation and a test set.
test_size = 0.2
batch_size = 4

X = df[['resized_img_filename', 'bounding_box']]
y = df['class_label']

# Use the configured `test_size` (it was previously ignored in favour of a
# hard-coded 0.2) and fix the seed so the split is reproducible.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=0.5, random_state=42)

print('train size:', len(X_train))
print('val_size:', len(X_val))
print('test size:', len(X_test))

# HWC float array -> CHW tensor, then per-channel normalisation with the
# mean/std values listed below.
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [None]:
class RoadSignsDataset(Dataset):
    """Dataset yielding ``(image, class_label, bounding_box)`` triples.

    Args:
        paths: pandas Series of image file paths.
        bounding_boxes: pandas Series with one bounding box per image.
        labels: pandas Series of integer class labels.
        transforms: callable applied to the RGB float32 image scaled to [0, 1].
    """

    def __init__(self, paths, bounding_boxes, labels, transforms):
        # Keep plain arrays so that indexing is positional, not by DataFrame index.
        self.paths = paths.values
        self.bounding_boxes = bounding_boxes.values
        self.labels = labels.values
        self.transforms = transforms

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        """Load, convert and transform the image at position ``idx``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        path = self.paths[idx]
        img = cv2.imread(path)
        # cv2.imread signals failure by returning None; without this check the
        # error would be an opaque AttributeError on `.astype`.
        if img is None:
            raise FileNotFoundError('Could not read image: ' + str(path))
        # BGR -> RGB on float32 data, then scale 0..255 down to 0..1.
        x = cv2.cvtColor(img.astype(np.float32), cv2.COLOR_BGR2RGB) / 255
        x = self.transforms(x)
        label = self.labels[idx]
        bounding_box = self.bounding_boxes[idx]
        return x, label, bounding_box

In [None]:
# One dataset per split, all sharing the same preprocessing pipeline.
train_dataset = RoadSignsDataset(X_train['resized_img_filename'], X_train['bounding_box'], y_train, transforms=data_transform)
val_dataset = RoadSignsDataset(X_val['resized_img_filename'], X_val['bounding_box'], y_val, transforms=data_transform)
test_dataset = RoadSignsDataset(X_test['resized_img_filename'], X_test['bounding_box'], y_test, transforms=data_transform)

# drop_last=True on the training loader: when the training set size is not a
# multiple of batch_size the final batch can hold a single sample, and the
# BatchNorm1d layers in the model heads cannot compute batch statistics for a
# single sample in training mode (PyTorch raises a ValueError).
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)
# test_dataloader = DataLoader(test_dataset,  batch_size=batch_size)

In [None]:
class BBmodel(nn.Module):
    """ResNet-18 feature extractor with two linear heads.

    ``forward`` returns a pair ``(class_logits, bounding_box)``; both heads map
    the 512 pooled features to 4 outputs.
    """

    def __init__(self):
        super().__init__()
        # Keep the first 8 child modules of the pretrained network, i.e. drop
        # torchvision's own pooling and fully-connected layers.
        backbone = list(models.resnet18(pretrained=True).children())[:8]
        self.features1 = nn.Sequential(*backbone[:6])
        self.features2 = nn.Sequential(*backbone[6:])
        # Two independent heads on top of the shared pooled features.
        self.classifier = nn.Sequential(nn.BatchNorm1d(512), nn.Linear(512, 4))
        self.bb = nn.Sequential(nn.BatchNorm1d(512), nn.Linear(512, 4))

    def forward(self, x):
        feature_map = F.relu(self.features2(self.features1(x)))
        # Global average pooling down to 1x1, then flatten to (batch, features).
        global_pool = nn.AdaptiveAvgPool2d((1, 1))
        pooled = global_pool(feature_map)
        flat = pooled.view(pooled.shape[0], -1)
        return self.classifier(flat), self.bb(flat)

In [None]:
def lossFunction(pred_label, pred_bb, label, bb, classification_factor=1.0, bounding_box_factor=5e-3):
    """Weighted sum of the classification loss and the bounding-box loss.

    Args:
        pred_label: class logits with 4 classes per sample.
        pred_bb: predicted bounding boxes.
        label: integer class targets.
        bb: target bounding boxes, same shape as ``pred_bb``.
        classification_factor: multiplier of the cross-entropy term.
        bounding_box_factor: multiplier of the L1 term.

    Returns:
        Scalar tensor ``classification_factor * CE + bounding_box_factor * L1``.
    """
    # Per-class weights created on the same device/dtype as the logits, so the
    # function works on CPU as well as on GPU (it used to force CUDA).
    # Class index 0 is down-weighted to 0.3 - presumably because it is the most
    # frequent class; confirm against the class counts.
    class_weights = torch.tensor([0.3, 1.0, 1.0, 1.0],
                                 dtype=pred_label.dtype,
                                 device=pred_label.device)

    classification_loss = F.cross_entropy(pred_label, label,
                                          weight=class_weights,
                                          reduction='mean')

    bounding_box_loss = F.l1_loss(pred_bb, bb,
                                  reduction='mean')

    return (classification_factor * classification_loss
            + bounding_box_factor * bounding_box_loss)


def evaluate(model, dataloader, optimizer=None):
    """Run one pass over ``dataloader`` and report mean loss and accuracy.

    With ``optimizer=None`` (the default) this is a pure evaluation pass: no
    gradients are computed and no weights change. When an optimizer is given,
    every batch is followed by a backward pass and an optimizer step.

    Args:
        model: network returning ``(class_logits, bounding_box)``.
        dataloader: iterable of ``(image, label, bounding_box)`` batches.
        optimizer: optimizer to step after each batch, or ``None`` to only evaluate.

    Returns:
        ``(mean_loss, accuracy)`` as Python floats, both averaged per sample;
        ``(nan, nan)`` when no sample was processed.
    """
    is_training = optimizer is not None
    # Follow the model's device instead of assuming CUDA.
    device = next(model.parameters()).device

    total_loss = 0.0
    samples_count = 0
    true_count = 0

    with torch.set_grad_enabled(is_training):
        for x, label, bb in dataloader:
            batch_size = label.shape[0]
            # BatchNorm1d cannot compute batch statistics for a single sample in
            # training mode (PyTorch raises), so a leftover one-sample batch is
            # skipped while the model is in training mode.
            if model.training and batch_size < 2:
                continue

            # predictions
            x = x.to(device).float()
            label = label.to(device)
            bb = bb.to(device).float()
            pred_label, pred_bb = model(x)

            # losses
            loss = lossFunction(pred_label, pred_bb, label, bb)
            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # accuracy
            true_count += pred_label.argmax(dim=1).eq(label).sum().item()

            # epoch loss: `loss` is a batch mean, so weight it by the batch size
            # to get a true per-sample mean; .item() avoids keeping the autograd
            # graph of every batch alive.
            total_loss += loss.item() * batch_size
            samples_count += batch_size

    if samples_count == 0:
        return float('nan'), float('nan')

    return total_loss / samples_count, true_count / samples_count


def train(model, optimizer, train_dataloader, val_dataloader, epochs) -> None:
    """Train ``model`` for ``epochs`` epochs, printing train/val metrics per epoch.

    The model is left in evaluation mode when the function returns.
    """
    for i in range(epochs):

        # train: training mode, weights updated through the optimizer
        model.train()
        train_loss, train_accuracy = evaluate(model, train_dataloader, optimizer)

        # validate: evaluation mode, no optimizer -> validation data is never trained on
        model.eval()
        val_loss, val_accuracy = evaluate(model, val_dataloader)

        print('Epoch: %d/%d' % (i+1, epochs))
        print('train loss: %.3f train acc: %.3f val loss: %.3f val acc: %.3f' % (train_loss, train_accuracy, val_loss, val_accuracy))

In [None]:
# Instantiate the two-headed model and move it to the GPU.
model = BBmodel().cuda()

In [None]:
# First stage: 3 epochs with Adam over every parameter that requires gradients.
parameters = (p for p in model.parameters() if p.requires_grad)
optimizer = optim.Adam(parameters, lr=0.001)
train(model, optimizer, train_dataloader, val_dataloader, epochs=3)

In [None]:
# Second stage: 10 more epochs, this time with SGD and momentum on the same model.
parameters = (p for p in model.parameters() if p.requires_grad)
optimizer = optim.SGD(parameters, lr=0.001, momentum=0.9)
train(model, optimizer, train_dataloader, val_dataloader, epochs=10)

In [None]:
# Sanity check: run the model on one held-out sample and print the raw outputs
# next to the ground truth.
x, label, bounding_box = test_dataset[1]

# Add the leading batch dimension the model expects.
xx = x.unsqueeze(0).float()

model.eval()
# Inference only: no autograd graph is needed.
with torch.no_grad():
    out_class, out_bb = model(xx.cuda())
print(out_class, out_bb, label, bounding_box)

In [None]:
# Collect the true and predicted class id of every test sample.
labels = []
preds = []

# Switch to evaluation mode once (not per sample) and disable autograd.
model.eval()
with torch.no_grad():
    for i in range(len(test_dataset)):
        x, label, bounding_box = test_dataset[i]
        xx = x.unsqueeze(0).float()

        out_class, out_bb = model(xx.cuda())

        pred_idx = out_class.argmax(dim=1)[0].item()
        labels.append(label)
        preds.append(pred_idx)

# Fix the class order so the matrix always has one row/column per class (even if
# a class is absent from the test split) and label the axes with the class names.
cm = confusion_matrix(labels, preds, labels=list(class_idx.values()))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(class_idx.keys()))
disp.plot()
plt.show()