# Convolution and Maxpooling

In [None]:
import torch
import torch.nn as nn
import numpy as np
import cv2
import matplotlib.pyplot as plt

In [None]:
# 2x2 max pooling (stride defaults to the kernel size).
mx2d = nn.MaxPool2d(kernel_size=(2, 2))
# 1 input channel -> 2 output channels, 2x2 kernel, double-precision weights
# so the layer accepts the float64 tensors built below.
cnn = nn.Conv2d(
    in_channels=1,
    out_channels=2,
    kernel_size=(2, 2),
    dtype=torch.float64,
)


In [None]:
# Random 1x4x4 sample drawn uniformly from [0, 10), kept as float64.
x = torch.from_numpy(np.random.uniform(low=0, high=10, size=(1, 4, 4)))
# Deterministic ramp 0..15 laid out as a 1x4x4 grid.
y = torch.arange(0, 16, dtype=torch.float64).view(1, 4, 4)
# All-ones tensor with the same shape and dtype as x.
ones = torch.ones_like(x)

In [None]:
# Display the random tensor, the 0..15 ramp and the all-ones tensor.
x, y, ones

In [None]:
# Max-pool each of the three tensors with the current `mx2d` layer and display the results.
mx2d(x),mx2d(y),mx2d(ones)

In [None]:
# Convolve each of the three tensors with the current `cnn` layer and display the results.
cnn(x),cnn(y),cnn(ones)

In [None]:
img_path = "/home/berens/remote/astyx/dataset/dataset_astyx_hires2019/camera_front/000000.jpg"
img = cv2.imread(img_path)
if img is None:
    # cv2.imread reports a missing/unreadable file by returning None instead
    # of raising; fail here with a clear message rather than on the indexing.
    raise FileNotFoundError(f"Could not read image: {img_path}")
# Reverse the channel order (OpenCV loads BGR) to get RGB.
img = img[:, :, [2, 1, 0]]
# Swap the first and last axes so the channel axis comes first.
img = img.swapaxes(0, 2)
# Convert to a float64 tensor scaled to [0, 1].
img = torch.from_numpy(img).type(torch.float64)/255.

In [None]:
# Swap the first and last axes back so the channel axis is last again before plotting.
plt.imshow(img.swapaxes(0,2))

In [None]:
# Rebind both layers for the RGB image: 10x10 pooling window and a 10x10
# convolution mapping 3 channels to 3 channels in double precision.
mx2d = nn.MaxPool2d(kernel_size=(10, 10))
cnn = nn.Conv2d(
    in_channels=3,
    out_channels=3,
    kernel_size=(10, 10),
    dtype=torch.float64,
)

In [None]:
# Max-pool the image with the current `mx2d` layer.
img_mx = mx2d(img)

In [None]:
# Run the convolution and leave autograd so the result can become a NumPy array.
img_cnn = cnn(img).detach().numpy()
# Shift the minimum to 0, then scale by the new maximum so values span [0, 1].
img_cnn = img_cnn - img_cnn.min()
img_cnn = img_cnn / img_cnn.max()

In [None]:
# Plot the max-pooled image with its first and last axes swapped.
plt.imshow(img_mx.swapaxes(0,2))

In [None]:
# Plot the normalized convolution output with its first and last axes swapped.
plt.imshow(img_cnn.swapaxes(0,2))

# Simple Network

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Small convolutional classifier
class Net(nn.Module):
    """Two conv/pool stages followed by three fully connected layers.

    The first linear layer expects ``32 * 5 * 5`` input features, which is
    what the convolutional stack produces for 3-channel 32x32 inputs
    (32 -> 28 after conv1 -> 14 after pooling -> 10 after conv2 -> 5 after
    pooling).

    Args:
        num_classes: Number of output logits. Defaults to 10.
    """

    def __init__(self, num_classes: int = 10):
        super().__init__()
        # Convolutional layer with 8 filters and a 5x5 kernel
        self.conv1 = nn.Conv2d(3, 8, 5)
        # Max pooling with a 2x2 window and stride 2 (reused after both convs)
        self.pool = nn.MaxPool2d(2, 2)
        # Second convolutional layer: 8 -> 32 channels, 5x5 kernel
        self.conv2 = nn.Conv2d(8, 32, 5)
        # Fully connected head
        self.fc1 = nn.Linear(32 * 5 * 5, 124)
        self.fc2 = nn.Linear(124, 64)
        self.fc3 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return the raw class scores (no softmax) for a batch ``x``."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


