In [2]:
!git clone https://github.com/commaai/calib_challenge.git 

In [None]:
!git clone https://github.com/asceznyk/calipy.git

In [13]:
# Filesystem locations used by the rest of the notebook (Kaggle directory layout).
repo_path = '/kaggle/working/calipy/'
challenge_path = '/kaggle/working/calib_challenge/'
# Sub-directories of the challenge checkout; each one keeps its trailing slash.
train_path = challenge_path+'labeled/'
test_path = challenge_path+'unlabeled/'
predictions_path = challenge_path+'predictions/'

# Checkpoint file; presumably provided through an attached Kaggle input dataset.
weights_path = '/kaggle/input/calibw/calibnet.best'

In [4]:
import os
import gc
import glob
import cv2
import random
import math

import numpy as np

import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap

from tqdm import tqdm
from sklearn.model_selection import train_test_split

from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.utils as VU
import torchvision.transforms.functional as VF
import torchvision.transforms as T
import torchvision.models as models

from torch.utils.data import Dataset, DataLoader

plt.rcParams["figure.figsize"] = (20, 10)

In [5]:
def display_image_in_actual_size(img_data):
    """Show an image in a borderless figure sized from its pixel dimensions.

    The figure size in inches is the image width/height divided by a nominal
    80 dpi, and a single axes fills the whole figure with the axis hidden.

    Args:
        img_data: array whose shape unpacks into (height, width, depth).
    """
    dpi = 80
    rows, cols, _ = img_data.shape
    canvas = plt.figure(figsize=(cols / float(dpi), rows / float(dpi)))
    # One axes covering the full canvas, so there are no margins around the image.
    full_axes = canvas.add_axes([0, 0, 1, 1])
    full_axes.axis('off')
    full_axes.imshow(img_data, cmap='gray')
    plt.show()

def display_image_label_pairs(video_path, label_path, num_frames=1, idx_frame=0):
    """Display selected video frames with their labels, then print label statistics.

    Frames ``idx_frame`` .. ``idx_frame + num_frames - 1`` are shown (as RGB) and
    the matching label row is printed. Afterwards the mean and standard deviation
    of both label columns are printed, with NaN entries counted as 0.

    Args:
        video_path: path of the video to decode with OpenCV.
        label_path: path of a text file readable by ``np.loadtxt`` with at least
            two columns (printed as pitch and yaw) and one row per frame.
        num_frames: how many consecutive frames to display.
        idx_frame: index of the first frame to display.
    """
    label = np.loadtxt(label_path)
    lp = np.nan_to_num(label[:, 0])
    ly = np.nan_to_num(label[:, 1])

    # Nothing past this index is displayed, so decoding can stop there.
    last_frame = min(idx_frame + num_frames, label.shape[0])

    cap = cv2.VideoCapture(video_path)
    try:
        for i in range(last_frame):
            ret, img = cap.read()
            if not ret:
                # The video ended (or failed to decode) before the requested
                # frame; there is no image to show, so stop instead of passing
                # None to the display helper.
                break
            if i >= idx_frame:
                display_image_in_actual_size(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
                print(label[i])
    finally:
        # Always free the decoder handle, even if displaying a frame raised.
        cap.release()

    print(f"mean for pitch and yaw {label_path}:{np.mean(lp):.3f}, {np.mean(ly):.3f}")
    print(f"std for pitch and yaw {label_path}:{np.std(lp):.3f}, {np.std(ly):.3f}")
    print("-"*40)

def save_frames(videos_dir, resize=0):
    """Decode every ``.hevc`` video in a directory into numbered JPEG frames.

    For ``<videos_dir>/<name>.hevc`` the frames are written next to the video as
    ``<name>_1.jpg``, ``<name>_2.jpg``, ... (1-based frame numbers).

    Args:
        videos_dir: directory that contains the ``.hevc`` files.
        resize: when truthy, each frame is resized to ``(img_size[2], img_size[1])``
            (width, height) before saving. ``img_size`` is a notebook-level
            global, so it must already be defined when this option is used.
    """
    for file_name in [name for name in os.listdir(videos_dir) if name.endswith('.hevc')]:
        video_path = f'{videos_dir}/{file_name}'
        cap = cv2.VideoCapture(video_path)
        try:
            f = 1
            while True:
                ret, img = cap.read()
                if not ret:
                    break
                # NOTE(review): cv2.imwrite expects BGR input, so converting to
                # RGB first stores the frames with red and blue swapped. This is
                # kept as-is because existing checkpoints were presumably trained
                # on frames produced this way; confirm before changing.
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                if resize:
                    img = cv2.resize(img, dsize=(img_size[2], img_size[1]), interpolation=cv2.INTER_AREA)
                cv2.imwrite(f"{video_path.replace('.hevc', '')}_{f}.jpg", img)
                f += 1
        finally:
            # Release the decoder for this video before opening the next one.
            cap.release()

In [6]:
def display_video_cap_text(video_path, label_path):
    """Show every labeled frame of a video with its label row drawn on the image.

    Frames are read in order and displayed one matplotlib figure at a time until
    either the video or the label file runs out. All figures are closed at the end.

    Args:
        video_path: path of the video to decode with OpenCV.
        label_path: path of a text file readable by ``np.loadtxt`` with one row
            per frame.
    """
    label = np.loadtxt(label_path)

    cap = cv2.VideoCapture(video_path)
    try:
        for i in range(label.shape[0]):
            ret, img = cap.read()
            if not ret:
                # No frame was decoded, so there is nothing to draw or show.
                break

            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            text = label[i].astype(str)
            cv2.putText(img, str(text), (10, 10), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255,0,0), 1, cv2.LINE_AA)

            plt.imshow(img)
            plt.show()
    finally:
        # Free the decoder handle and any figures that are still open.
        cap.release()
        plt.close('all')
        

