# Table of contents


1.   import libraries
2.   download data & extract file
3.   Read File Name & Set label & Clean Data
4.   Read Dataset
5.   Image Augmentation
6.   Randomize our data
7.   Using KFOLD to split our data and labels
8.   Create Train and Test Dataloaders



9. Define Model

> * CNN3DModel

> * ResNet 3D

> * DenseNet 3D

> * Encoder Decoder (ConvLSTM)


10. Generating Model

11. Set train model & other parameters

12. Train model

13. visualization loss & accuracy















# ***Import library***

In [32]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pnd
from tqdm import tqdm
import sys
import math
import os
import zipfile
import six
import warnings
import random
import gc

from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

import cv2
import imgaug as ia
from imgaug import augmenters as iaa
from moviepy.editor import VideoFileClip
from PIL import Image
from imageio import imwrite as iw

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data
from torch.utils.data import Dataset
import torch.utils.data as Data
from torchvision import transforms
import torchvision.models as models
from torch.autograd import Variable
import torch.optim as optim
from torch.utils.data.sampler import SubsetRandomSampler

# **Download Data & Extract File**

In [2]:
url = "https://ucd5e847d769b25a95722e606559.dl.dropboxusercontent.com/zip_download_get/AhKqMxLj7n8mrkmKUTgJbI7LLn_EZnv5Pe6Lq9FpDCmdp3m_l3gDf8YM11KUvbvBody8_SY8nDrz4Wk194deR9k8l2GOy0YM4hl_EPZVup9IJQ?_download_id=49996537314264544023290189576709031930841558608453748767355469244067&_notify_domain=www.dropbox.com&dl=1"
target_path = '/content/brain.zip'
import requests, zipfile, io
# Stream the archive to disk. Both the HTTP response and the output file are
# managed by `with`, so they are closed even if the transfer fails part-way.
with requests.get(url, stream=True) as response:
    # Fail here on an HTTP error instead of saving an error page as brain.zip.
    response.raise_for_status()
    with open(target_path, "wb") as handle:
        # 1 MiB chunks: the original 100-byte chunks made one write per 100 bytes.
        for chunk in response.iter_content(chunk_size=1024 * 1024):
            if chunk:  # filter out keep-alive new chunks
                handle.write(chunk)

In [3]:
import zipfile
# Unpack the downloaded archive; the context manager closes it even if
# extraction raises.
with zipfile.ZipFile('/content/brain.zip', 'r') as zip_ref:
    zip_ref.extractall('/content/home/train')

# Read File Name & Set label & Clean Data

In [4]:
road = '/content/home/train/brain4cars_data/road_camera/'
face = '/content/home/train/brain4cars_data/face_camera/'
classes = os.listdir(face)  # face and road have the same classes

face_filename = []
road_filename = []
labels = []


def _clip_duration(video_path):
    """Return the duration moviepy reports for video_path, or None if unreadable.

    The clip is always closed again so that scanning the whole dataset does
    not accumulate open readers.
    """
    clip = None
    try:
        clip = VideoFileClip(video_path)
        return clip.duration
    except Exception:
        # Best-effort scan: an unreadable video is skipped, not fatal.
        return None
    finally:
        if clip is not None:
            clip.close()


# Keep only samples that exist for both cameras, whose two videos report the
# same duration, and that are longer than 5 (in the units of clip.duration).
for class_name in classes:
    path_face = face + class_name
    path_road = road + class_name
    road_videos = set(os.listdir(path_road))

    for sample_name in os.listdir(path_face):
        if sample_name + '.avi' not in road_videos:
            continue
        video_face_path = path_face + '/' + sample_name + '/video_' + sample_name + '.avi'
        video_road_path = path_road + '/' + sample_name + '.avi'

        face_duration = _clip_duration(video_face_path)
        road_duration = _clip_duration(video_road_path)
        if face_duration is None or road_duration is None:
            continue

        if face_duration == road_duration and face_duration > 5:
            face_filename.append(video_face_path)
            road_filename.append(video_road_path)
            labels.append(class_name)

# Read Dataset

In [5]:
class BrainforCarsDataset(Dataset):
    """Dataset of paired face-camera / road-camera videos with one label each.

    Args:
        face_filename: indexable collection of face-camera video paths.
        road_filename: indexable collection of road-camera video paths,
            aligned index-by-index with ``face_filename``.
        labels: label for each video pair, aligned with the file lists.
        input_size: every sampled frame is resized to
            ``(input_size, input_size)``.
        sample_rate: keep one frame out of every ``sample_rate`` frames read.
        num_frames: stop reading a video once this many frames were kept.
        transform: optional callable applied to the sample dict.
    """

    def __init__(self, face_filename, road_filename, labels, input_size, sample_rate, num_frames, transform=None):
        self.face_filename = face_filename
        self.road_filename = road_filename
        self.transform = transform
        self.labels = labels
        self.num_frames = num_frames
        self.sample_rate = sample_rate
        self.input_size = input_size
        self.num_imgs = len(self.face_filename)

    def __len__(self):
        """Return the number of video pairs."""
        return self.num_imgs

    def _read_frames(self, video_path):
        """Read, subsample and resize frames from one video.

        Returns a numpy array built from the kept frames; it holds fewer than
        ``num_frames`` frames (possibly none) if the video is short or cannot
        be opened.
        """
        frames = []
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                print("Unable to connect to camera.")
            frame_index = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_index % self.sample_rate == 0:
                    frame = cv2.resize(frame, (self.input_size, self.input_size), interpolation=cv2.INTER_AREA)
                    frames.append(frame)
                    if len(frames) == self.num_frames:
                        break
                frame_index += 1
        finally:
            # Release the capture; the original left one open per item.
            cap.release()
        return np.array(frames)

    def __getitem__(self, idx):
        """Return ``{'image_face', 'image_road', 'label'}`` for pair ``idx``.

        If a transform was supplied, its return value is returned instead.
        """
        data_face = self._read_frames(self.face_filename[idx])
        data_road = self._read_frames(self.road_filename[idx])
        sample = {'image_face': data_face, 'image_road': data_road, 'label': self.labels[idx]}
        if self.transform:
            sample = self.transform(sample)
        return sample

## Augmentations (none of these augmentations change the labels of the data)