net = Net()

In [None]:
# Mini-batch size shared by the train and test loaders.
batch_size = 4

# Use the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Preprocessing: resize to 32x32, convert to a tensor, then normalize every
# channel with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# CIFAR10 training split (downloaded into ./data if missing), shuffled.
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
)

# CIFAR10 test split, iterated in a fixed order.
testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
)

# Class names used to label predictions by index.
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

In [None]:
def imshow(img):
    """Plot an image tensor after mapping its values back with ``img / 2 + 0.5``.

    The axes are reordered with ``(1, 2, 0)`` so the first axis of ``img``
    becomes the last one before plotting.
    """
    unnormalized = img / 2 + 0.5
    plt.imshow(np.transpose(unnormalized.numpy(), (1, 2, 0)))
    plt.show()


# Pull the first (shuffled) batch from the training loader.
dataiter = iter(trainloader)
images, labels = next(dataiter)

# Tile the batch into a single grid image and display it.
imshow(torchvision.utils.make_grid(images))

# Print the class name of every image in the batch, padded to 5 characters.
label_names = [f'{classes[labels[j]]:5s}' for j in range(batch_size)]
print(' '.join(label_names))

In [None]:
# Define the classes
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# The batches below are moved to `device`, so the model's parameters must
# live there too; otherwise the forward pass fails whenever CUDA is selected.
# Do this before building the optimizer so it tracks the moved parameters.
net.to(device)

# Define the criterion and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
print('Start Training')
# Train the network
for epoch in range(2):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # accumulate the batch loss for the epoch average
        running_loss += loss.item()
    # mean loss over the (i + 1) batches seen in this epoch
    print('Epoch %d loss: %.3f' % (epoch + 1, running_loss / (i + 1)))

print('Finished Training')

In [None]:
# Take the first batch of the test loader.
dataiter = iter(testloader)
images, labels = next(dataiter)

# Tile the batch into one grid image and display it.
grid = torchvision.utils.make_grid(images)
imshow(grid)


In [None]:
# Count correct top-1 predictions over the whole test loader.
correct = 0
total = 0
with torch.no_grad():
    for images, labels in testloader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = net(images)
        # Index of the highest score per sample is the predicted class.
        predicted = torch.max(outputs.data, 1)[1]
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy_percent = 100 * correct / total
print('Accuracy of the network on the 10000 test images: %d %%' % (
    accuracy_percent))

# Complex YOLO

## Load packages

In [None]:
import torch
import cv2
import numpy as np
import warnings
import os, sys
from easydict import EasyDict as edict
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Suppress every UserWarning raised from here on.
warnings.filterwarnings("ignore", category=UserWarning)

# Add ./src/ to the module search path (relative to the working directory)
# — presumably so project modules can import each other; TODO confirm.
sys.path.append("./src/")

from src.models.model_utils import create_model, get_num_parameters
from src.utils.evaluation_utils import get_batch_statistics_rotated_bbox, ap_per_class, post_processing_v2, rescale_boxes
from src.data_process_astyx.astyx_dataloader import create_val_dataloader
from src.data_process_astyx import astyx_bev_utils as bev_utils

## Prerequisites

* configs - has all necessary information
* val_dataloader - contains the dataset
* model - contains the model

In [None]:
#checkpoints/complex_yolov4_astyx_lidar_split_old/Model_complex_yolov4_astyx_lidar_split_old_epoch_180.pth
#checkpoints/complex_yolov4_astyx_radar_VR_split_old/Model_complex_yolov4_astyx_radar_VR_split_old_epoch_100.pth
#checkpoints/complex_yolov4_astyx_radar_VR_split_old/Model_complex_yolov4_astyx_radar_VR_split_old_epoch_130.pth
#checkpoints/complex_yolov4_astyx_low_fusion_VR_split_old/Model_complex_yolov4_astyx_low_fusion_VR_split_old_epoch_130.pth
#checkpoints/complex_yolov4_astyx_low_fusion_Mag_split_old/Model_complex_yolov4_astyx_low_fusion_Mag_split_old_epoch_700.pth