In [7]:
# Decode every .hevc video in the labeled folder into per-frame JPEGs (no resizing).
save_frames(f'{challenge_path}labeled')

In [8]:
# Multiplier applied to label values when the datasets build label tensors;
# 1 leaves the labels unchanged.
max_scale = 1 #* 180/math.pi
# (channels, height, width); indices 1 and 2 are used as the resize target.
img_size = (3, 188, 250)
# Number of regression outputs requested from the model.
label_size = 2

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [9]:
def show(imgs):
    """Plot one image tensor, or a list of them, in a single row without ticks.

    Args:
        imgs: a tensor accepted by ``VF.to_pil_image``, or a list of such tensors.
    """
    tensors = imgs if isinstance(imgs, list) else [imgs]
    # squeeze=False keeps the axes 2-D even when only one image is plotted.
    _, grid = plt.subplots(ncols=len(tensors), squeeze=False)
    for col, tensor in enumerate(tensors):
        panel = grid[0, col]
        panel.imshow(np.asarray(VF.to_pil_image(tensor.detach())))
        panel.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])

In [10]:
class CalibData(Dataset):
    """Dataset pairing image files on disk with their label rows.

    Each item is ``(image_tensor, label_tensor)``: the image is loaded, turned
    into a tensor, resized to ``(img_size[1], img_size[2])`` and optionally
    passed through ``transform``; the label row is multiplied by the global
    ``max_scale`` and converted to a float tensor.
    """

    def __init__(self, img_paths, labels, transform=None):
        self.img_paths, self.labels, self.transform = img_paths, labels, transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, i):
        target_hw = (img_size[1], img_size[2])
        img = VF.to_tensor(Image.open(self.img_paths[i]))
        img = VF.resize(img, target_hw)
        label = torch.from_numpy(self.labels[i] * max_scale).float()

        # The optional transform only ever touches the image, never the label.
        return (img if self.transform is None else self.transform(img)), label
    
