In [None]:
# Mount Google Drive into the Colab filesystem at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
import sys
# store the dataset.7z file in MiniProject4
# Put the MiniProject4 Drive folder first on the import path
# (presumably where utils_modelnet.py lives -- TODO confirm).
sys.path.insert(0,'/content/gdrive/MyDrive/Colab Notebooks/MiniProject4')

In [None]:
import os
# Restrict TensorFlow to the first GPU; set before TensorFlow is imported.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
import tensorflow as tf
import tensorflow.keras as keras
# Allocate GPU memory on demand instead of reserving it all up front.
# Guarded so the notebook still runs on a CPU-only runtime, where the list is empty.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_memory_growth(gpus[0], True)
from tensorflow.keras.preprocessing import image
import numpy as np
import matplotlib.pyplot as plt
import utils_modelnet as ds

# Main Code provided by the paper

## Data loading and pre-processing

### Data Loading

In [None]:
!7z x '/content/gdrive/MyDrive/Colab Notebooks/MiniProject4/dataset.7z'


7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.20GHz (406F0),ASM,AES-NI)

Scanning the drive for archives:
  0M Scan /content/gdrive/MyDrive/Colab Notebooks/MiniProject4/                                                               1 file, 437541578 bytes (418 MiB)

Extracting archive: /content/gdrive/MyDrive/Colab Notebooks/MiniProject4/dataset.7z
--
Path = /content/gdrive/MyDrive/Colab Notebooks/MiniProject4/dataset.7z
Type = 7z
Physical Size = 437541578
Headers Size = 459592
Method = LZMA2:24
Solid = +
Blocks = 1

  0%      0% 84         0% 84 - dataset/modelnet2d/airplane/1/1.png                                               

### Pre-processing

In [None]:
# Root folder of the extracted archive and the object categories used as class labels.
dataset_path = 'dataset/modelnet2d/'
class_set =  ['chair', 'car', 'lamp', 'airplane', 'person']

# Helpers come from utils_modelnet (imported as `ds`); their internals are not shown in this notebook.
dataset = ds.get_data_from_file(class_set, dataset_path)
train_dataset, valid_dataset, test_dataset = ds.train_test_split(dataset)

# Separate each split into its inputs and its labels.
train_data, train_label = ds.split_data_label(train_dataset)
test_data, test_label = ds.split_data_label(test_dataset)
valid_data, valid_label = ds.split_data_label(valid_dataset)
# Report the size of every split and the number of classes.
print("Train Dataset: {}".format(len(train_dataset)))
print("Test Dataset: {}".format(len(test_dataset)))
print("Valid Dataset: {}".format(len(valid_dataset)))
num_classes = len(class_set)
print("Number of Classes: {}".format(num_classes))
# Input-pipeline configuration used by get_binocular_dataset and by the model input shape.
BATCH_SIZE = 32
IMG_SIZE = 48
NUM_CHANNEL = 1

Train Dataset: 10800
Test Dataset: 32400
Valid Dataset: 5400
Number of Classes: 5


### Preparing binocular images