# Settings consumed by create_val_dataloader / create_model and the
# evaluation cells below.
configs = edict({
    # Paths
    "cfgfile": "./src/config/cfg/complex_yolov4.cfg",
    "dataset_dir": "../astyx/dataset/dataset_astyx_hires2019/",
    "pretrained_path": "checkpoints/complex_yolov4_astyx_low_fusion_VR_split_old/Model_complex_yolov4_astyx_low_fusion_VR_split_old_epoch_130.pth",
    # Input selection flags (presumably chosen to match the checkpoint above
    # — verify when switching checkpoints)
    "radar": False,
    "low_fusion": True,
    "lidar": False,
    "VR": True,
    "mag": False,
    # Image size and thresholds
    "img_size": 608,
    "conf_thresh": 0.5,
    "nms_thresh": 0.5,
    "iou_thresh": 0.5,
    # Data loading
    "batch_size": 1,
    "num_workers": 1,
    "pin_memory": True,
    "num_samples": None,
    # Model / runtime
    "device": torch.device("cpu"),
    "arch": "darknet",
    "use_giou_loss": True,
})

In [None]:
# Build the validation data loader from the settings in `configs`.
val_dataloader = create_val_dataloader(configs)

In [None]:
# Build the network, then restore the trained weights onto the configured device.
model = create_model(configs)
checkpoint_state = torch.load(configs.pretrained_path, map_location=configs.device)
model.load_state_dict(checkpoint_state)
model = model.to(device=configs.device)
# Switch to inference mode; as the last expression this also displays the model.
model.eval()

In [None]:
# Display the value returned by get_num_parameters for the loaded model.
f"The model has {get_num_parameters(model)} many parameters."

## Qualitative Evaluation
First we will visually evaluate the model.

For this we will load a sample and display it.

In [None]:
# Index of the validation sample to inspect.
batch_id = 25
img_paths, imgs_bev, targets = val_dataloader.dataset[batch_id]
# Move to the configured device, cast to float32 and add a leading batch axis.
input_imgs = imgs_bev.to(device=configs.device).float().unsqueeze(0)
# Give the targets a leading batch axis as well.
targets = targets.unsqueeze(0)

In [None]:
# Inference only: skip autograd bookkeeping, consistent with the evaluation
# loops further down, so the outputs carry no gradient history.
with torch.no_grad():
    outputs = model(input_imgs)
# Filter the raw outputs with the configured confidence / NMS thresholds.
detections = post_processing_v2(outputs, conf_thresh=configs.conf_thresh, nms_thresh=configs.nms_thresh)

In [None]:
# One entry per image index, copied from the post-processing result.
img_detections = list(detections)

# Scale to 0..255, reorder the axes with permute(1, 2, 0), convert to uint8.
img_bev = (imgs_bev.squeeze() * 255).permute(1, 2, 0).numpy().astype(np.uint8)
img_bev = cv2.resize(img_bev, (configs.img_size, configs.img_size))

# Every pixel takes the maximum of itself and the pixel 1 (then 2) rows
# before it, then the same along the columns; this spreads bright pixels.
img_bev[1:] = np.maximum(img_bev[1:], img_bev[:-1])
img_bev[2:] = np.maximum(img_bev[2:], img_bev[:-2])
img_bev[:, 1:] = np.maximum(img_bev[:, 1:], img_bev[:, :-1])
img_bev[:, 2:] = np.maximum(img_bev[:, 2:], img_bev[:, :-2])

# One row of 8 values per target; columns 2..5 are scaled to pixel units.
targets = targets.reshape((-1, 8))
targets[:, 2:6] *= configs.img_size

# Targets are drawn in white.
for targets_ in targets:
    if targets_ is not None:
        _, cls_pred, x, y, w, l, im, re = targets_
        yaw = np.arctan2(im, re)
        bev_utils.drawRotatedBox(img_bev, x, y, w, l, yaw, [255,255,255])

# Detections are drawn with colour [100, 255, 255]; entries that are None are skipped.
for detections in img_detections:
    if detections is not None:
        for x, y, w, l, im, re, *_, cls_pred in detections:
            yaw = np.arctan2(im, re)
            bev_utils.drawRotatedBox(img_bev, x, y, w, l, yaw, [100,255,255])

img_rgb = cv2.imread(img_paths[0])

# Flip around the horizontal axis, then around the vertical axis.
img_bev = cv2.flip(img_bev, 0)
img_bev = cv2.flip(img_bev, 1)
out_img = img_bev

In [None]:
# Show the annotated BEV image produced by the previous cell.
plt.imshow(out_img)