class DummyData(Dataset):
    """Dataset that returns all-zero inputs alongside real labels.

    One zero tensor of shape ``img_size`` is allocated per label row; labels are
    multiplied by the global ``max_scale`` and returned as float tensors.
    """

    def __init__(self, img_size, labels):
        self.labels = labels
        self.mats = torch.zeros((labels.shape[0], *img_size))

    def __len__(self):
        return self.mats.size(0)

    def __getitem__(self, i):
        label = torch.from_numpy(self.labels[i] * max_scale).float()
        return self.mats[i], label
    
def view_angle_images(data, start, end):
    """Plot the images of items ``start`` .. ``end - 1`` and print their labels.

    Args:
        data: indexable dataset whose items are ``(image, label)`` pairs.
        start: first index to show (inclusive).
        end: last index to show (exclusive).
    """
    indices = range(start, end)
    show([data[idx][0] for idx in indices])
    print([data[idx][1] for idx in indices])
    
def load_img_path_labels(input_dir):
    """Collect frame image paths and labels for every label file in a directory.

    Label files are the directory entries ending in ``txt``, visited in sorted
    order. For a label file ``<name>.txt`` with N rows, the frame paths
    ``<input_dir>/<name>_1.jpg`` .. ``<input_dir>/<name>_N.jpg`` are generated
    (the files themselves are not checked for existence).

    Args:
        input_dir: directory containing the label text files.

    Returns:
        Tuple ``(img_paths, labels)``: a NumPy array of path strings and the
        vertically stacked label rows of all files, in matching order.
    """
    label_files = sorted(name for name in os.listdir(input_dir) if name.endswith('txt'))

    per_video_labels, frame_paths = [], []
    for label_file in label_files:
        video_labels = np.loadtxt(f'{input_dir}/{label_file}')
        per_video_labels.append(video_labels)

        stem = label_file.replace('.txt', '')
        # Frame numbering is 1-based.
        frame_paths.extend(
            f"{input_dir}/{stem}_{frame_no}.jpg"
            for frame_no in range(1, len(video_labels) + 1)
        )

    return np.array(frame_paths), np.vstack(per_video_labels)

def get_mse(gt, test):
    """Mean over columns of the per-column mean squared error.

    NaN entries in ``test`` are replaced by 0 before comparing; NaN entries in
    ``gt`` are ignored by the per-column mean.

    Args:
        gt: reference values (rows are samples).
        test: values to score against ``gt``.

    Returns:
        A scalar: the average of the column-wise mean squared errors.
    """
    squared_err = (gt - np.nan_to_num(test)) ** 2
    per_column = np.nanmean(squared_err, axis=0)
    return np.mean(per_column)

def to_radians(deg):
    """Convert an angle from degrees to radians (scalars or NumPy arrays)."""
    scaled = deg * math.pi
    return scaled / 180

def mse_zero_percent(gt, mp, convert=0):
    """Express the prediction MSE as a percentage of the all-zero baseline MSE.

    Args:
        gt: ground-truth values.
        mp: model predictions.
        convert: when truthy, both inputs are passed through ``to_radians`` first.

    Returns:
        ``100 * mse(gt, mp) / mse(gt, zeros)``. When the baseline is not
        positive, ``1.25e-3`` is used as the denominator instead.
    """
    if convert:
        gt, mp = to_radians(gt), to_radians(mp)

    model_mse = get_mse(gt, mp)
    baseline_mse = get_mse(gt, np.zeros_like(gt))

    # Guard against dividing by a zero (or otherwise non-positive) baseline.
    if baseline_mse > 0:
        denominator = baseline_mse
    else:
        denominator = 1.25e-3

    return 100 * (model_mse / denominator)

In [11]:
def fill_zeros_previous(arr):
    """Overwrite rows whose sum is 0 with the row before them, in place.

    Rows are processed front to back, so a run of such rows all take the value
    of the last row before the run. The first row is never changed.

    Args:
        arr: 2-D NumPy array; it is modified in place.

    Returns:
        The same ``arr`` object.
    """
    for idx in range(1, len(arr)):
        if arr[idx].sum() == 0:
            arr[idx] = arr[idx - 1]
    return arr
            