In [6]:
class ImgAugTransform(object):
    """imgaug-based augmentation for a sample holding a face and a road clip.

    Calling an instance on a sample dict with the keys ``'image_face'``,
    ``'image_road'`` and ``'label'`` returns a new dict with the same keys.
    Both clips are augmented and converted to ``torch`` uint8 tensors; the
    label is passed through unchanged.
    """

    def __init__(self):
        # Wrap an augmenter so that it is applied with probability 0.2.
        sometimes = lambda aug: iaa.Sometimes(0.2, aug)
        self.aug = iaa.Sequential(
            [
                # Applied to every image.
                iaa.LinearContrast((2.0, 2.5)),
                # NOTE(review): probability 1 inverts every image - confirm intended.
                iaa.Invert(1, per_channel=True),
                sometimes(iaa.Affine(
                    scale={"x": (0.9, 1.1), "y": (0.9, 1.1)},  # scale to 90-110% of size, per axis
                    rotate=(-10, 10),  # rotate by -10 to +10 degrees
                    order=[0, 1],  # use nearest neighbour or bilinear interpolation (fast)
                    mode=ia.ALL  # use any of scikit-image's warping modes
                )),
                # Execute 0 to 5 of the following (less important) augmenters
                # per image; executing all of them would often be too strong.
                iaa.SomeOf(
                    (0, 5),
                    [
                        iaa.OneOf([
                            iaa.GaussianBlur((0, 0.5)),  # sigma between 0 and 0.5
                            iaa.AverageBlur(k=(1, 3)),  # local means, kernel size between 1 and 3
                            iaa.MedianBlur(k=(1, 3)),  # local medians, kernel size between 1 and 3
                        ]),
                        iaa.Sharpen(alpha=(.9, 1.0), lightness=(0.5, 1.6)),  # sharpen images

                        # Edge detection blended with the original image
                        # through a simplex-noise mask.
                        iaa.SimplexNoiseAlpha(iaa.OneOf([
                            iaa.EdgeDetect(alpha=(0.0, 0.2)),
                        ])),

                        iaa.OneOf([
                            iaa.Dropout((0.01, 0.03), per_channel=0.5),  # randomly remove 1-3% of the pixels
                        ]),
                        iaa.Invert(0.01, per_channel=True),  # invert color channels
                        iaa.Add((-2, 2), per_channel=0.5),  # change brightness by -2 to 2
                        iaa.AddToHueAndSaturation((-1, 1)),  # change hue and saturation
                        # Either change the brightness of the whole image
                        # (sometimes per channel) or of sub-areas.
                        iaa.OneOf([
                            iaa.Multiply((0.9, 1.1), per_channel=0.5),
                            iaa.FrequencyNoiseAlpha(
                                exponent=(-1, 0),
                                first=iaa.Multiply((0.9, 1.1), per_channel=True),
                                second=iaa.ContrastNormalization((0.5, 1.5))
                            )
                        ]),
                        sometimes(iaa.ElasticTransformation(alpha=(0.3, 0.5), sigma=0.2)),  # move pixels locally around
                        sometimes(iaa.PiecewiseAffine(scale=(0.01, 0.02))),  # move parts of the image around
                        sometimes(iaa.PerspectiveTransform(scale=(0.01, 0.05)))  # change perspective
                    ],
                    random_order=True
                )
            ],
            random_order=True
        )

    def _augment_clip(self, clip):
        """Augment one clip and return it as a uint8 torch tensor.

        A 4-D input is treated as a stack of frames: each frame is passed to
        ``augment_image`` of one deterministic copy of the pipeline, with the
        intent that all frames of the clip receive the same transformation.
        A non-empty 2-D/3-D input is augmented as a single image. Any other
        input is only converted.
        """
        clip = np.asarray(clip).astype(np.uint8)  # imgaug works with np.uint8
        if clip.size == 0 or clip.ndim not in (2, 3, 4):
            return torch.from_numpy(clip.copy())
        if clip.ndim < 4:
            return torch.from_numpy(self.aug.augment_image(clip).copy())
        frozen = self.aug.to_deterministic()
        frames = [frozen.augment_image(frame) for frame in clip]
        return torch.from_numpy(np.stack(frames).copy())

    def __call__(self, sample):
        """Return a new sample dict with both clips augmented."""
        return {
            'image_face': self._augment_clip(sample['image_face']),
            'image_road': self._augment_clip(sample['image_road']),
            'label': sample['label'],
        }


# Randomize our data 

In [7]:
# Shuffle the three parallel sequences together, with a fixed seed so the
# order is reproducible, then split them back into three tuples.
random.seed(1254)

shuffled_triples = [triple for triple in zip(face_filename, road_filename, labels)]
random.shuffle(shuffled_triples)

face_filename, road_filename, labels = zip(*shuffled_triples)

# Using KFOLD to split our data and labels

In [15]:
n_fold = 0  # index of the fold used as the test split
# random_state only matters when shuffle=True; passing it with shuffle=False
# is rejected by current scikit-learn. The data was already shuffled above,
# so dropping the argument leaves the folds unchanged.
kf = KFold(n_splits=5, shuffle=False)
skf = StratifiedKFold(n_splits=5, shuffle=False)
face_filename = np.array(face_filename)
road_filename = np.array(road_filename)
labels = np.array(labels)
# Stratified split on the labels; keep only fold `n_fold`.
for i, (train_indices, test_indices) in enumerate(skf.split(face_filename, labels)):
    if i == n_fold:
        face_filename_train = face_filename[train_indices]
        face_filename_test = face_filename[test_indices]
        road_filename_train = road_filename[train_indices]
        road_filename_test = road_filename[test_indices]
        labels_train = labels[train_indices]
        labels_test = labels[test_indices]
        break  # the remaining folds are not needed

# Train and Test Dataloaders

In [21]:
def get_train_loader(input_size, sample_rate, num_frames):
    """Build the training DataLoader (batch size 1, no shuffling, no workers).

    The training split is wrapped in a BrainforCarsDataset whose samples go
    through ImgAugTransform.
    """
    augmentation = transforms.Compose([ImgAugTransform()])
    dataset = BrainforCarsDataset(
        face_filename_train,
        road_filename_train,
        labels_train,
        input_size,
        sample_rate,
        num_frames,
        transform=augmentation,
    )
    return Data.DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0)


def get_test_loader(input_size, sample_rate, num_frames):
    """Build the test DataLoader (batch size 1, no shuffling, no augmentation)."""
    dataset = BrainforCarsDataset(
        face_filename_test,
        road_filename_test,
        labels_test,
        input_size,
        sample_rate,
        num_frames,
        transform=None,
    )
    return Data.DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0)


# 224x224 frames, every 5th frame kept, at most 30 frames per clip.
train_loader = get_train_loader(input_size=224, sample_rate=5, num_frames=30)
test_loader = get_test_loader(input_size=224, sample_rate=5, num_frames=30)

# Define Model

> CNN3DModel

> ResNet 3D

> DenseNet 3D

> Encoder Decoder (ConvLSTM)







