In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install tensorflow-addons



In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np

from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Dense, BatchNormalization, Dropout, Activation, Input, Flatten, Conv2D, AveragePooling2D, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy

from tensorflow_addons.layers import StochasticDepth

from sklearn.model_selection import train_test_split

In [None]:
!nvidia-smi

Tue Sep 21 12:21:43 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.63.01    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   58C    P0    32W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

Your runtime has 27.3 gigabytes of available RAM



In [None]:
IMAGE_SIZE = (74, 55, 3)

NUM_MODELS = 6

BATCH_SIZE = 32
EPOCHS = 40

LEARNING_RATE = 0.001
DECAY_EPOCHS = 10
DECAY_RATE = 0.1
STAIRCASE = True

In [None]:
features = np.load('/content/drive/MyDrive/IJCNN/features.npy').reshape((-1, *IMAGE_SIZE))
labels = np.load('/content/drive/MyDrive/IJCNN/labels.npy')

In [None]:
x_train, x_test, y_train, y_test = train_test_split(features, labels, test_size=0.25, shuffle=True)
del features
del labels

In [None]:
class SqueezeAndExcite(tf.keras.layers.Layer):
    def __init__(self, n):
        super().__init__()
        self.n = n

    def build(self, input_shape):

        self.channels = input_shape[-1]

        self.pooling = GlobalAveragePooling2D()
        self.dense_1 = Dense(self.n, activation='relu')
        self.dense_2 =  Dense(self.channels, activation='sigmoid')

    def call(self, x, training=False):

        r = self.pooling(x)
        r = self.dense_1(r, training=training)
        r = self.dense_2(r, training=training)

        r = tf.reshape(r, (-1, 1, 1, self.channels))

        return x * r

class ResidualSEBlock(tf.keras.layers.Layer):
    def __init__(self, filters, strides=1, activation='swish', survival_prob=1, kernel_regularizer=None,  shortcut='identity', reduction=4):
        super().__init__()
        
        self.filters = filters
        self.strides = np.array([strides, strides]) if type(strides) == int else np.array(strides)
        self.activation = activation
        self.survival_prob = survival_prob
        self.kernel_regularizer = kernel_regularizer
        self.shortcut = shortcut
        self.shortcut_mapping = None
        self.reduction = reduction

    def build(self, input_shape):

        spatial_dim = np.array([input_shape[1], input_shape[2]])
        channels = input_shape[-1]

        self.batch_norm_1 = BatchNormalization()
        self.activation_1 = Activation(self.activation)
        self.conv_1 = Conv2D(self.filters//4, 1, padding='same', kernel_regularizer=self.kernel_regularizer)
        
        self.batch_norm_2 = BatchNormalization()
        self.activation_2 = Activation(self.activation)
        self.conv_2 =  Conv2D(self.filters//4, 3, strides=self.strides, padding='same', kernel_regularizer=self.kernel_regularizer)

        self.batch_norm_3 = BatchNormalization()
        self.activation_3 = Activation(self.activation)
        self.conv_3 =  Conv2D(self.filters, 1, padding='same', kernel_regularizer=self.kernel_regularizer)

        self.squeeze_and_excite = SqueezeAndExcite(self.filters//self.reduction)
        
        if channels != self.filters or self.strides.prod() != 1: 
          if self.shortcut == 'identity':
            self.projection = AveragePooling2D(pool_size=self.strides, strides=self.strides, padding='same')
            self.shortcut_mapping = lambda x : tf.pad(self.projection(x), [[0, 0], [0, 0], [0, 0], [0, self.filters - channels]])
          if self.shortcut == 'projection':
            self.projection = Conv2D(self.filters, 1, strides=self.strides, padding='same')
            self.shortcut_mapping = lambda x : self.projection(x)
        else:
          self.shortcut_mapping = lambda x : x
        
        if self.survival_prob != 1:
          self.stochastic_depth = StochasticDepth(self.survival_prob) 

    def call(self, x, training=False):
        
        r = self.batch_norm_1(x, training=training)
        r = self.activation_1(r)
        r = self.conv_1(r, training=training)
        
        r = self.batch_norm_2(r, training=training)
        r = self.activation_2(r)
        r = self.conv_2(r, training=training)

        r = self.batch_norm_3(r, training=training)
        r = self.activation_3(r)
        r = self.conv_3(r, training=training)

        r = self.squeeze_and_excite(r)

        x = self.shortcut_mapping(x)

        if self.survival_prob != 1:
          return self.stochastic_depth([x, r], training=training)

        return x + r

In [None]:
models = [Sequential((
    
    Input(IMAGE_SIZE),

    Conv2D(128, 3),
    ResidualSEBlock(128, survival_prob=0.8),
    ResidualSEBlock(128, survival_prob=0.8),
    ResidualSEBlock(128, survival_prob=0.8),

    ResidualSEBlock(128, strides=2),
    ResidualSEBlock(128, survival_prob=0.8),
    ResidualSEBlock(128, survival_prob=0.8),
    ResidualSEBlock(128, survival_prob=0.8),

    ResidualSEBlock(128, strides=2),
    ResidualSEBlock(128, survival_prob=0.7),
    ResidualSEBlock(256, survival_prob=0.7),
    ResidualSEBlock(256, survival_prob=0.7),

    ResidualSEBlock(256, strides=2),
    ResidualSEBlock(256, survival_prob=0.6),
    ResidualSEBlock(512, survival_prob=0.6),
    ResidualSEBlock(512, survival_prob=0.6),

    ResidualSEBlock(512, strides=2),
    ResidualSEBlock(512, survival_prob=0.5),
    ResidualSEBlock(512, survival_prob=0.5),
    ResidualSEBlock(512, survival_prob=0.5),

    BatchNormalization(),
    Activation('swish'),
    GlobalAveragePooling2D(),
    Dense(1, activation='sigmoid')

)) for i in range(NUM_MODELS)]

models[0].summary()

## Validation

In [None]:
for i, model in enumerate(models):
  
  opt = Adam(tf.keras.optimizers.schedules.ExponentialDecay(LEARNING_RATE, decay_steps=x_train.shape[0]//BATCH_SIZE*DECAY_EPOCHS, decay_rate=DECAY_RATE, staircase=STAIRCASE))
  loss = BinaryCrossentropy()

  model.compile(optimizer=opt, loss=loss, metrics=['accuracy'])
  model.fit(x_train, y_train, batch_size=BATCH_SIZE, epochs=EPOCHS , validation_data=(x_test, y_test))

## Training

In [None]:
for i, model in enumerate(models):
  
  opt = Adam(tf.keras.optimizers.schedules.ExponentialDecay(LEARNING_RATE, decay_steps=features.shape[0]//DECAY_ECPOHS, decay_rate=DECAY_RATE, staircase=STAIRCASE))
  loss = BinaryCrossentropy()

  model.compile(optimizer=opt, loss=loss, metrics=['accuracy'])
  model.fit(features, labels, batch_size=BATCH_SIZE, epochs=ECPOCHS)

  model.save(f'/content/drive/MyDrive/IJCNN/Models/model_{i+1}')
  converter = tf.lite.TFLiteConverter.from_saved_model(f'/content/drive/MyDrive/IJCNN/Models/model_{i+1}')
  tflite_model = converter.convert()
  with open(f'/content/drive/MyDrive/IJCNN/Models/model_{i+1}.tflite', 'wb') as f:
    f.write(tflite_model)