def remove_zero_labels(x, y):
    """Drop every sample whose label row contains a zero component.

    The row mask is computed once from the original ``y`` and applied to both
    arrays, so the surviving entries of ``x`` stay aligned with their labels.

    Args:
        x: array of samples, indexed along its first axis.
        y: 2-D label array with one row per entry of ``x``.

    Returns:
        Tuple ``(x_kept, y_kept)`` containing only the rows where every label
        component is non-zero.
    """
    keep = np.all(y != 0, axis=1)
    return x[keep], y[keep]
            
def split_data(img_paths, labels, split=0.90, transform=None, non_zero_labels=1, remove_nans=1):
    """Split image paths and labels into train, validation and test datasets.

    NaN labels are first replaced by 0. A test set of fraction ``1 - split`` is
    held out with a fixed random state, and the remainder is cut again at
    ``split`` into training and validation parts.

    Args:
        img_paths: array of image paths.
        labels: label array with one row per path.
        split: fraction kept at each of the two splitting steps.
        transform: optional transform applied to training images only.
        non_zero_labels: when truthy, zero labels are handled according to
            ``remove_nans``; when falsy, they are left as they are.
        remove_nans: when truthy, samples are filtered with
            ``remove_zero_labels``; otherwise zero rows are filled with
            ``fill_zeros_previous``.

    Returns:
        Tuple ``(train_data, valid_data, test_data)`` of ``CalibData`` objects.
    """
    labels = np.nan_to_num(labels)

    if non_zero_labels and remove_nans:
        img_paths, labels = remove_zero_labels(img_paths, labels)
    elif non_zero_labels:
        labels = fill_zeros_previous(labels)

    x_rest, x_test, y_rest, y_test = train_test_split(
        img_paths, labels, test_size=(1.0 - split), random_state=42
    )

    # Second cut: the tail of the non-test portion becomes the validation set.
    cut = int(split * x_rest.shape[0])
    x_train, x_valid = x_rest[:cut], x_rest[cut:]
    y_train, y_valid = y_rest[:cut], y_rest[cut:]

    return (
        CalibData(x_train, y_train, transform=transform),
        CalibData(x_valid, y_valid),
        CalibData(x_test, y_test),
    )

def load_pretrained_model(model, weights_path):
    """Load a saved state dict from disk into ``model``.

    The checkpoint is deserialized onto the CPU so that a file saved from a GPU
    run can also be loaded on a machine without CUDA; ``load_state_dict`` then
    copies the values into the model's existing parameters.

    Args:
        model: module whose parameters match the saved state dict.
        weights_path: path of a file written with ``torch.save(state_dict, ...)``.

    Returns:
        The same ``model`` object, with the loaded weights.
    """
    state_dict = torch.load(weights_path, map_location='cpu')
    model.load_state_dict(state_dict)
    return model

In [12]:
class ResBlock(nn.Module):
    """Two-convolution residual block with an optional projection shortcut.

    The main branch is conv3x3 -> BN -> activation -> conv3x3 -> BN. When the
    stride is not 1 or the channel count changes, the shortcut is a strided
    1x1 convolution followed by batch norm; otherwise it is the identity. The
    two branches are summed and passed through the activation.

    Args:
        c_in: number of input channels.
        c_out: number of output channels.
        stride: stride of the first 3x3 convolution and of the projection
            shortcut, so both branches produce the same spatial size.
        leaky: truthy selects LeakyReLU, falsy selects ReLU.
    """

    def __init__(self, c_in, c_out, stride=1, leaky=1):
        super(ResBlock, self).__init__()
        self.skip = None
        self.leaky = leaky

        if stride != 1 or c_in != c_out:
            self.skip = nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(c_out)
            )

        self.block = nn.Sequential(
            # The stride goes here so the main branch shrinks like the shortcut.
            nn.Conv2d(c_in, c_out, 3, padding=1, stride=stride, bias=False),
            nn.BatchNorm2d(c_out),
            nn.LeakyReLU() if leaky else nn.ReLU(inplace=True),
            # The second convolution consumes the c_out channels produced above.
            nn.Conv2d(c_out, c_out, 3, padding=1, stride=1, bias=False),
            nn.BatchNorm2d(c_out)
        )

    def forward(self, x):
        act = F.leaky_relu if self.leaky else F.relu
        # Pick the shortcut first; the main branch is always added to it.
        shortcut = x if self.skip is None else self.skip(x)
        return act(self.block(x) + shortcut)

    