In [None]:
def make_bev_image(model, batch_id):
    """Render one validation sample as a BEV image with boxes drawn on it.

    Loads sample ``batch_id`` from the module-level ``val_dataloader``, runs
    ``model`` on it and draws the target boxes (white) and the post-processed
    detections (colour ``[100, 255, 255]``) with ``bev_utils.drawRotatedBox``.
    Thresholds, device and image size are read from the module-level
    ``configs``.

    Args:
        model: Callable network; its output is passed to ``post_processing_v2``.
        batch_id: Index into ``val_dataloader.dataset``.

    Returns:
        The annotated ``uint8`` image, resized to ``configs.img_size`` in both
        dimensions and flipped around both axes.
    """
    _, imgs_bev, targets = val_dataloader.dataset[batch_id]

    # Cast to float32 on the configured device and add a leading batch axis.
    input_imgs = imgs_bev.to(device=configs.device).float().unsqueeze(0)

    # Inference only: no autograd graph is needed, and the outputs must be
    # free of gradient history to be usable with NumPy functions below.
    with torch.no_grad():
        outputs = model(input_imgs)
    detections = post_processing_v2(outputs, conf_thresh=configs.conf_thresh, nms_thresh=configs.nms_thresh)

    # Scale to 0..255, reorder the axes with permute(1, 2, 0), convert to uint8.
    img_bev = (imgs_bev.squeeze() * 255).permute(1, 2, 0).numpy().astype(np.uint8)
    img_bev = cv2.resize(img_bev, (configs.img_size, configs.img_size))

    # Every pixel takes the maximum of itself and the pixel `shift` rows
    # before it (shift 1, then 2), then the same along the columns. This
    # spreads bright pixels — presumably so sparse points stay visible.
    for shift in (1, 2):
        img_bev[shift:] = np.maximum(img_bev[shift:], img_bev[:-shift])
    for shift in (1, 2):
        img_bev[:, shift:] = np.maximum(img_bev[:, shift:], img_bev[:, :-shift])

    # One row of 8 values per target; columns 2..5 are scaled to pixel units.
    targets = targets.reshape((-1, 8))
    targets[:, 2:6] *= configs.img_size

    for _, _cls, x, y, w, l, im, re in targets:
        yaw = np.arctan2(im, re)
        # Draw rotated target box
        bev_utils.drawRotatedBox(img_bev, x, y, w, l, yaw, [255,255,255])

    for image_detections in detections:
        # Entries that are None carry nothing to draw.
        if image_detections is None:
            continue
        for x, y, w, l, im, re, *_ in image_detections:
            yaw = np.arctan2(im, re)
            # Draw rotated detection box
            bev_utils.drawRotatedBox(img_bev, x, y, w, l, yaw, [100,255,255])

    # A negative flip code flips around both axes in a single call.
    return cv2.flip(img_bev, -1)

In [None]:
# Render the selected sample through make_bev_image and display it.
img = make_bev_image(model, batch_id)
plt.imshow(img)

In [None]:
def update(i):
    """Draw animation frame ``i`` and return the artists that changed.

    Renders sample ``i`` with ``make_bev_image`` on the module-level ``ax``.

    Args:
        i: Frame index supplied by the animation; also used as the sample index.

    Returns:
        A one-element tuple holding the image artist drawn for this frame.
    """
    print(i)
    img = make_bev_image(model, i)
    # Drop the previous frame's image so artists do not pile up on the axes,
    # and draw the new frame exactly once.
    ax.clear()
    frame_artist = ax.imshow(img)
    # NOTE(review): the offset 39 is not explained anywhere in this file —
    # presumably the first frame number of the validation split; confirm.
    ax.set_title(f"Frame {39+i}")
    return (frame_artist,)

In [None]:
# Animation settings: number of frames, delay between frames and delay
# before the animation repeats (both delays in milliseconds).
frames = 91
interval = 1200
repeat_delay = 3000

# Square figure with a single axes that `update` draws into.
fig = plt.figure(figsize=(7, 7))
ax = plt.axes()

anim = FuncAnimation(
    fig,
    update,
    frames=frames,
    interval=interval,
    repeat_delay=repeat_delay,
    blit=True,
)
# Write the animation to a GIF using the ImageMagick writer.
anim.save('low_fusion.gif', writer='imagemagick')

## Quantitative Evaluation
We calculate the average precision for the evaluation set.

In [None]:
from time import time