In [None]:
def get_binocular_dataset(data, label, batch_size=BATCH_SIZE, shuffle=True):
    """Build a batched ``tf.data`` pipeline of (left, right) image pairs with labels.

    Args:
        data: sequence of ``[left_path, right_path]`` image-file pairs.
        label: class labels aligned with ``data``.
        batch_size: number of pairs per batch.
        shuffle: reshuffle the pairs on every pass (default, as before). Pass
            ``False`` to get a fixed iteration order, e.g. for evaluation.

    Returns:
        A dataset yielding ``((left, right), label)``; each image is resized to
        ``IMG_SIZE x IMG_SIZE`` with ``NUM_CHANNEL`` channels and scaled to [0, 1].
    """
    def load_and_preprocess_image(path):
        # One shared routine for both eyes: read, decode, resize, normalize.
        # NOTE(review): the archive lists .png files; decode_jpeg is kept from the
        # original pipeline -- confirm it is the intended decoder.
        raw = tf.io.read_file(path)
        img = tf.image.decode_jpeg(raw, channels=NUM_CHANNEL)
        img = tf.image.resize(img, [IMG_SIZE, IMG_SIZE])
        return img / 255.0  # normalize to [0,1] range

    # The tuples are unpacked into the positional arguments of the mapped function
    def load_and_preprocess_from_path_label(data_path, label):
        left_image = load_and_preprocess_image(data_path[0])
        right_image = load_and_preprocess_image(data_path[1])
        return (left_image, right_image), label

    # Named `dataset` (not `ds`) so the utils_modelnet alias `ds` is not shadowed.
    dataset = tf.data.Dataset.from_tensor_slices((data, label))
    if shuffle:
        # Shuffle the lightweight (paths, label) records before decoding, so the
        # full-size shuffle buffer holds file names rather than decoded images.
        dataset = dataset.shuffle(buffer_size=len(data))
    dataset = dataset.map(load_and_preprocess_from_path_label, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    print(dataset)
    return dataset

In [None]:
# Build the train / test / validation pipelines; each call prints the resulting dataset spec.
# NOTE(review): get_binocular_dataset shuffles every split, including test and validation.
train_ds = get_binocular_dataset(train_data, train_label)
test_ds = get_binocular_dataset(test_data, test_label)
valid_ds = get_binocular_dataset(valid_data, valid_label)

<PrefetchDataset shapes: (((None, 48, 48, 1), (None, 48, 48, 1)), (None,)), types: ((tf.float32, tf.float32), tf.int32)>
<PrefetchDataset shapes: (((None, 48, 48, 1), (None, 48, 48, 1)), (None,)), types: ((tf.float32, tf.float32), tf.int32)>
<PrefetchDataset shapes: (((None, 48, 48, 1), (None, 48, 48, 1)), (None,)), types: ((tf.float32, tf.float32), tf.int32)>


## CNN2

### Defining Monocular layer including cmpooling and Convolution

In [None]:
## Utility function
def cmpooling(fmaps, scale_list, pool_stride):
    """Concentric multi-scale (CM) max pooling.

    Pools ``fmaps`` once per window size in ``scale_list``. Smaller windows are
    applied to a centrally cropped copy of the maps, so for a given stride every
    scale produces an output of the same spatial size with windows that share
    the same centres.

    Args:
        fmaps: 4-D feature maps; axes 1 and 2 are cropped and pooled, so their
            sizes must be statically known.
        scale_list: pooling window sizes, in any order (at least one entry).
        pool_stride: stride shared by all scales.

    Returns:
        List of pooled maps, one per scale, largest scale first.

    Raises:
        ValueError: if ``scale_list`` is empty.
        AssertionError: if the scales do not yield identically shaped outputs.
    """
    if len(scale_list) == 0:
        raise ValueError("scale_list must contain at least one pooling scale")

    # Largest window first. Sorting (rather than comparing only the first two
    # entries) also handles single-element and unordered lists.
    scales = sorted(scale_list, reverse=True)
    largest = scales[0]

    # concentric multi-scale pooling
    pool_maps = []
    for scale in scales:
        # Border removed on each side so this window stays centred on the largest one.
        margin = (largest - scale) // 2
        slice_maps = tf.slice(
            fmaps,
            [0, margin, margin, 0],
            [-1, fmaps.shape[1] - 2 * margin, fmaps.shape[2] - 2 * margin, -1],
        )
        pool_maps.append(tf.nn.max_pool2d(slice_maps, scale, pool_stride, "VALID"))

    # assert same shape for all pool_map
    for pool_map in pool_maps[:-1]:
        assert pool_map.shape[1:] == pool_maps[-1].shape[1:], "CM-pooling scales produced different output shapes"
    return pool_maps

# Concat the feature maps in different scale and convolution once. (paper version)
class Monocular(tf.keras.layers.Layer):
    """One monocular stage: CM-pool the input at several scales, concat, then convolve.

    Args:
        filters: number of convolution output channels.
        ksize: convolution kernel size.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # No input_shape hint is given to the conv: it consumes the pooled,
        # channel-concatenated maps rather than this layer's raw input, and
        # infers its input size on first use.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')

    def call(self, fmaps, scale_list, pool_stride):
        """Return ``conv(concat(cmpooling(fmaps, scale_list, pool_stride)))``."""
        pool_maps = cmpooling(fmaps, scale_list, pool_stride)
        pool_maps = tf.concat(pool_maps, axis=-1)
        return self.conv(pool_maps)

### Defining CNN2 Model

In [None]:
SCALE_LIST = [1,3,5]
def CNN2(input_shape, num_classes, scale_list):
    """Build the two-pathway binocular CNN2 classifier.

    Each eye's image is augmented with the parallax (left - right) map, passed
    through three Monocular stages that exchange features with the other
    pathway, and the merged result goes through three convolutions, dropout and
    a softmax classifier.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.
        scale_list: CM-pooling window sizes handed to every Monocular stage.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    left_eye = tf.keras.Input(input_shape, name='left_eye')
    right_eye = tf.keras.Input(input_shape, name='right_eye')

    # parallax augmentation
    parallax = left_eye - right_eye
    left = tf.concat([left_eye, -parallax], axis=-1)
    right = tf.concat([right_eye, parallax], axis=-1)

    left1 = Monocular(6, 5, input_shape=input_shape, name='mono1_left')(left, scale_list=scale_list, pool_stride=2)
    right1 = Monocular(6, 5, input_shape=input_shape, name='mono1_right')(right, scale_list=scale_list, pool_stride=2)

    # From stage 2 on, each pathway sees its own features followed by the other eye's.
    left2 = Monocular(12, 5, name='mono2_left')(tf.concat([left1, right1], axis=-1), scale_list=scale_list, pool_stride=1)
    right2 = Monocular(12, 5, name='mono2_right')(tf.concat([right1, left1], axis=-1), scale_list=scale_list, pool_stride=1)

    left3 = Monocular(32, 3, name='mono3_left')(tf.concat([left2, right2], axis=-1), scale_list=scale_list, pool_stride=1)
    right3 = Monocular(32, 3, name='mono3_right')(tf.concat([right2, left2], axis=-1), scale_list=scale_list, pool_stride=1)

    x = tf.concat([left3, right3], axis=-1)
    x = tf.keras.layers.Conv2D(256, 3, strides=1, activation='relu', name='conv1')(x)
    x = tf.keras.layers.Conv2D(256, 1, strides=1, activation='relu', name='conv2')(x)
    x = tf.keras.layers.Conv2D(64, 1, strides=1, activation='relu', name='conv3')(x)
    feature_vector = tf.keras.layers.Flatten()(x)
    # Dropout must sit between the flattened features and the classifier; applied
    # to `x` here it would be disconnected from the model output and do nothing.
    feature_vector = tf.keras.layers.Dropout(0.5)(feature_vector)
    predicted_output = tf.keras.layers.Dense(num_classes, activation='softmax', name='output')(feature_vector)

    return tf.keras.Model([left_eye, right_eye], predicted_output)

In [None]:
def create_model(model, input_shape, num_classes, scale_list):
    """Instantiate a network with ``model(...)`` and compile it for sparse-label classification.

    Args:
        model: builder called as ``model(input_shape, num_classes, scale_list)``.
        input_shape: forwarded to the builder.
        num_classes: forwarded to the builder.
        scale_list: forwarded to the builder.

    Returns:
        The compiled model returned by the builder.
    """
    network = model(input_shape, num_classes, scale_list)

    # Adam driven by a staircase exponential decay: starts at 1e-4 and is
    # multiplied by 0.96 every 100000 optimizer steps.
    schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        0.0001, decay_steps=100000, decay_rate=0.96, staircase=True)
    adam = tf.keras.optimizers.Adam(learning_rate=schedule)

    # Integer labels, so the sparse variants of the loss and accuracy are used.
    network.compile(
        optimizer=adam,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
    return network

### Create and fitting the model

In [None]:
# Build and compile the baseline CNN2 (scales 1, 3, 5), then train for 10 epochs with validation on valid_ds.
cnn2 = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2.fit(train_ds , epochs=10, validation_data=valid_ds)


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe132bd1050>

### Evaluation on test dataset

In [None]:
# Test-set loss and sparse categorical accuracy of the baseline CNN2.
loss, acc = cnn2.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.018022213131189346, Acc: 0.9939814805984497


# Experiments

## Using different number of scales in CMPooling (additional to the paper experiments)

We changed the `SCALE_LIST` variable to [1,3] and [1,3,5,7] to see the effect of using fewer and more scales on the accuracy.

### Using 2 scales: 1, 3

In [None]:
# Same CNN2 architecture, but CM-pooling uses only two scales (1 and 3).
SCALE_LIST = [1,3]
cnn2_2scales = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_2scales.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0f13aedd0>

In [None]:
# Test-set loss and accuracy of the two-scale model.
loss, acc = cnn2_2scales.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.04089761897921562, Acc: 0.9865123629570007


### Using 4 scales: 1, 3, 5, 7

In [None]:
# Same CNN2 architecture, but CM-pooling uses four scales (1, 3, 5 and 7).
SCALE_LIST = [1,3,5,7]
cnn2_4scales = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_4scales.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0f041edd0>

In [None]:
# Test-set loss and accuracy of the four-scale model.
loss, acc = cnn2_4scales.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.02878613770008087, Acc: 0.9905247092247009


The test accuracy with the scale list [1,3,5] is higher than with either [1,3] or [1,3,5,7].

## Pooling after Convolution

Here we investigate how placing CM-pooling after the convolution layer (instead of before it) affects the accuracy.

### Updated Monocular layer for Pooling after Convolution

In [None]:
# Changing SCALE_LIST to the original one
SCALE_LIST = [1,3,5]
# Pooling-after-convolution variant: convolve first, then CM-pool and concat the scales.
class Monocular(tf.keras.layers.Layer):
    """One monocular stage with the order reversed: convolution, then CM-pooling.

    Args:
        filters: number of convolution output channels.
        ksize: convolution kernel size.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # build() receives the batch-inclusive shape, which is not a valid Conv2D
        # `input_shape` hint; the conv infers its input size on first use instead.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')

    def call(self, fmaps, scale_list, pool_stride):
        """Return the channel-wise concat of ``cmpooling(conv(fmaps), scale_list, pool_stride)``."""
        conv_maps = self.conv(fmaps)
        pool_maps = cmpooling(conv_maps, scale_list, pool_stride)
        return tf.concat(pool_maps, axis=-1)

### Create and fitting the model

In [None]:
# CNN2 built with the Monocular definition from the cell above (convolution before CM-pooling).
cnn2_inverseP = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_inverseP.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0f1556050>

### Evaluation on test dataset

In [None]:
# Test-set loss and accuracy of the pooling-after-convolution model.
loss, acc = cnn2_inverseP.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.013400280848145485, Acc: 0.9959567785263062


## Ablation study

### 1) Concentric multi-scale (CM) pooling contribution

Using Maxpooling instead of CMPooling

In [None]:
class Monocular(tf.keras.layers.Layer):
    """Ablation stage: a single 2x2 max pooling (no CM-pooling) followed by a convolution.

    Args:
        filters: number of convolution output channels.
        ksize: convolution kernel size.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # No input_shape hint: build() receives the batch-inclusive shape, and the
        # conv infers its input size on first use.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')

    def call(self, fmaps, scale_list, pool_stride):
        """Max-pool ``fmaps`` (2x2 window, SAME padding) and convolve.

        ``scale_list`` is accepted only to keep the call signature used by CNN2;
        it is not used by this variant.
        """
        # Functional op rather than a Keras MaxPooling2D layer, so no new layer
        # object is created every time the stage is called.
        pool_maps = tf.nn.max_pool2d(fmaps, 2, pool_stride, "SAME")
        return self.conv(pool_maps)

In [None]:
# CNN2 built with the plain max-pooling Monocular defined in the cell above.
cnn2_maxPool = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_maxPool.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0e672ae10>

In [None]:
# Test-set loss and accuracy of the max-pooling ablation.
loss, acc = cnn2_maxPool.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.02324630320072174, Acc: 0.9937654137611389


### 2) Changing MaxPooling to AvgPooling in CMPooling layer (additional to the paper experiments)

In [None]:
# Changing the Monocular to the original one
class Monocular(tf.keras.layers.Layer):
    """One monocular stage: CM-pool the input at several scales, concat, then convolve.

    Args:
        filters: number of convolution output channels.
        ksize: convolution kernel size.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # No input_shape hint is given to the conv: it consumes the pooled,
        # channel-concatenated maps rather than this layer's raw input, and
        # infers its input size on first use.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')

    def call(self, fmaps, scale_list, pool_stride):
        """Return ``conv(concat(cmpooling(fmaps, scale_list, pool_stride)))``."""
        pool_maps = cmpooling(fmaps, scale_list, pool_stride)
        pool_maps = tf.concat(pool_maps, axis=-1)
        return self.conv(pool_maps)

In [None]:
## Utility function
def cmpooling(fmaps, scale_list, pool_stride):
    """Concentric multi-scale (CM) pooling, average-pooling variant.

    Pools ``fmaps`` once per window size in ``scale_list`` using average
    pooling. Smaller windows are applied to a centrally cropped copy of the
    maps, so for a given stride every scale produces an output of the same
    spatial size with windows that share the same centres.

    Args:
        fmaps: 4-D feature maps; axes 1 and 2 are cropped and pooled, so their
            sizes must be statically known.
        scale_list: pooling window sizes, in any order (at least one entry).
        pool_stride: stride shared by all scales.

    Returns:
        List of pooled maps, one per scale, largest scale first.

    Raises:
        ValueError: if ``scale_list`` is empty.
        AssertionError: if the scales do not yield identically shaped outputs.
    """
    if len(scale_list) == 0:
        raise ValueError("scale_list must contain at least one pooling scale")

    # Largest window first. Sorting (rather than comparing only the first two
    # entries) also handles single-element and unordered lists.
    scales = sorted(scale_list, reverse=True)
    largest = scales[0]

    # concentric multi-scale pooling
    pool_maps = []
    for scale in scales:
        # Border removed on each side so this window stays centred on the largest one.
        margin = (largest - scale) // 2
        slice_maps = tf.slice(
            fmaps,
            [0, margin, margin, 0],
            [-1, fmaps.shape[1] - 2 * margin, fmaps.shape[2] - 2 * margin, -1],
        )
        pool_maps.append(tf.nn.avg_pool2d(slice_maps, scale, pool_stride, "VALID"))

    # assert same shape for all pool_map
    for pool_map in pool_maps[:-1]:
        assert pool_map.shape[1:] == pool_maps[-1].shape[1:], "CM-pooling scales produced different output shapes"
    return pool_maps

In [None]:
# CNN2 built while `cmpooling` is the average-pooling version defined in the cell above.
cnn2_CM_avgPooling = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_CM_avgPooling.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0e637a290>

In [None]:
# Evaluate the average-pooling model trained in the previous cell (not the baseline `cnn2`).
loss, acc = cnn2_CM_avgPooling.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.01860054023563862, Acc: 0.993950605392456


### 3) One feedforward pathway

In [None]:
# Changing the cmpooling to the original one
## Utility function
def cmpooling(fmaps, scale_list, pool_stride):
    """Concentric multi-scale (CM) max pooling.

    Pools ``fmaps`` once per window size in ``scale_list``. Smaller windows are
    applied to a centrally cropped copy of the maps, so for a given stride every
    scale produces an output of the same spatial size with windows that share
    the same centres.

    Args:
        fmaps: 4-D feature maps; axes 1 and 2 are cropped and pooled, so their
            sizes must be statically known.
        scale_list: pooling window sizes, in any order (at least one entry).
        pool_stride: stride shared by all scales.

    Returns:
        List of pooled maps, one per scale, largest scale first.

    Raises:
        ValueError: if ``scale_list`` is empty.
        AssertionError: if the scales do not yield identically shaped outputs.
    """
    if len(scale_list) == 0:
        raise ValueError("scale_list must contain at least one pooling scale")

    # Largest window first. Sorting (rather than comparing only the first two
    # entries) also handles single-element and unordered lists.
    scales = sorted(scale_list, reverse=True)
    largest = scales[0]

    # concentric multi-scale pooling
    pool_maps = []
    for scale in scales:
        # Border removed on each side so this window stays centred on the largest one.
        margin = (largest - scale) // 2
        slice_maps = tf.slice(
            fmaps,
            [0, margin, margin, 0],
            [-1, fmaps.shape[1] - 2 * margin, fmaps.shape[2] - 2 * margin, -1],
        )
        pool_maps.append(tf.nn.max_pool2d(slice_maps, scale, pool_stride, "VALID"))

    # assert same shape for all pool_map
    for pool_map in pool_maps[:-1]:
        assert pool_map.shape[1:] == pool_maps[-1].shape[1:], "CM-pooling scales produced different output shapes"
    return pool_maps

In [None]:
# Single-pathway ablation stage: convolution followed by one 2x2 max pooling (no CM-pooling).
class Monocular(tf.keras.layers.Layer):
    """Convolution followed by a single 2x2 max pooling.

    Args:
        filters: number of convolution output channels.
        ksize: convolution kernel size.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # build() receives the batch-inclusive shape, which is not a valid Conv2D
        # `input_shape` hint; the conv infers its input size on first use instead.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')

    def call(self, fmaps, scale_list, pool_stride):
        """Convolve ``fmaps`` and max-pool the result (2x2 window, SAME padding).

        ``scale_list`` is accepted only to keep the call signature used by CNN2;
        it is not used by this variant.
        """
        conv_maps = self.conv(fmaps)
        # Functional op rather than a Keras MaxPooling2D layer, so no new layer
        # object is created every time the stage is called.
        return tf.nn.max_pool2d(conv_maps, 2, pool_stride, "SAME")

In [None]:
def CNN2(input_shape, num_classes, scale_list):
    """Build the single-pathway ablation of CNN2.

    Both eye images are concatenated channel-wise and processed by one stack of
    three Monocular stages, followed by three convolutions, dropout and a
    softmax classifier.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.
        scale_list: forwarded to every Monocular stage.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    left_eye = tf.keras.Input(input_shape, name='left_eye')
    right_eye = tf.keras.Input(input_shape, name='right_eye')

    x = tf.concat([left_eye, right_eye], axis=-1)

    x = Monocular(6, 5, input_shape=input_shape, name='mono1')(x, scale_list=scale_list, pool_stride=2)
    x = Monocular(12, 5, name='mono2')(x, scale_list=scale_list, pool_stride=1)
    x = Monocular(32, 3, name='mono3')(x, scale_list=scale_list, pool_stride=1)

    x = tf.keras.layers.Conv2D(256, 3, strides=1, activation='relu', name='conv1')(x)
    x = tf.keras.layers.Conv2D(256, 1, strides=1, activation='relu', name='conv2')(x)
    x = tf.keras.layers.Conv2D(64, 1, strides=1, activation='relu', name='conv3')(x)
    feature_vector = tf.keras.layers.Flatten()(x)
    # Dropout must sit between the flattened features and the classifier; applied
    # to `x` here it would be disconnected from the model output and do nothing.
    feature_vector = tf.keras.layers.Dropout(0.5)(feature_vector)
    predicted_output = tf.keras.layers.Dense(num_classes, activation='softmax', name='output')(feature_vector)

    return tf.keras.Model([left_eye, right_eye], predicted_output)

In [None]:
# Train the single-pathway CNN2 defined in the cell above.
cnn2_1fP = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_1fP.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0f15ffe90>

In [None]:
# Test-set loss and accuracy of the single-pathway model.
loss, acc = cnn2_1fP.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.021849771961569786, Acc: 0.9941975474357605


### 4) Changing number of (CMPooling + conv) layers (additional to the paper experiments)

In [None]:
# Changing the Monocular to the original one
# Concat the feature maps in different scale and convolution once. (paper version)
class Monocular(tf.keras.layers.Layer):
    """One monocular stage: CM-pool the input at several scales, concat, then convolve.

    Args:
        filters: number of convolution output channels.
        ksize: convolution kernel size.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # No input_shape hint is given to the conv: it consumes the pooled,
        # channel-concatenated maps rather than this layer's raw input, and
        # infers its input size on first use.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')

    def call(self, fmaps, scale_list, pool_stride):
        """Return ``conv(concat(cmpooling(fmaps, scale_list, pool_stride)))``."""
        pool_maps = cmpooling(fmaps, scale_list, pool_stride)
        pool_maps = tf.concat(pool_maps, axis=-1)
        return self.conv(pool_maps)

2 (CMPooling + conv) layers 

In [None]:
SCALE_LIST = [1,3,5]
def CNN2(input_shape, num_classes, scale_list):
    """Build CNN2 with only two Monocular stages per pathway.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.
        scale_list: CM-pooling window sizes handed to every Monocular stage.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    left_eye = tf.keras.Input(input_shape, name='left_eye')
    right_eye = tf.keras.Input(input_shape, name='right_eye')

    # parallax augmentation
    parallax = left_eye - right_eye
    left = tf.concat([left_eye, -parallax], axis=-1)
    right = tf.concat([right_eye, parallax], axis=-1)

    left1 = Monocular(6, 5, input_shape=input_shape, name='mono1_left')(left, scale_list=scale_list, pool_stride=2)
    right1 = Monocular(6, 5, input_shape=input_shape, name='mono1_right')(right, scale_list=scale_list, pool_stride=2)

    # Stage 2: each pathway sees its own features followed by the other eye's.
    left2 = Monocular(12, 5, name='mono2_left')(tf.concat([left1, right1], axis=-1), scale_list=scale_list, pool_stride=1)
    right2 = Monocular(12, 5, name='mono2_right')(tf.concat([right1, left1], axis=-1), scale_list=scale_list, pool_stride=1)

    x = tf.concat([left2, right2], axis=-1)
    x = tf.keras.layers.Conv2D(256, 3, strides=1, activation='relu', name='conv1')(x)
    x = tf.keras.layers.Conv2D(256, 1, strides=1, activation='relu', name='conv2')(x)
    x = tf.keras.layers.Conv2D(64, 1, strides=1, activation='relu', name='conv3')(x)
    feature_vector = tf.keras.layers.Flatten()(x)
    # Dropout must sit between the flattened features and the classifier; applied
    # to `x` here it would be disconnected from the model output and do nothing.
    feature_vector = tf.keras.layers.Dropout(0.5)(feature_vector)
    predicted_output = tf.keras.layers.Dense(num_classes, activation='softmax', name='output')(feature_vector)

    return tf.keras.Model([left_eye, right_eye], predicted_output)

In [None]:
# Train the CNN2 variant with two (CM-pooling + conv) stages defined in the cell above.
cnn2_2cm_conv_layers = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_2cm_conv_layers.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0f0547f90>

In [None]:
# Test-set loss and accuracy of the two-stage model.
loss, acc = cnn2_2cm_conv_layers.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.021721355617046356, Acc: 0.9933642148971558


4 (CMPooling + conv) layers 

In [None]:
SCALE_LIST = [1,3,5]
def CNN2(input_shape, num_classes, scale_list):
    """Build CNN2 with four Monocular stages per pathway.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.
        scale_list: CM-pooling window sizes handed to every Monocular stage.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    left_eye = tf.keras.Input(input_shape, name='left_eye')
    right_eye = tf.keras.Input(input_shape, name='right_eye')

    # parallax augmentation
    parallax = left_eye - right_eye
    left = tf.concat([left_eye, -parallax], axis=-1)
    right = tf.concat([right_eye, parallax], axis=-1)

    left1 = Monocular(6, 5, input_shape=input_shape, name='mono1_left')(left, scale_list=scale_list, pool_stride=2)
    right1 = Monocular(6, 5, input_shape=input_shape, name='mono1_right')(right, scale_list=scale_list, pool_stride=2)

    # From stage 2 on, each pathway sees its own features followed by the other eye's.
    left2 = Monocular(12, 5, name='mono2_left')(tf.concat([left1, right1], axis=-1), scale_list=scale_list, pool_stride=1)
    right2 = Monocular(12, 5, name='mono2_right')(tf.concat([right1, left1], axis=-1), scale_list=scale_list, pool_stride=1)

    left3 = Monocular(32, 3, name='mono3_left')(tf.concat([left2, right2], axis=-1), scale_list=scale_list, pool_stride=1)
    right3 = Monocular(32, 3, name='mono3_right')(tf.concat([right2, left2], axis=-1), scale_list=scale_list, pool_stride=1)

    left4 = Monocular(64, 3, name='mono4_left')(tf.concat([left3, right3], axis=-1), scale_list=scale_list, pool_stride=1)
    right4 = Monocular(64, 3, name='mono4_right')(tf.concat([right3, left3], axis=-1), scale_list=scale_list, pool_stride=1)

    x = tf.concat([left4, right4], axis=-1)
    x = tf.keras.layers.Conv2D(256, 3, strides=1, activation='relu', name='conv1')(x)
    x = tf.keras.layers.Conv2D(256, 1, strides=1, activation='relu', name='conv2')(x)
    x = tf.keras.layers.Conv2D(64, 1, strides=1, activation='relu', name='conv3')(x)
    feature_vector = tf.keras.layers.Flatten()(x)
    # Dropout must sit between the flattened features and the classifier; applied
    # to `x` here it would be disconnected from the model output and do nothing.
    feature_vector = tf.keras.layers.Dropout(0.5)(feature_vector)
    predicted_output = tf.keras.layers.Dense(num_classes, activation='softmax', name='output')(feature_vector)

    return tf.keras.Model([left_eye, right_eye], predicted_output)

In [None]:
# Train the CNN2 variant with four (CM-pooling + conv) stages defined in the cell above.
cnn2_4cm_conv_layers = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_4cm_conv_layers.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0e240ad50>

In [None]:
# Test-set loss and accuracy of the four-stage model.
loss, acc = cnn2_4cm_conv_layers.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.02171819470822811, Acc: 0.9925000071525574


### 5) Without augmentation

CNN2 without Augmentation

In [None]:
SCALE_LIST = [1,3,5]
def CNN2(input_shape, num_classes, scale_list):
    """Build CNN2 without parallax augmentation.

    Each pathway starts from its own eye image only; the rest of the network
    (three Monocular stages with cross-pathway concatenation, three
    convolutions, dropout, softmax) is unchanged.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.
        scale_list: CM-pooling window sizes handed to every Monocular stage.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    left_eye = tf.keras.Input(input_shape, name='left_eye')
    right_eye = tf.keras.Input(input_shape, name='right_eye')

    # parallax augmentation disabled: the raw eye images feed the pathways directly
    left = left_eye
    right = right_eye

    left1 = Monocular(6, 5, input_shape=input_shape, name='mono1_left')(left, scale_list=scale_list, pool_stride=2)
    right1 = Monocular(6, 5, input_shape=input_shape, name='mono1_right')(right, scale_list=scale_list, pool_stride=2)

    # From stage 2 on, each pathway sees its own features followed by the other eye's.
    left2 = Monocular(12, 5, name='mono2_left')(tf.concat([left1, right1], axis=-1), scale_list=scale_list, pool_stride=1)
    right2 = Monocular(12, 5, name='mono2_right')(tf.concat([right1, left1], axis=-1), scale_list=scale_list, pool_stride=1)

    left3 = Monocular(32, 3, name='mono3_left')(tf.concat([left2, right2], axis=-1), scale_list=scale_list, pool_stride=1)
    right3 = Monocular(32, 3, name='mono3_right')(tf.concat([right2, left2], axis=-1), scale_list=scale_list, pool_stride=1)

    x = tf.concat([left3, right3], axis=-1)
    x = tf.keras.layers.Conv2D(256, 3, strides=1, activation='relu', name='conv1')(x)
    x = tf.keras.layers.Conv2D(256, 1, strides=1, activation='relu', name='conv2')(x)
    x = tf.keras.layers.Conv2D(64, 1, strides=1, activation='relu', name='conv3')(x)
    feature_vector = tf.keras.layers.Flatten()(x)
    # Dropout must sit between the flattened features and the classifier; applied
    # to `x` here it would be disconnected from the model output and do nothing.
    feature_vector = tf.keras.layers.Dropout(0.5)(feature_vector)
    predicted_output = tf.keras.layers.Dense(num_classes, activation='softmax', name='output')(feature_vector)

    return tf.keras.Model([left_eye, right_eye], predicted_output)

In [None]:
# Train the CNN2 variant without parallax augmentation defined in the cell above.
cnn2_WoAug = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_WoAug.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0c85a3bd0>

In [None]:
# Test-set loss and accuracy of the no-augmentation model.
loss, acc = cnn2_WoAug.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.01327881682664156, Acc: 0.995771586894989


## Fusion of the Two Feedforward Pathways

In [None]:
SCALE_LIST = [1,3,5]
def CNN2(input_shape, num_classes, scale_list):
    """Build CNN2 with independent pathways.

    The left and right pathways each run their three Monocular stages on their
    own features only; they are concatenated once, after the third stage.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.
        scale_list: CM-pooling window sizes handed to every Monocular stage.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    left_eye = tf.keras.Input(input_shape, name='left_eye')
    right_eye = tf.keras.Input(input_shape, name='right_eye')

    # parallax augmentation
    parallax = left_eye - right_eye
    left = tf.concat([left_eye, -parallax], axis=-1)
    right = tf.concat([right_eye, parallax], axis=-1)

    left1 = Monocular(6, 5, input_shape=input_shape, name='mono1_left')(left, scale_list=scale_list, pool_stride=2)
    right1 = Monocular(6, 5, input_shape=input_shape, name='mono1_right')(right, scale_list=scale_list, pool_stride=2)

    # No cross-pathway concatenation in stages 2 and 3.
    left2 = Monocular(12, 5, name='mono2_left')(left1, scale_list=scale_list, pool_stride=1)
    right2 = Monocular(12, 5, name='mono2_right')(right1, scale_list=scale_list, pool_stride=1)

    left3 = Monocular(32, 3, name='mono3_left')(left2, scale_list=scale_list, pool_stride=1)
    right3 = Monocular(32, 3, name='mono3_right')(right2, scale_list=scale_list, pool_stride=1)

    x = tf.concat([left3, right3], axis=-1)
    x = tf.keras.layers.Conv2D(256, 3, strides=1, activation='relu', name='conv1')(x)
    x = tf.keras.layers.Conv2D(256, 1, strides=1, activation='relu', name='conv2')(x)
    x = tf.keras.layers.Conv2D(64, 1, strides=1, activation='relu', name='conv3')(x)
    feature_vector = tf.keras.layers.Flatten()(x)
    # Dropout must sit between the flattened features and the classifier; applied
    # to `x` here it would be disconnected from the model output and do nothing.
    feature_vector = tf.keras.layers.Dropout(0.5)(feature_vector)
    predicted_output = tf.keras.layers.Dense(num_classes, activation='softmax', name='output')(feature_vector)

    return tf.keras.Model([left_eye, right_eye], predicted_output)

In [None]:
# Train the CNN2 variant from the cell above, whose pathways are only merged after the third stage.
cnn2_fu = create_model(CNN2, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
cnn2_fu.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0c81b40d0>

In [None]:
# Test-set loss and accuracy of the independent-pathway model.
loss, acc = cnn2_fu.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.019315484911203384, Acc: 0.9928086400032043


# Vanilla CNN

In [None]:
def CNN(input_shape, num_classes):
    """Build the single-pathway vanilla CNN baseline.

    The network input is the left-eye image concatenated with the negated
    parallax map (left - right); the right eye enters only through that map.
    Three conv + max-pool stages are followed by three convolutions and a
    softmax classifier.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    layers = tf.keras.layers

    left_eye = tf.keras.Input(shape=input_shape, name='left_eye')
    right_eye = tf.keras.Input(shape=input_shape, name='right_eye')

    parallax = left_eye - right_eye
    x = tf.concat([left_eye, -parallax], axis=-1)

    # (filters, kernel size, layer name, MaxPooling2D kwargs) for each conv + pool stage.
    # The first stage uses the default pooling; the other two use a 1x1 window.
    stages = (
        (6, 5, 'mono1_left', {}),
        (12, 5, 'mono2_left', {'pool_size': (1, 1)}),
        (32, 3, 'mono3_left', {'pool_size': (1, 1)}),
    )
    for filters, ksize, layer_name, pool_kwargs in stages:
        x = layers.Conv2D(filters, ksize, activation='relu', padding='same', name=layer_name)(x)
        x = layers.MaxPooling2D(**pool_kwargs)(x)

    # Convolutional head shared with the CNN2 models.
    for filters, ksize, layer_name in ((256, 3, 'conv1'), (256, 1, 'conv2'), (64, 1, 'conv3')):
        x = layers.Conv2D(filters, ksize, strides=1, activation='relu', name=layer_name)(x)

    flat = layers.Flatten()(x)
    probabilities = layers.Dense(num_classes, activation="softmax", name='output')(flat)
    return tf.keras.Model([left_eye, right_eye], probabilities)

In [None]:
def create_model(model, input_shape, num_classes, scale_list=None):
    """Instantiate a network with ``model(...)`` and compile it for sparse-label classification.

    Args:
        model: builder function. Called as ``model(input_shape, num_classes)``
            when ``scale_list`` is None, otherwise as
            ``model(input_shape, num_classes, scale_list)``.
        input_shape: forwarded to the builder.
        num_classes: forwarded to the builder.
        scale_list: optional pooling scales. Kept optional so this redefinition
            still serves the three-argument builders (CNN2) used elsewhere in
            the notebook.

    Returns:
        The compiled model returned by the builder.
    """
    if scale_list is None:
        m = model(input_shape, num_classes)
    else:
        m = model(input_shape, num_classes, scale_list)
    # learning rate schedule
    initial_learning_rate = 0.0001
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(initial_learning_rate, decay_steps=100000,
                                                              decay_rate=0.96, staircase=True)

    # compile the model
    m.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),  # Optimizer
                  # Loss function to minimize
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  # List of metrics to monitor
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
                 )
    return m

In [None]:
# Build, compile and train the vanilla CNN baseline for 10 epochs with validation on valid_ds.
cnn = create_model(CNN, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5)
cnn.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0be3addd0>

In [None]:
# Test-set loss and accuracy of the vanilla CNN baseline.
loss, acc = cnn.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.01468063984066248, Acc: 0.995555579662323


## VanillaCNN with CMPooling

In [None]:
# Convolution first, then CM-pooling with the scales concatenated (used by the vanilla CNN + CMPooling model).
class Monocular(tf.keras.layers.Layer):
    """Convolution followed by CM-pooling; the pooled scales are concatenated channel-wise.

    Args:
        filters: number of convolution output channels.
        ksize: convolution kernel size.
        **kwargs: forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """
    def __init__(self, filters, ksize, **kwargs):
        super(Monocular, self).__init__(**kwargs)
        self.filters = filters
        self.ksize = ksize

    def build(self, input_shape):
        # build() receives the batch-inclusive shape, which is not a valid Conv2D
        # `input_shape` hint; the conv infers its input size on first use instead.
        self.conv = tf.keras.layers.Conv2D(self.filters, self.ksize, activation='relu', padding='same')

    def call(self, fmaps, scale_list, pool_stride):
        """Return the channel-wise concat of ``cmpooling(conv(fmaps), scale_list, pool_stride)``."""
        conv_maps = self.conv(fmaps)
        pool_maps = cmpooling(conv_maps, scale_list, pool_stride)
        return tf.concat(pool_maps, axis=-1)

In [None]:
SCALE_LIST = [1,3,5]
def CNN(input_shape, num_classes, scale_list):
    """Build the single-pathway CNN whose pooling is done by Monocular stages.

    The network input is the left-eye image concatenated with the negated
    parallax map (left - right). Three Monocular stages are followed by three
    convolutions and a softmax classifier.

    Args:
        input_shape: shape of one eye's image, without the batch axis.
        num_classes: number of output classes.
        scale_list: pooling scales handed to every Monocular stage.

    Returns:
        ``tf.keras.Model`` mapping ``[left_eye, right_eye]`` to class probabilities.
    """
    left_eye = tf.keras.Input(shape=input_shape, name='left_eye')
    right_eye = tf.keras.Input(shape=input_shape, name='right_eye')

    parallax = left_eye - right_eye
    y = tf.concat([left_eye, -parallax], axis=-1)

    # (filters, kernel size, layer name, pool stride) for each Monocular stage.
    mono_stages = (
        (6, 5, 'mono1_left', 2),
        (12, 5, 'mono2_left', 1),
        (32, 3, 'mono3_left', 1),
    )
    for filters, ksize, layer_name, stride in mono_stages:
        stage = Monocular(filters, ksize, input_shape=input_shape, name=layer_name)
        y = stage(y, scale_list=scale_list, pool_stride=stride)

    # Convolutional head.
    for filters, ksize, layer_name in ((256, 3, 'conv1'), (256, 1, 'conv2'), (64, 1, 'conv3')):
        y = tf.keras.layers.Conv2D(filters, ksize, strides=1, activation='relu', name=layer_name)(y)

    flat = tf.keras.layers.Flatten()(y)
    probabilities = tf.keras.layers.Dense(num_classes, activation="softmax", name='output')(flat)
    return tf.keras.Model([left_eye, right_eye], probabilities)

In [None]:
def create_model(model, input_shape, num_classes, scale_list):
    """Build a network via ``model(input_shape, num_classes, scale_list)`` and compile it.

    The model is compiled with Adam, sparse categorical cross-entropy loss and
    sparse categorical accuracy as the only metric.

    Returns:
        The compiled model returned by the builder.
    """
    compiled = model(input_shape, num_classes, scale_list)

    # Learning rate: 1e-4, multiplied by 0.96 after every 100000 optimizer steps (staircase).
    decay = tf.keras.optimizers.schedules.ExponentialDecay(
        0.0001,
        decay_steps=100000,
        decay_rate=0.96,
        staircase=True,
    )
    optimizer = tf.keras.optimizers.Adam(learning_rate=decay)
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy()
    accuracy = tf.keras.metrics.SparseCategoricalAccuracy()

    compiled.compile(optimizer=optimizer, loss=loss_fn, metrics=[accuracy])
    return compiled

In [None]:
# Build, compile and train the vanilla CNN with CM-pooling stages.
cnn_cmp = create_model(CNN, input_shape=(IMG_SIZE, IMG_SIZE, NUM_CHANNEL), num_classes=5, scale_list=SCALE_LIST)
# No `shuffle` argument: Keras ignores it when the input is a tf.data.Dataset, so
# sample order is controlled by the dataset pipeline, as in every other experiment.
cnn_cmp.fit(train_ds , epochs=10, validation_data=valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7fe0be214210>

In [None]:
# Test-set loss and accuracy of the vanilla CNN with CM-pooling.
loss, acc = cnn_cmp.evaluate(test_ds)
print(f"Loss: {loss}, Acc: {acc}")

Loss: 0.04395705461502075, Acc: 0.9852468967437744