class CalibNet(nn.Module):
    """ResNet-18 feature extractor followed by a single linear regression head.

    Args:
        img_dim: stored on the instance; not used by the layers built here.
        ang_dim: number of regression outputs.
        act: stored on the instance; not used by the layers built here.
    """

    def __init__(self, img_dim, ang_dim, act='relu'):
        super(CalibNet, self).__init__()
        self.img_dim = img_dim
        self.ang_dim = ang_dim
        self.act = act

        # torchvision ResNet-18 (no weights requested) without its final
        # fully-connected layer, so the output is the pooled feature map.
        self.base_cnn = nn.Sequential(
            torch.nn.Sequential(*(list(models.resnet18().children())[:-1])),
        )

        self.base_dense = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512, ang_dim)
        )

        # Initialise all conv/linear weights first. This also zeroes every
        # bias, so the head's starting bias has to be written afterwards.
        self.apply(self._init_weights)

        # Starting bias for the two-output head, scaled like the labels.
        # NOTE(review): the constants are presumably the mean training labels —
        # confirm. They only fit a head with exactly two outputs.
        if ang_dim == 2:
            with torch.no_grad():
                self.base_dense[-1].bias.copy_(
                    torch.tensor([0.0277611 * max_scale, 0.02836007 * max_scale])
                )

    def _init_weights(self, m):
        """Kaiming-uniform weights and zero bias for conv and linear layers."""
        if isinstance(m, (nn.Conv2d, nn.Linear)):
            nn.init.kaiming_uniform_(m.weight)

            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def cnn_block(self, c_in, c_out, k_size, stride, pad=0, bias=False):
        """Return a conv -> ReLU -> batch-norm block (not used by ``__init__``)."""
        return nn.Sequential(
            nn.Conv2d(c_in, c_out, k_size, stride, padding=pad, bias=bias),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(c_out),
        )

    def linear_block(self, in_units, out_units, bias=True):
        """Return a linear -> ReLU block (not used by ``__init__``)."""
        return nn.Sequential(
            nn.Linear(in_units, out_units, bias=bias),
            nn.ReLU(inplace=True)
        )

    def forward(self, x, y=None):
        """Predict outputs for ``x`` and, when targets are given, the MSE loss.

        Args:
            x: batch of input images.
            y: optional targets; NaN entries are treated as 0 in the loss.

        Returns:
            Tuple ``(predictions, loss)``; ``loss`` is ``None`` when ``y`` is None.
        """
        x = self.base_cnn(x)
        p = self.base_dense(x)

        loss = None
        if y is not None:
            loss = F.mse_loss(p, torch.nan_to_num(y))

        return p, loss