In [22]:
class CNN3DModel(nn.Module):
    """Two-stream 3D CNN classifier over a face clip and a road clip.

    Both clips pass through the same two Conv3d/LeakyReLU/MaxPool3d stages.
    The flattened features of the two streams are concatenated and classified
    by two fully connected layers.

    Args:
        num_classes: size of the output layer (default 5, as before).
        fc_in_features: input width of the first fully connected layer. It
            must equal the combined flattened size of both streams. The
            default 746496 is the original hard-coded value; it corresponds
            to two streams of 64 * 2 * 54 * 54 features, which is what
            16-frame 224x224 inputs produce with these layers.
    """

    def __init__(self, num_classes=5, fc_in_features=746496):
        super(CNN3DModel, self).__init__()

        self.conv_layer1 = nn.Sequential(
            nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=0),
            nn.LeakyReLU(),
            nn.MaxPool3d((2, 2, 2)),
        )
        self.conv_layer2 = nn.Sequential(
            nn.Conv3d(32, 64, kernel_size=(3, 3, 3), padding=0),
            nn.LeakyReLU(),
            nn.MaxPool3d((2, 2, 2)),
        )
        self.fc1 = nn.Linear(fc_in_features, 128)
        self.fc2 = nn.Linear(128, num_classes)
        self.relu = nn.LeakyReLU()
        self.drop = nn.Dropout(p=0.15)

    def _stream_features(self, clip):
        """Run one clip through both conv stages and flatten per sample."""
        features = self.conv_layer2(self.conv_layer1(clip))
        return features.view(features.size(0), -1)

    def forward(self, face, road):
        """Return class scores of shape (batch, num_classes).

        Both inputs must be accepted by a Conv3d with 3 input channels.
        """
        face_features = self._stream_features(face)
        # The original flattened the face features a second time here, so the
        # road clip never reached the classifier.
        road_features = self._stream_features(road)

        out = torch.cat((face_features, road_features), dim=1)

        out = self.fc1(out)
        out = self.relu(out)
        out = self.drop(out)
        out = self.fc2(out)

        return out

In [23]:
# Public names defined by the 3D ResNet cell. NOTE: the DenseNet cell further
# down reassigns __all__, so only the last assignment is in effect after both
# cells have run.
__all__ = ['ResNet', 'resnet10', 'resnet18', 'resnet34', 'resnet50', 'resnet101', 'resnet152', 'resnet200']


def conv3x3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3x3 ``nn.Conv3d`` with padding 1 and the given stride."""
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv3d(in_planes, out_planes, **conv_options)


def downsample_basic_block(x, planes, stride):
    """Parameter-free shortcut: strided subsampling plus zero channel padding.

    ``x`` is subsampled with a kernel-size-1 average pool of the given stride
    and then padded with zero channels up to ``planes`` channels.

    Unlike the original, the result stays attached to the autograd graph (no
    ``.data`` / ``Variable`` re-wrapping), so gradients flow through the
    shortcut. The padding is created with the dtype and device of the pooled
    tensor instead of being hard-wired to float32 plus a CUDA type check.
    """
    out = F.avg_pool3d(x, kernel_size=1, stride=stride)
    zero_pads = out.new_zeros(out.size(0), planes - out.size(1),
                              out.size(2), out.size(3),
                              out.size(4))
    return torch.cat([out, zero_pads], dim=1)


class BasicBlock(nn.Module):
    """Residual block made of two 3x3x3 convolutions, each followed by BatchNorm3d.

    ``downsample``, when given, is applied to the block input to produce the
    shortcut that is added before the final ReLU.
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        self.conv1 = conv3x3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm3d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3x3(planes, planes)
        self.bn2 = nn.BatchNorm3d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut)``."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Identity shortcut unless a projection/downsample module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)