In [None]:
# Time one full pass over the validation set, including the AP computation.
start_time = time()
labels = []
sample_metrics = []
with torch.no_grad():
    for img_path, imgs, targets in val_dataloader:
        # Column 1 of the targets holds the class of every box.
        labels.extend(targets[:, 1].tolist())
        # Rescale x, y, w, l of targets ((box_idx, class, x, y, w, l, im, re))
        targets[:, 2:6] *= configs.img_size
        imgs = imgs.to(configs.device, non_blocking=True)

        outputs = post_processing_v2(
            model(imgs),
            conf_thresh=configs.conf_thresh,
            nms_thresh=configs.nms_thresh,
        )
        stats = get_batch_statistics_rotated_bbox(outputs, targets, iou_threshold=configs.iou_thresh)
        if not stats:
            # Keep an empty placeholder so the concatenation below still
            # receives three columns for this batch.
            stats = [[np.array([]), torch.tensor([]), torch.tensor([])]]
        sample_metrics.extend(stats)

    # Transpose the per-sample triples into three columns and join each one.
    true_positives, pred_scores, pred_labels = (
        np.concatenate(column, 0) for column in zip(*sample_metrics)
    )
    precision, recall, AP, f1, ap_class = ap_per_class(true_positives, pred_scores, pred_labels, labels)
end_time = time()

In [None]:
# Display the AP value returned by ap_per_class.
AP

In [None]:
# Display all five values returned by ap_per_class.
precision, recall, AP, f1, ap_class

In [None]:
# Elapsed wall-clock time of the evaluation cell, in seconds.
end_time - start_time

In [None]:
# Evaluate the checkpoints saved at epochs 10, 20, ..., 290 and collect their AP.
# Note: this loop rebinds the notebook-level `configs`, `val_dataloader` and `model`.
AP_list = []
for epoch in range(10, 300, 10):
    checkpoint_path = f"checkpoints/complex_yolov4_astyx_low_fusion_Mag_split_old/Model_complex_yolov4_astyx_low_fusion_Mag_split_old_epoch_{epoch}.pth"
    configs = edict({
        "cfgfile": "./src/config/cfg/complex_yolov4.cfg",
        "dataset_dir": "../astyx/dataset/dataset_astyx_hires2019/",
        "pretrained_path": checkpoint_path,
        "radar": False,
        "low_fusion": True,
        "lidar": False,
        "VR": False,
        "mag": True,
        "img_size": 608,
        "conf_thresh": 0.5,
        "nms_thresh": 0.5,
        "iou_thresh": 0.5,
        "batch_size": 1,
        "num_workers": 1,
        "pin_memory": True,
        "num_samples": None,
        "device": torch.device("cpu"),
        "arch": "darknet",
        "use_giou_loss": True,
    })

    # Fresh data loader and model for this checkpoint.
    val_dataloader = create_val_dataloader(configs)
    model = create_model(configs)
    checkpoint_state = torch.load(configs.pretrained_path, map_location=configs.device)
    model.load_state_dict(checkpoint_state)
    model = model.to(device=configs.device)
    model.eval()

    labels = []
    sample_metrics = []
    with torch.no_grad():
        for img_path, imgs, targets in val_dataloader:
            # Column 1 of the targets holds the class of every box.
            labels.extend(targets[:, 1].tolist())
            # Rescale x, y, w, l of targets ((box_idx, class, x, y, w, l, im, re))
            targets[:, 2:6] *= configs.img_size
            imgs = imgs.to(configs.device, non_blocking=True)

            outputs = post_processing_v2(
                model(imgs),
                conf_thresh=configs.conf_thresh,
                nms_thresh=configs.nms_thresh,
            )
            stats = get_batch_statistics_rotated_bbox(outputs, targets, iou_threshold=configs.iou_thresh)
            if not stats:
                # Empty placeholder so the concatenation below still gets three columns.
                stats = [[np.array([]), torch.tensor([]), torch.tensor([])]]
            sample_metrics.extend(stats)

        # Transpose the per-sample triples into three columns and join each one.
        true_positives, pred_scores, pred_labels = (
            np.concatenate(column, 0) for column in zip(*sample_metrics)
        )
        precision, recall, AP, f1, ap_class = ap_per_class(true_positives, pred_scores, pred_labels, labels)

    # NOTE(review): `+=` extends the list with the elements of AP, so the
    # entries of all epochs end up in one flat list — confirm this is intended.
    AP_list += AP
    print(epoch, AP)