In [14]:
def visualize_feature_maps(model, x):
    """Run ``x`` through the CNN layer by layer and plot each layer's output.

    The children of ``model.base_cnn`` are flattened one level (the contents of
    plain ``nn.Sequential`` containers and of ``ResBlock.block`` are expanded).
    After every layer the activation is averaged over its channels and kept as
    a gray-scale map. The number of maps, their shapes, a grid of plots and
    finally the absolute values of each map are printed/shown.

    The model is switched to eval mode and left that way.

    Args:
        model: model exposing a ``base_cnn`` module.
        x: input batch containing a single image.
    """
    model.eval()

    def to_gray(activation):
        # Drop the batch axis, then average over the channel axis.
        channels = activation.squeeze(0)
        averaged = torch.sum(channels, 0) / channels.shape[0]
        return averaged.data.cpu().numpy()

    layers = []
    for child in model.base_cnn.children():
        if type(child) == nn.Sequential:
            layers.extend(child.children())
        elif type(child) == ResBlock:
            layers.extend(child.block.children())
        else:
            layers.append(child)

    outputs, names = [], []
    for layer in layers:
        x = layer(x)
        outputs.append(to_gray(x))
        names.append(str(layer))

    print(len(outputs))

    for feature_map in outputs:
        print(feature_map.shape)

    fig = plt.figure(figsize=(20, 20))
    # Smallest square grid that is guaranteed to hold every map.
    grid = int(np.sqrt(len(outputs)) + 1)
    for idx, (feature_map, name) in enumerate(zip(outputs, names)):
        axis = fig.add_subplot(grid, grid, idx + 1)
        plt.imshow(np.abs(feature_map), cmap='gray', norm=plt.Normalize(0, 1))
        axis.axis("off")
        axis.set_title(f"{name.split('(')[0]} -- {idx}", fontsize=10)

    plt.show()

    for feature_map in outputs:
        print(np.abs(feature_map))
    

In [15]:
# Activations captured by hooks created with get_activation, keyed by name.
output_int = {}

def get_activation(name):
    """Create a forward hook that stores a layer's detached output.

    Args:
        name: key under which the output is saved in ``output_int``.

    Returns:
        A function with the forward-hook signature (module, inputs, output).
    """
    def _record(module, inputs, result):
        output_int[name] = result.detach()
    return _record

In [None]:
# Build the frame-path/label arrays for the labeled folder, split them into
# datasets (no training transform), and create a fresh, untrained model.
img_paths, labels = load_img_path_labels(f'{challenge_path}labeled')
train_data, valid_data, test_data = split_data(img_paths, labels, transform=None)
model = CalibNet(img_size, 2)

In [None]:
# Number of samples in the train, test and validation datasets (in that order).
len(train_data), len(test_data), len(valid_data)

In [None]:
# Shape of the image tensor of one training sample.
train_data[2][0].shape

In [None]:
# Close any open figures, then plot the per-layer feature maps produced for
# the first training image (a batch axis is added with unsqueeze).
plt.close('all')
visualize_feature_maps(model, train_data[0][0].unsqueeze(0))

In [16]:
def fit(model, optimizer, train_loader, valid_loader=None, ckpt_path=None, epochs=10, lr=0.001, log_preds=[0,1]): 
    """Train ``model`` and optionally validate and checkpoint it every epoch.

    Args:
        model: module whose forward returns ``(predictions, loss)``.
        optimizer: optimizer stepping the model's parameters.
        train_loader: loader yielding ``(images, labels)`` training batches.
        valid_loader: optional loader for validation; when omitted the training
            loss is recorded as the validation loss.
        ckpt_path: when given, the state dict is saved there each time the
            validation loss improves.
        epochs: number of passes over the training loader.
        lr: only printed; the optimizer's own learning rate is what is used.
        log_preds: pair of flags ``[train, valid]`` (or one int for both) that
            turn on printing of predictions and labels for every batch.

    Returns:
        Tuple ``(train_losses, valid_losses)`` with one average loss per epoch.
    """
    print(f'the learning rate chosen: {lr}')

    if type(log_preds) == int:
        log_preds = [log_preds, log_preds]

    def run_epoch(split, log=0):
        training = split == 'train'
        model.train(training)
        loader = train_loader if training else valid_loader
        n_batches = len(loader)

        running_loss = 0
        running_percent = 0
        progress = tqdm(enumerate(loader), total=n_batches)
        for _, batch in progress:
            imgs, labels = (tensor.to(device) for tensor in batch)

            # Gradients are only tracked while training.
            with torch.set_grad_enabled(training):
                preds, loss = model(imgs, labels)
                running_loss += loss.item() / n_batches

                labels_np = np.nan_to_num(labels.detach().cpu().numpy())
                preds_np = preds.detach().cpu().numpy()
                running_percent += mse_zero_percent(labels_np, preds_np) / n_batches

                if log:
                    print('-'*40)
                    print(f'predictions for frames ->')
                    print(preds)
                    print(f'labels for frames ->')
                    print(torch.nan_to_num(labels))
                    print('-'*40)

            if training:
                model.zero_grad()
                loss.backward()
                optimizer.step()

            progress.set_description(f"epoch: {epoch+1}, avg_loss: {running_loss:.6f}, avg_mse_percent: {running_percent:.3f}%")

        return running_loss

    model.to(device)

    lowest_valid_loss = float('inf')
    train_losses, valid_losses = [], []
    for epoch in range(epochs):
        train_loss = run_epoch('train', log_preds[0])
        if valid_loader is not None:
            valid_loss = run_epoch('valid', log_preds[1])
        else:
            valid_loss = train_loss

        train_losses.append(train_loss)
        valid_losses.append(valid_loss)

        # Keep only the best-scoring weights on disk.
        if ckpt_path is not None and valid_loss < lowest_valid_loss:
            lowest_valid_loss = valid_loss
            torch.save(model.state_dict(), ckpt_path)

    return train_losses, valid_losses