class Bottleneck(nn.Module):
    """Bottleneck residual block: 1x1x1 -> 3x3x3 -> 1x1x1 convolutions.

    The last convolution outputs ``planes * 4`` channels. ``downsample``,
    when given, is applied to the block input to produce the shortcut.
    """

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        out_planes = planes * 4
        self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm3d(planes)
        self.conv2 = nn.Conv3d(planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm3d(planes)
        self.conv3 = nn.Conv3d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm3d(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Run the three conv/BN stages, add the shortcut, apply the final ReLU."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # Identity shortcut unless a projection/downsample module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)


class ResNet(nn.Module):
    """3D ResNet backbone with an optional final classification layer.

    Args:
        block: residual block class. It must expose an ``expansion`` class
            attribute and accept ``(inplanes, planes, stride, downsample)``.
        layers: number of blocks in each of the four stages.
        sample_size: spatial input size, used to size the final average pool.
        sample_duration: number of input frames, used to size the final
            average pool.
        shortcut_type: ``'A'`` selects the parameter-free
            ``downsample_basic_block`` shortcut; any other value selects a
            1x1x1 convolution followed by BatchNorm3d.
        num_classes: output size of the final linear layer.
        last_fc: when False, ``forward`` returns the pooled features without
            applying the final linear layer.
    """

    def __init__(self, block, layers, sample_size, sample_duration, shortcut_type='B', num_classes=400, last_fc=True):
        super(ResNet, self).__init__()
        self.last_fc = last_fc

        self.inplanes = 64
        self.conv1 = nn.Conv3d(3, 64, kernel_size=7, stride=(1, 2, 2),
                               padding=(3, 3, 3), bias=False)
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0], shortcut_type)
        self.layer2 = self._make_layer(block, 128, layers[1], shortcut_type, stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], shortcut_type, stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], shortcut_type, stride=2)
        # Size of the average pool applied to the output of layer4.
        last_duration = math.ceil(sample_duration / 16)
        last_size = math.ceil(sample_size / 32)
        self.avgpool = nn.AvgPool3d((last_duration, last_size, last_size), stride=1)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Normal initialisation for the convolutions, unit scale and zero
        # shift for the batch norms.
        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm3d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def _make_layer(self, block, planes, blocks, shortcut_type, stride=1):
        """Build one stage of ``blocks`` residual blocks and update ``self.inplanes``."""
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            if shortcut_type == 'A':
                # Imported here because the file never imports `partial`;
                # without it this branch raised NameError.
                from functools import partial
                downsample = partial(downsample_basic_block,
                                     planes=planes * block.expansion,
                                     stride=stride)
            else:
                downsample = nn.Sequential(
                    nn.Conv3d(self.inplanes, planes * block.expansion,
                              kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm3d(planes * block.expansion)
                )

        # Only the first block of a stage changes resolution / channel count.
        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class scores, or pooled features when ``last_fc`` is False."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)

        x = x.view(x.size(0), -1)
        if self.last_fc:
            x = self.fc(x)

        return x


def get_fine_tuning_parameters(model, ft_begin_index):
    """Return optimizer parameter groups for fine-tuning a ResNet.

    With ``ft_begin_index == 0`` all parameters are returned unchanged
    (``model.parameters()``). Otherwise parameters whose name contains
    ``layer<i>`` for ``ft_begin_index <= i <= 4``, or ``fc``, are returned as
    plain groups; every other parameter gets a group with ``'lr': 0.0``.

    The original loop formatted ``ft_begin_index`` instead of the loop
    variable, so only the first selected layer (and ``fc``) stayed trainable.
    """
    if ft_begin_index == 0:
        return model.parameters()

    ft_module_names = ['layer{}'.format(i) for i in range(ft_begin_index, 5)]
    ft_module_names.append('fc')

    parameters = []
    for name, param in model.named_parameters():
        if any(ft_module in name for ft_module in ft_module_names):
            parameters.append({'params': param})
        else:
            parameters.append({'params': param, 'lr': 0.0})

    return parameters


def resnet10(**kwargs):
    """Construct a 3D ResNet-10: BasicBlock, one block in each of the four stages."""
    return ResNet(BasicBlock, [1, 1, 1, 1], **kwargs)

def resnet18(**kwargs):
    """Construct a 3D ResNet-18: BasicBlock with stage depths [2, 2, 2, 2]."""
    return ResNet(BasicBlock, [2, 2, 2, 2], **kwargs)

def resnet34(**kwargs):
    """Construct a 3D ResNet-34: BasicBlock with stage depths [3, 4, 6, 3]."""
    return ResNet(BasicBlock, [3, 4, 6, 3], **kwargs)

def resnet50(**kwargs):
    """Construct a 3D ResNet-50: Bottleneck with stage depths [3, 4, 6, 3]."""
    return ResNet(Bottleneck, [3, 4, 6, 3], **kwargs)

def resnet101(**kwargs):
    """Construct a 3D ResNet-101: Bottleneck with stage depths [3, 4, 23, 3]."""
    return ResNet(Bottleneck, [3, 4, 23, 3], **kwargs)

def resnet152(**kwargs):
    """Construct a 3D ResNet-152: Bottleneck with stage depths [3, 8, 36, 3]."""
    return ResNet(Bottleneck, [3, 8, 36, 3], **kwargs)

def resnet200(**kwargs):
    """Construct a 3D ResNet-200: Bottleneck with stage depths [3, 24, 36, 3]."""
    return ResNet(Bottleneck, [3, 24, 36, 3], **kwargs)

In [24]:
# Public names defined by the 3D DenseNet cell. NOTE: this assignment replaces
# the __all__ list set by the ResNet cell above.
__all__ = ['DenseNet', 'densenet121', 'densenet169', 'densenet201', 'densenet264']


def get_fine_tuning_parameters(model, ft_begin_index):
    """Return optimizer parameter groups for fine-tuning a DenseNet.

    With ``ft_begin_index == 0`` all parameters are returned unchanged
    (``model.parameters()``). Otherwise parameters whose name contains
    ``denseblock<i>`` or ``transition<i>`` for ``ft_begin_index <= i <= 4``,
    or ``norm5`` / ``classifier``, are returned as plain groups; every other
    parameter gets a group with ``'lr': 0.0``.

    The original loop formatted ``ft_begin_index`` instead of the loop
    variable, so blocks after the first selected one were frozen.

    NOTE: this definition has the same name as the ResNet helper defined
    earlier in the file and replaces it once this cell runs.
    """
    if ft_begin_index == 0:
        return model.parameters()

    ft_module_names = []
    for i in range(ft_begin_index, 5):
        ft_module_names.append('denseblock{}'.format(i))
        ft_module_names.append('transition{}'.format(i))
    ft_module_names.append('norm5')
    ft_module_names.append('classifier')

    parameters = []
    for name, param in model.named_parameters():
        if any(ft_module in name for ft_module in ft_module_names):
            parameters.append({'params': param})
        else:
            parameters.append({'params': param, 'lr': 0.0})

    return parameters


class _DenseLayer(nn.Sequential):
    """One dense layer: BN-ReLU-Conv(1x1x1) followed by BN-ReLU-Conv(3x3x3).

    ``forward`` concatenates the input with the newly computed features along
    the channel dimension, so the output has ``growth_rate`` more channels
    than the input.
    """

    def __init__(self, num_input_features, growth_rate, bn_size, drop_rate):
        super().__init__()
        bottleneck_channels = bn_size * growth_rate
        stages = (
            ('norm1', nn.BatchNorm3d(num_input_features)),
            ('relu1', nn.ReLU(inplace=True)),
            ('conv1', nn.Conv3d(num_input_features, bottleneck_channels,
                                kernel_size=1, stride=1, bias=False)),
            ('norm2', nn.BatchNorm3d(bottleneck_channels)),
            ('relu2', nn.ReLU(inplace=True)),
            ('conv2', nn.Conv3d(bottleneck_channels, growth_rate,
                                kernel_size=3, stride=1, padding=1, bias=False)),
        )
        for stage_name, stage in stages:
            self.add_module(stage_name, stage)
        self.drop_rate = drop_rate

    def forward(self, x):
        """Return ``x`` concatenated with the new features (dropout applied if enabled)."""
        fresh = super().forward(x)
        if self.drop_rate > 0:
            fresh = F.dropout(fresh, p=self.drop_rate, training=self.training)
        return torch.cat((x, fresh), dim=1)


class _DenseBlock(nn.Sequential):
    """A sequence of ``num_layers`` dense layers named ``denselayer1..N``.

    Each layer is built with ``growth_rate`` more input channels than the
    previous one.
    """

    def __init__(self, num_layers, num_input_features, bn_size, growth_rate, drop_rate):
        super().__init__()
        in_features = num_input_features
        for layer_number in range(1, num_layers + 1):
            dense_layer = _DenseLayer(in_features, growth_rate, bn_size, drop_rate)
            self.add_module('denselayer%d' % layer_number, dense_layer)
            in_features += growth_rate


class _Transition(nn.Sequential):
    """Transition between dense blocks: BN, ReLU, 1x1x1 conv, 2x average pool."""

    def __init__(self, num_input_features, num_output_features):
        super().__init__()
        stages = (
            ('norm', nn.BatchNorm3d(num_input_features)),
            ('relu', nn.ReLU(inplace=True)),
            ('conv', nn.Conv3d(num_input_features, num_output_features,
                               kernel_size=1, stride=1, bias=False)),
            ('pool', nn.AvgPool3d(kernel_size=2, stride=2)),
        )
        for stage_name, stage in stages:
            self.add_module(stage_name, stage)


class DenseNet(nn.Module):
    """3D Densenet-BC model.

    Args:
        sample_size (int) - spatial input size; used in ``forward`` to size
          the final average pool
        sample_duration (int) - number of input frames; used in ``forward``
          to size the final average pool
        growth_rate (int) - how many filters to add each layer (k in paper)
        block_config (list of 4 ints) - how many layers in each pooling block
        num_init_features (int) - the number of filters to learn in the first
          convolution layer
        bn_size (int) - multiplicative factor for number of bottle neck layers
          (i.e. bn_size * k features in the bottleneck layer)
        drop_rate (float) - dropout rate after each dense layer
        num_classes (int) - number of classification classes
        last_fc (bool) - when False, ``forward`` returns the pooled features
          without applying the classifier
    """

    def __init__(self, sample_size, sample_duration, growth_rate=32, block_config=(6, 12, 24, 16),
                 num_init_features=64, bn_size=4, drop_rate=0, num_classes=1000, last_fc=True):
        super().__init__()

        self.last_fc = last_fc
        self.sample_size = sample_size
        self.sample_duration = sample_duration

        # Stem: strided 7x7x7 convolution, batch norm, ReLU and max pooling.
        stem = [
            nn.Conv3d(3, num_init_features, kernel_size=7,
                      stride=(1, 2, 2), padding=(3, 3, 3), bias=False),
            nn.BatchNorm3d(num_init_features),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=3, stride=2, padding=1),
        ]
        self.features = nn.Sequential(*stem)

        # Dense blocks, with a channel-halving transition after every block
        # except the last one.
        channels = num_init_features
        final_index = len(block_config) - 1
        for index, num_layers in enumerate(block_config):
            dense_block = _DenseBlock(num_layers=num_layers, num_input_features=channels,
                                      bn_size=bn_size, growth_rate=growth_rate, drop_rate=drop_rate)
            self.features.add_module('denseblock%d' % (index + 1), dense_block)
            channels += num_layers * growth_rate
            if index == final_index:
                continue
            transition = _Transition(num_input_features=channels, num_output_features=channels // 2)
            self.features.add_module('transition%d' % (index + 1), transition)
            channels //= 2

        # Final batch norm
        self.features.add_module('norm5', nn.BatchNorm3d(channels))

        # Linear layer
        self.classifier = nn.Linear(channels, num_classes)

    def forward(self, x):
        """Return class scores, or pooled features when ``last_fc`` is False."""
        features = self.features(x)
        activated = F.relu(features, inplace=True)
        pool_kernel = (
            math.ceil(self.sample_duration / 16),
            math.floor(self.sample_size / 32),
            math.floor(self.sample_size / 32),
        )
        pooled = F.avg_pool3d(activated, kernel_size=pool_kernel)
        out = pooled.view(features.size(0), -1)
        if self.last_fc:
            out = self.classifier(out)
        return out

def densenet121(**kwargs):
    """Construct a 3D DenseNet with block configuration (6, 12, 24, 16)."""
    return DenseNet(num_init_features=64, growth_rate=32,
                    block_config=(6, 12, 24, 16), **kwargs)


def densenet169(**kwargs):
    """Construct a 3D DenseNet with block configuration (6, 12, 32, 32)."""
    return DenseNet(num_init_features=64, growth_rate=32,
                    block_config=(6, 12, 32, 32), **kwargs)


def densenet201(**kwargs):
    """Construct a 3D DenseNet with block configuration (6, 12, 48, 32)."""
    return DenseNet(num_init_features=64, growth_rate=32,
                    block_config=(6, 12, 48, 32), **kwargs)


def densenet264(**kwargs):
    """Construct a 3D DenseNet with block configuration (6, 12, 64, 48)."""
    return DenseNet(num_init_features=64, growth_rate=32,
                    block_config=(6, 12, 64, 48), **kwargs)


In [25]:

def conv2D_output_size(img_size, padding, kernel_size, stride):
    """Return the (height, width) produced by a 2D convolution.

    Each argument is indexed at positions 0 and 1. Per axis the result is
    ``floor((size + 2 * pad - (kernel - 1) - 1) / stride + 1)``, returned as
    a numpy integer.
    """
    def _axis_output(axis):
        span = img_size[axis] + 2 * padding[axis] - (kernel_size[axis] - 1) - 1
        return np.floor(span / stride[axis] + 1).astype(int)

    return (_axis_output(0), _axis_output(1))


# 2D CNN encoder train from scratch (no transfer learning)
class EncoderCNN(nn.Module):
    """2D CNN encoder trained from scratch (no transfer learning).

    Every time step of the input goes through four strided
    Conv2d/BatchNorm2d/ReLU stages and three fully connected layers; the
    per-step embeddings are stacked into (batch, time_step, CNN_embed_dim).
    """

    def __init__(self, img_x=90, img_y=120, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
        super().__init__()

        self.img_x, self.img_y = img_x, img_y
        self.CNN_embed_dim = CNN_embed_dim

        # Channels, kernel sizes, strides and paddings of the four conv stages.
        self.ch1, self.ch2, self.ch3, self.ch4 = 32, 64, 128, 256
        self.k1, self.k2, self.k3, self.k4 = (5, 5), (3, 3), (3, 3), (3, 3)
        self.s1, self.s2, self.s3, self.s4 = (2, 2), (2, 2), (2, 2), (2, 2)
        self.pd1, self.pd2, self.pd3, self.pd4 = (0, 0), (0, 0), (0, 0), (0, 0)

        # Spatial output shape after each conv stage; the last one sizes fc1.
        self.conv1_outshape = conv2D_output_size((self.img_x, self.img_y), self.pd1, self.k1, self.s1)
        self.conv2_outshape = conv2D_output_size(self.conv1_outshape, self.pd2, self.k2, self.s2)
        self.conv3_outshape = conv2D_output_size(self.conv2_outshape, self.pd3, self.k3, self.s3)
        self.conv4_outshape = conv2D_output_size(self.conv3_outshape, self.pd4, self.k4, self.s4)

        # Widths of the fully connected layers, and the dropout probability.
        self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
        self.drop_p = drop_p

        self.conv1 = self._conv_stage(3, self.ch1, self.k1, self.s1, self.pd1)
        self.conv2 = self._conv_stage(self.ch1, self.ch2, self.k2, self.s2, self.pd2)
        self.conv3 = self._conv_stage(self.ch2, self.ch3, self.k3, self.s3, self.pd3)
        self.conv4 = self._conv_stage(self.ch3, self.ch4, self.k4, self.s4, self.pd4)

        self.drop = nn.Dropout2d(self.drop_p)
        self.pool = nn.MaxPool2d(2)
        flat_features = self.ch4 * self.conv4_outshape[0] * self.conv4_outshape[1]
        self.fc1 = nn.Linear(flat_features, self.fc_hidden1)
        self.fc2 = nn.Linear(self.fc_hidden1, self.fc_hidden2)
        self.fc3 = nn.Linear(self.fc_hidden2, self.CNN_embed_dim)  # output = CNN embedding latent variables

    @staticmethod
    def _conv_stage(in_channels, out_channels, kernel_size, stride, padding):
        """Build one Conv2d -> BatchNorm2d(momentum=0.01) -> ReLU stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                      kernel_size=kernel_size, stride=stride, padding=padding),
            nn.BatchNorm2d(out_channels, momentum=0.01),
            nn.ReLU(inplace=True),
        )

    def forward(self, x_3d):
        """Embed every time step (dimension 1) of ``x_3d``.

        Returns a tensor of shape (batch, time_step, CNN_embed_dim).
        """
        per_step = []
        for t in range(x_3d.size(1)):
            # Conv stages on the frame at time step t.
            feat = x_3d[:, t, :, :, :]
            for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
                feat = stage(feat)
            feat = feat.view(feat.size(0), -1)  # flatten the output of conv

            # FC layers
            feat = F.relu(self.fc1(feat))
            feat = F.relu(self.fc2(feat))
            feat = F.dropout(feat, p=self.drop_p, training=self.training)
            per_step.append(self.fc3(feat))

        # Stack along time, then swap so the batch dimension comes first.
        return torch.stack(per_step, dim=0).transpose_(0, 1)


# 2D CNN encoder using ResNet-152 pretrained
class ResCNNEncoder(nn.Module):
    """2D CNN encoder built on a pretrained torchvision ResNet-152.

    The ResNet without its last fc layer runs under ``torch.no_grad()``; its
    flattened output feeds three fully connected layers, the first two
    followed by batch norm and ReLU.
    """

    def __init__(self, fc_hidden1=512, fc_hidden2=512, drop_p=0.3, CNN_embed_dim=300):
        """Load the pretrained ResNet-152 and replace top fc layer."""
        super().__init__()

        self.fc_hidden1, self.fc_hidden2 = fc_hidden1, fc_hidden2
        self.drop_p = drop_p

        resnet = models.resnet152(pretrained=True)
        backbone_layers = list(resnet.children())[:-1]  # delete the last fc layer
        self.resnet = nn.Sequential(*backbone_layers)
        self.fc1 = nn.Linear(resnet.fc.in_features, fc_hidden1)
        self.bn1 = nn.BatchNorm1d(fc_hidden1, momentum=0.01)
        self.fc2 = nn.Linear(fc_hidden1, fc_hidden2)
        self.bn2 = nn.BatchNorm1d(fc_hidden2, momentum=0.01)
        self.fc3 = nn.Linear(fc_hidden2, CNN_embed_dim)

    def forward(self, x_3d):
        """Embed every time step (dimension 1) of ``x_3d``.

        Returns a tensor of shape (batch, time_step, CNN_embed_dim).
        """
        per_step = []
        for t in range(x_3d.size(1)):
            # ResNet features are computed without tracking gradients.
            with torch.no_grad():
                feat = self.resnet(x_3d[:, t, :, :, :])
                feat = feat.view(feat.size(0), -1)  # flatten output of conv

            # FC layers
            feat = F.relu(self.bn1(self.fc1(feat)))
            feat = F.relu(self.bn2(self.fc2(feat)))
            feat = F.dropout(feat, p=self.drop_p, training=self.training)
            per_step.append(self.fc3(feat))

        # Stack along time, then swap so the batch dimension comes first.
        return torch.stack(per_step, dim=0).transpose_(0, 1)


class DecoderRNN(nn.Module):
    """LSTM classifier over a sequence of per-frame embeddings.

    A multi-layer, batch-first LSTM consumes the sequence; the output at the
    last time step goes through two linear layers to produce class scores.

    Args:
        CNN_embed_dim: Size of each input embedding (LSTM input size).
        h_RNN_layers: Number of stacked LSTM layers.
        h_RNN: Hidden size of the LSTM.
        h_FC_dim: Width of the hidden fully connected layer.
        drop_p: Dropout probability applied after the hidden FC layer.
        num_classes: Number of output scores.
    """

    def __init__(self, CNN_embed_dim=300, h_RNN_layers=3, h_RNN=256, h_FC_dim=128, drop_p=0.3, num_classes=50):
        super().__init__()

        # Hyper-parameters are kept on the instance for later inspection.
        self.RNN_input_size = CNN_embed_dim
        self.h_RNN_layers = h_RNN_layers
        self.h_RNN = h_RNN
        self.h_FC_dim = h_FC_dim
        self.drop_p = drop_p
        self.num_classes = num_classes

        # batch_first=True: inputs and outputs are (batch, time_step, features).
        lstm_config = dict(
            input_size=self.RNN_input_size,
            hidden_size=self.h_RNN,
            num_layers=h_RNN_layers,
            batch_first=True,
        )
        self.LSTM = nn.LSTM(**lstm_config)

        self.fc1 = nn.Linear(self.h_RNN, self.h_FC_dim)
        self.fc2 = nn.Linear(self.h_FC_dim, self.num_classes)

    def forward(self, x_RNN):
        """Return class scores of shape (batch, num_classes).

        Args:
            x_RNN: Tensor of shape (batch, time_step, CNN_embed_dim).
        """
        # No initial state is supplied, so the LSTM starts from zeros.
        # The final (h_n, c_n) states are not needed here.
        RNN_out, _ = self.LSTM(x_RNN)

        # Only the output at the last time step feeds the classifier.
        last_step = RNN_out[:, -1, :]
        hidden = F.relu(self.fc1(last_step))
        hidden = F.dropout(hidden, p=self.drop_p, training=self.training)
        return self.fc2(hidden)

# Generating Model

In [26]:
def generate_model(model_name='densenet',n_classes=5,resnet_shortcut='B',model_depth=121,sample_duration=16,sample_size=224,mode='score'):
    """Build a 3D ResNet or 3D DenseNet by family name and depth.

    The constructor is looked up by name at call time (``resnet<depth>`` or
    ``densenet<depth>``, e.g. ``densenet121``), so only the requested factory
    has to be defined in the module.

    Args:
        model_name: Model family. 'resnet' and 'densenet' can be built;
            'preresnet', 'wideresnet' and 'resnext' pass the name check but
            have no constructor wired up here.
        n_classes: Forwarded to the factory as ``num_classes``.
        resnet_shortcut: Forwarded as ``shortcut_type`` (ResNet family only).
        model_depth: One of 10/18/34/50/101/152/200 for 'resnet' or
            121/169/201/264 for 'densenet'.
        sample_duration: Forwarded to the factory unchanged.
        sample_size: Forwarded to the factory unchanged.
        mode: 'score' builds the model with ``last_fc=True``; 'feature' with
            ``last_fc=False``.

    Returns:
        Whatever the selected factory returns.

    Raises:
        AssertionError: If ``mode``, ``model_name`` or ``model_depth`` is not
            one of the accepted values.
        NotImplementedError: If ``model_name`` is an accepted family that has
            no constructor here (previously this surfaced as an
            UnboundLocalError on ``return model``).
        NameError: If the factory for the requested depth is not defined.
    """
    assert mode in ['score', 'feature']
    last_fc = mode == 'score'

    assert model_name in ['resnet', 'preresnet', 'wideresnet', 'resnext', 'densenet']

    # Arguments shared by every factory.
    factory_kwargs = dict(
        num_classes=n_classes,
        sample_size=sample_size,
        sample_duration=sample_duration,
        last_fc=last_fc,
    )

    if model_name == 'resnet':
        assert model_depth in [10, 18, 34, 50, 101, 152, 200]
        # Only the ResNet factories take a shortcut type.
        factory_kwargs['shortcut_type'] = resnet_shortcut
    elif model_name == 'densenet':
        assert model_depth in [121, 169, 201, 264]
    else:
        raise NotImplementedError(
            "model_name '{}' is recognised but no constructor is available for it".format(model_name)
        )

    # Resolve lazily so an unused depth does not require its factory to exist.
    factory_name = '{}{}'.format(model_name, model_depth)
    try:
        factory = globals()[factory_name]
    except KeyError:
        raise NameError("name '{}' is not defined".format(factory_name)) from None

    return factory(**factory_kwargs)

# Set train model & other parameters

In [27]:
######______________DenseNet 3D or ResNet 3D  ___________#############

# File that receives the model's state_dict when it is saved.
model_save_location_and_name = '/content/torch_model.pth'

# Training schedule and SGD step size.
num_epochs = 5
learning_rate = 0.001

# 3D DenseNet-121 scoring 5 classes on clips of 16 frames at 224x224.
# Call model.cuda() after construction to train on a GPU.
model = generate_model(
    model_name='densenet',
    n_classes=5,
    model_depth=121,
    sample_duration=16,
    sample_size=224,
    mode='score',
)

# Cross-entropy loss and plain SGD over all model parameters.
error = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)

In [30]:
######______________Simple CNN 3D___________#############

# File that receives the model's state_dict when it is saved.
model_save_location_and_name = '/content/torch_model.pth'

# Training schedule and SGD step size.
num_epochs = 5
learning_rate = 0.001

# Network to train; call model.cuda() after construction to use a GPU.
model = CNN3DModel()

# Cross-entropy loss and plain SGD over all model parameters.
error = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=learning_rate)

In [31]:
######______________Encoder Decoder(ConvLSTM)___________#############

# Encoder and decoder weights are stored in separate files.
model_save_location_and_name_encoder = '/content/encoder.pth'
model_save_location_and_name_decoder = '/content/decoder.pth'

# Training schedule and SGD step size.
num_epochs = 5
learning_rate = 0.001

# EncoderCNN architecture
CNN_fc_hidden1 = 1024
CNN_fc_hidden2 = 768
CNN_embed_dim = 512      # latent dim extracted by the 2D CNN
img_x = 224              # frame size handed to the encoder
img_y = 224
dropout_p = 0.0          # dropout probability shared by encoder and decoder

# DecoderRNN architecture
RNN_hidden_layers = 3
RNN_hidden_nodes = 512
RNN_FC_dim = 256

# The encoder is built first, then the decoder that consumes its embeddings.
cnn_encoder = EncoderCNN(
    img_x=img_x,
    img_y=img_y,
    fc_hidden1=CNN_fc_hidden1,
    fc_hidden2=CNN_fc_hidden2,
    drop_p=dropout_p,
    CNN_embed_dim=CNN_embed_dim,
)
rnn_decoder = DecoderRNN(
    CNN_embed_dim=CNN_embed_dim,
    h_RNN_layers=RNN_hidden_layers,
    h_RNN=RNN_hidden_nodes,
    h_FC_dim=RNN_FC_dim,
    drop_p=dropout_p,
    num_classes=5,
)

# Cross-entropy loss; one SGD optimizer updates both networks together.
error = nn.CrossEntropyLoss()
crnn_params = [*cnn_encoder.parameters(), *rnn_decoder.parameters()]
optimizer = torch.optim.SGD(crnn_params, lr=learning_rate)

# Training model


> DenseNet 3D or ResNet 3D  

> Simple CNN 3D

> Encoder Decoder(ConvLSTM)



In [None]:
######_______________DenseNet 3D or ResNet 3D___________#############

# Trains `model` on `train_loader`. Every 10 optimisation steps the whole
# `test_loader` is scored, the best-scoring weights are saved to
# `model_save_location_and_name`, and loss/accuracy are recorded for plotting.

temp_accuracy = 0      # best test accuracy seen so far, in percent
count = 0              # number of optimisation steps taken
loss_list = []
iteration_list = []
accuracy_list = []
for epoch in range(num_epochs):
    for samples in train_loader:

        # This model only consumes the 'image' clip; 'image_road' is not used.
        imgs, lms = samples['image'], samples['label']
        # NOTE(review): view() reinterprets memory without moving axes; this
        # assumes each sample already holds exactly one 3x16x224x224 clip in
        # that axis order - TODO confirm against the dataset class.
        imgs = imgs.view(1,3,16,224,224).float()

        optimizer.zero_grad()
        # Forward propagation
        outputs = model(imgs)

        # Calculate softmax and cross entropy loss
        loss = error(outputs, lms)
        # Calculating gradients
        loss.backward()
        # Update parameters
        optimizer.step()

        count += 1
        if count % 10 == 0:
            # Calculate accuracy on the test set. Switch to eval mode so
            # dropout / batch-norm behave deterministically, and disable
            # autograd so no graph is kept for these forward passes.
            correct = 0
            total = 0
            model.eval()
            with torch.no_grad():
                for test_samples in test_loader:
                    test_imgs, test_lms = test_samples['image'], test_samples['label']
                    test_imgs = test_imgs.view(1,3,16,224,224).float()

                    # Forward propagation
                    test_outputs = model(test_imgs)

                    # Get predictions from the maximum value
                    predicted = torch.max(test_outputs, 1)[1]

                    # Total number of labels
                    total += len(test_lms)
                    # .item() keeps the counters plain Python ints
                    correct += (predicted == test_lms).sum().item()
            # Back to training mode for the next optimisation step.
            model.train()

            accuracy = 100 * correct / float(total)
            if temp_accuracy<accuracy:
                temp_accuracy = accuracy
                torch.save(model.state_dict(), model_save_location_and_name )

            # Store plain floats so the history lists hold no tensors.
            loss_value = loss.item()
            loss_list.append(loss_value)
            iteration_list.append(count)
            accuracy_list.append(accuracy)

            # Print Loss
            print('Iteration: {}  Loss: {}  Accuracy: {} %'.format(count, loss_value, accuracy))

In [None]:
######______________Simple CNN 3D___________#############

# Trains the two-input `model` on `train_loader`. Every 10 optimisation steps
# the whole `test_loader` is scored, the best-scoring weights are saved to
# `model_save_location_and_name`, and loss/accuracy are recorded for plotting.

temp_accuracy = 0      # best test accuracy seen so far, in percent
count = 0              # number of optimisation steps taken
loss_list = []
iteration_list = []
accuracy_list = []
for epoch in range(num_epochs):
    for samples in train_loader:

        imgs, imgs_road, lms = samples['image'], samples['image_road'], samples['label']
        # NOTE(review): view() reinterprets memory without moving axes; this
        # assumes each sample already holds exactly one 3x16x224x224 clip in
        # that axis order - TODO confirm against the dataset class.
        imgs = imgs.view(1,3,16,224,224).float()
        imgs_road = imgs_road.view(1,3,16,224,224).float()

        optimizer.zero_grad()
        # Forward propagation
        outputs = model(imgs,imgs_road)

        # Calculate softmax and cross entropy loss
        loss = error(outputs, lms)
        # Calculating gradients
        loss.backward()
        # Update parameters
        optimizer.step()

        count += 1
        if count % 10 == 0:
            # Calculate accuracy on the test set. Switch to eval mode so
            # dropout / batch-norm behave deterministically, and disable
            # autograd so no graph is kept for these forward passes.
            correct = 0
            total = 0
            model.eval()
            with torch.no_grad():
                for test_samples in test_loader:
                    test_imgs = test_samples['image'].view(1,3,16,224,224).float()
                    test_imgs_road = test_samples['image_road'].view(1,3,16,224,224).float()
                    test_lms = test_samples['label']

                    # Forward propagation
                    test_outputs = model(test_imgs,test_imgs_road)

                    # Get predictions from the maximum value
                    predicted = torch.max(test_outputs, 1)[1]

                    # Total number of labels
                    total += len(test_lms)
                    # .item() keeps the counters plain Python ints
                    correct += (predicted == test_lms).sum().item()
            # Back to training mode for the next optimisation step.
            model.train()

            accuracy = 100 * correct / float(total)
            if temp_accuracy<accuracy:
                temp_accuracy = accuracy
                torch.save(model.state_dict(), model_save_location_and_name )

            # Store plain floats so the history lists hold no tensors.
            loss_value = loss.item()
            loss_list.append(loss_value)
            iteration_list.append(count)
            accuracy_list.append(accuracy)

            # Print Loss
            print('Iteration: {}  Loss: {}  Accuracy: {} %'.format(count, loss_value, accuracy))

In [None]:
######______________Encoder Decoder(ConvLSTM)___________#############

# Trains `cnn_encoder` + `rnn_decoder` jointly on `train_loader`. Every 10
# optimisation steps the whole `test_loader` is scored, the best-scoring
# weights are saved to the encoder/decoder checkpoint paths, and loss/accuracy
# are recorded for plotting.

# Must be reset here: otherwise this cell fails with a NameError when run on
# its own, or compares against the best accuracy left behind by another model.
temp_accuracy = 0      # best test accuracy seen so far, in percent
count = 0              # number of optimisation steps taken
loss_list = []
iteration_list = []
accuracy_list = []
for epoch in range(num_epochs):
    for samples in train_loader:

        # Only the 'image' clip is fed to the encoder; 'image_road' is not used.
        imgs, lms = samples['image'], samples['label']
        # NOTE(review): view() reinterprets memory without moving axes; this
        # assumes each sample already holds exactly one 16x3x224x224 clip in
        # that axis order - TODO confirm against the dataset class.
        imgs = imgs.view(1,16,3,224,224).float()

        optimizer.zero_grad()

        outputs = rnn_decoder(cnn_encoder(imgs))
        # Calculate softmax and cross entropy loss
        loss = error(outputs, lms)
        # Calculating gradients
        loss.backward()
        # Update parameters
        optimizer.step()

        count += 1
        if count % 10 == 0:
            # Calculate accuracy on the test set. Switch both networks to eval
            # mode so dropout / batch-norm behave deterministically, and
            # disable autograd so no graph is kept for these forward passes.
            correct = 0
            total = 0
            cnn_encoder.eval()
            rnn_decoder.eval()
            with torch.no_grad():
                for test_samples in test_loader:
                    test_imgs, test_lms = test_samples['image'], test_samples['label']
                    test_imgs = test_imgs.view(1,16,3,224,224).float()

                    test_outputs = rnn_decoder(cnn_encoder(test_imgs))
                    # Get predictions from the maximum value
                    predicted = torch.max(test_outputs, 1)[1]

                    # Total number of labels
                    total += len(test_lms)
                    # .item() keeps the counters plain Python ints
                    correct += (predicted == test_lms).sum().item()
            # Back to training mode for the next optimisation step.
            cnn_encoder.train()
            rnn_decoder.train()

            accuracy = 100 * correct / float(total)

            if temp_accuracy<accuracy:
                temp_accuracy = accuracy
                torch.save(cnn_encoder.state_dict(), model_save_location_and_name_encoder )
                torch.save(rnn_decoder.state_dict(), model_save_location_and_name_decoder )

            # Store plain floats so the history lists hold no tensors.
            loss_value = loss.item()
            loss_list.append(loss_value)
            iteration_list.append(count)
            accuracy_list.append(accuracy)

            # Print Loss
            print('Iteration: {}  Loss: {}  Accuracy: {} %'.format(count, loss_value, accuracy))

# visualization loss & accuracy

In [None]:
# Draw the recorded history as two separate figures, both against the
# iteration counter: first the loss (default colour), then the accuracy (red).
for _series, _ylabel, _title, _line_style in (
    (loss_list, "Loss", "CNN: Loss vs Number of iteration", {}),
    (accuracy_list, "Accuracy", "CNN: Accuracy vs Number of iteration", {"color": "red"}),
):
    plt.plot(iteration_list, _series, **_line_style)
    plt.xlabel("Number of iteration")
    plt.ylabel(_ylabel)
    plt.title(_title)
    plt.show()