In [17]:
def evaluate(dataset, model, log_preds=1, batch_size=4, non_zero_labels=1, convert=0):
    """Print the model's MSE as a percentage of the all-zero baseline.

    The score is computed per batch with ``mse_zero_percent`` (labels as the
    ground truth, predictions as the estimate) and averaged over the batches
    that still contain samples after filtering.

    Args:
        dataset: dataset yielding ``(image, label)`` pairs.
        model: module whose forward returns ``(predictions, loss)``.
        log_preds: when truthy, predictions and labels are printed per batch.
        batch_size: evaluation batch size.
        non_zero_labels: when truthy, samples whose label row has a zero (or
            NaN) component are excluded from the score.
        convert: forwarded to ``mse_zero_percent`` (degrees-to-radians switch).
    """
    model.to(device)
    model.eval()

    loader = DataLoader(dataset, batch_size=batch_size)
    total_percent = 0.0
    scored_batches = 0
    for batch in loader:
        imgs, labels = [tensor.to(device) for tensor in batch]

        with torch.no_grad():
            preds, _ = model(imgs, labels)
        preds, labels = preds.detach().cpu().numpy(), labels.detach().cpu().numpy()

        if non_zero_labels:
            # One mask for both arrays keeps predictions aligned with labels.
            keep = np.all(np.nan_to_num(labels) != 0, axis=1)
            preds, labels = preds[keep], labels[keep]

        # A batch with no remaining samples has no defined score; skip it
        # rather than letting a NaN poison the running average.
        if len(labels):
            total_percent += mse_zero_percent(labels, preds, convert=convert)
            scored_batches += 1

        if log_preds:
            print('-'*40)
            print(f'predictions for frame ->')
            print(preds)
            print(f'labels for farme ->')
            print(labels)
            print('-'*40)

    # NaN signals that no batch contained a sample that could be scored.
    avg_mse_percent = total_percent / scored_batches if scored_batches else float('nan')
    print(f'mse error wrt zero error: {avg_mse_percent:.3f}%')

In [18]:
def main(main_dir, epochs=50, batch_size=4, learning_rate=5e-3, single_batch=0, zero_input=0, pretrained_weights=''):
    """Build the datasets and model, train, and plot the loss curves.

    Args:
        main_dir: directory holding the label text files and extracted frames.
        epochs: number of training epochs.
        batch_size: batch size for all loaders.
        learning_rate: Adam learning rate.
        single_batch: when truthy, train on one fixed batch only (an
            overfitting sanity check) with per-batch logging and no checkpoint.
        zero_input: in single-batch mode, feed all-zero images instead of frames.
        pretrained_weights: optional checkpoint path to load before training.

    Returns:
        Tuple ``(model, train_data, valid_data, test_data)``.
    """
    torch.manual_seed(0)

    # Trailing slashes are stripped so generated frame paths contain no '//'.
    img_paths, labels = load_img_path_labels(main_dir.rstrip('/') or main_dir)
    train_data, valid_data, test_data = split_data(img_paths, labels)
    labels = np.nan_to_num(labels)

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_data, batch_size=batch_size)

    model = CalibNet(img_size, label_size)

    print(model)
    print(f'total number of parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}')

    if pretrained_weights != '':
        print(f'loading pretrained model from {pretrained_weights}.. ')
        load_pretrained_model(model, pretrained_weights)

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    if single_batch:
        # Fixed starting frame of the batch used for the overfitting check.
        random_idx = 1200
        labels = fill_zeros_previous(labels)
        batch_labels = labels[random_idx:random_idx+batch_size]
        if not zero_input:
            single_data = CalibData(img_paths[random_idx:random_idx+batch_size], batch_labels)
            overfit_loader = DataLoader(single_data, batch_size=batch_size)
        else:
            print(f'all inputs to model are zeros, checking training results..')
            overfit_loader = DataLoader(DummyData(img_size, batch_labels), batch_size=batch_size)

        train_losses, valid_losses = fit(model, optimizer, overfit_loader, epochs=epochs, lr=learning_rate, log_preds=1)
    else:
        train_losses, valid_losses = fit(model, optimizer, train_loader, valid_loader, epochs=epochs, lr=learning_rate, log_preds=0, ckpt_path='calibnet.best')
        plt.plot(valid_losses, label='valid')

    plt.plot(train_losses, label='train')
    plt.legend()
    plt.show()

    return model, train_data, valid_data, test_data

In [None]:
# Free cached memory, then run the single-batch mode of main: 200 epochs on
# one fixed batch of 4 frames, which checks that the model can overfit it.
gc.collect()
torch.cuda.empty_cache()
model, _, _, _ = main(train_path, epochs=200, batch_size=4, learning_rate=1e-4, single_batch=1)

In [None]:
# Drop the previous model and free cached memory before the full training run.
# NOTE: `del model` requires that an earlier cell has already defined `model`.
del model
gc.collect()
torch.cuda.empty_cache()
model, train_data, valid_data, test_data = main(train_path, epochs=10, batch_size=4, learning_rate=1e-4)

In [None]:
# Rebuild the network and load the checkpoint that main() writes to
# 'calibnet.best' in its full-training mode.
model = CalibNet(img_size, 2)
model.load_state_dict(torch.load('calibnet.best'))

In [None]:
# The input below is sent to `device`, so the model has to live there too;
# eval mode keeps this single-image forward pass from updating batch-norm
# statistics.
model.to(device).eval()
# The dense head only holds Flatten and Linear, so the layer to hook is the
# last one (index 3 does not exist). Its output is stored under 'fc3'.
model.base_dense[-1].register_forward_hook(get_activation('fc3'))
model(train_data[2][0].unsqueeze(0).to(device))

In [None]:
# Display the activations captured by the registered forward hooks.
output_int

In [None]:
# Close any open figures, then plot the per-layer feature maps for one
# validation image. The input is moved to `device`, so the model must already
# be on that device when this cell runs.
plt.close('all')
visualize_feature_maps(model, valid_data[60][0].unsqueeze(0).to(device))

In [None]:
# Score the model on the training split, printing every batch and keeping
# samples with zero labels.
evaluate(train_data, model, log_preds=1, non_zero_labels=0)

In [None]:
# Score the model on the held-out test split, printing every batch and keeping
# samples with zero labels.
evaluate(test_data, model, log_preds=1, non_zero_labels=0)

In [None]:
#labels = np.nan_to_num(labels)
#labels[np.all(labels != 0, axis=1)]
#img_paths[np.where(np.any(labels != 0, axis=1))[0]]

In [None]:
# Render a link to the saved checkpoint file in the notebook output.
from IPython.display import FileLink
FileLink('calibnet.best')

In [None]:
!ls