## Mounting Google Drive

# Imports

In [None]:
from math import ceil
import torch
import torch.nn as nn
import torch.nn.functional as F
from datetime import datetime
import psutil
import torch.optim as optim
from sklearn.metrics import accuracy_score
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

# Supporting Classes and Functions

In [None]:
def train_model(model,      # Model to optimise (updated in place)
                dataloader, # Iterable yielding (inputs, labels) training batches
                device,     # Device the batches are moved to (CPU or GPU)
                criterion,  # Loss function called as criterion(output, labels)
                optimizer   # Optimizer holding the model parameters
                ):
    """Run one pass over ``dataloader``, taking one optimizer step per batch.

    The model is switched to training mode first. Nothing is returned; the
    model parameters and the optimizer state are modified in place.
    """
    model.train()
    for batch_inputs, batch_targets in dataloader:
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)
        # Gradients accumulate by default, so clear them before each backward pass.
        optimizer.zero_grad()
        batch_loss = criterion(model(batch_inputs), batch_targets)
        batch_loss.backward()
        optimizer.step()

In [None]:
def eval_model(model,     # Model to evaluate
               dataloader,# Iterable yielding (inputs, labels) batches to evaluate on
               device,    # Device the batches are moved to (CPU or GPU)
               criterion  # Loss function called as criterion(output, labels)
               ):
    """Evaluate ``model`` on ``dataloader`` without tracking gradients.

    The model is switched to evaluation mode. The predicted class of each
    sample is the index of the largest value along dimension 1 of the model
    output.

    Returns:
        (accuracy, loss): the fraction of samples whose predicted class equals
        the label, and the unweighted mean of the per-batch loss values (each
        batch counts equally, whatever its size).

    Raises:
        ValueError: if ``dataloader`` yields no samples.
    """
    batch_losses = []
    n_correct = 0
    n_samples = 0
    model.eval()
    with torch.no_grad():
        for x_data, y_labels in dataloader:
            y_labels = y_labels.to(device)
            out = model(x_data.to(device))
            batch_losses.append(criterion(out, y_labels).item())
            # Reduce to running counts per batch so the logits of the whole
            # dataset never have to be held in memory at once.
            _, predicted = torch.max(out, 1)
            n_correct += (predicted == y_labels).sum().item()
            n_samples += y_labels.numel()
    if not batch_losses or n_samples == 0:
        raise ValueError("eval_model received a dataloader with no samples")
    mean_loss = sum(batch_losses) / len(batch_losses)
    accuracy = n_correct / n_samples
    return accuracy, mean_loss

In [None]:
class CNNBlock(nn.Module):
    """2-D convolution followed by batch normalisation and an optional activation.

    Args:
        in_channels: number of input channels of the convolution.
        out_channels: number of output channels of the convolution.
        kernel_size: kernel size of the convolution; padding is ``kernel_size // 2``.
        activation: callable applied after batch normalisation, or ``None`` to skip it.
        bn_epsilon: ``eps`` of the batch-normalisation layer.
        bn_momentum: ``momentum`` of the batch-normalisation layer.
        stride: stride of the convolution.
        groups: number of groups of the convolution.
    """

    def __init__(self, in_channels, out_channels, kernel_size, activation,
                 bn_epsilon, bn_momentum, stride=1, groups=1):
        super().__init__()
        self.conv_layer = nn.Conv2d(in_channels, out_channels, kernel_size,
                                    stride=stride,
                                    padding=kernel_size // 2,
                                    groups=groups)
        self.batch_norm_layer = nn.BatchNorm2d(out_channels,
                                               eps=bn_epsilon,
                                               momentum=bn_momentum)
        self.activation_layer = activation

    def out_channels(self):
        """Return the number of output channels of the convolution."""
        return self.conv_layer.out_channels

    def forward(self, x):
        """Apply convolution, batch normalisation and, if set, the activation."""
        normalised = self.batch_norm_layer(self.conv_layer(x))
        if self.activation_layer is None:
            return normalised
        return self.activation_layer(normalised)

In [None]:
class Swish(nn.Module):
    """Swish activation: multiplies the input by its own sigmoid."""

    def __init__(self):
        super().__init__()
        # forward() dispatches through this attribute.
        self.operation = self.swish_fcn

    def swish_fcn(self, x):
        """Return ``x * sigmoid(x)`` element-wise."""
        gate = torch.sigmoid(x)
        return x * gate

    def forward(self, x):
        """Apply the stored operation to ``x``."""
        return self.operation(x)

In [None]:
class Stochastic_depth(nn.Module):
    """Randomly zero whole samples of a batch during training.

    Each sample (index along dimension 0) is kept with probability
    ``1 - rate``. Kept samples are divided by the keep probability; dropped
    samples become all zeros. In evaluation mode the input is returned
    unchanged.

    Args:
        rate: dropping rate, a number in ``[0, 1]``.

    Raises:
        ValueError: if ``rate`` lies outside ``[0, 1]``.
    """

    def __init__(self,
                 rate=0.5 # Dropping rate.
                 ):
        super(Stochastic_depth, self).__init__()
        if not 0.0 <= rate <= 1.0:
            raise ValueError(
                "rate must be in the range [0, 1], got {}".format(rate))
        self.keep_prob = 1 - rate

    def forward(self, x):
        # Drop only during training
        if not self.training:
            return x
        # With a keep probability of zero every sample is dropped. Return the
        # zeros directly: dividing by keep_prob below would give inf/NaN.
        if self.keep_prob <= 0:
            return torch.zeros_like(x)
        # One random value per sample, broadcast over all remaining dimensions
        # (works for any input rank, not only 4-D feature maps).
        mask_shape = [x.size(0)] + [1] * (x.dim() - 1)
        rand_tensor = self.keep_prob + torch.rand(mask_shape,
                                                  dtype=x.dtype,
                                                  device=x.device)
        # keep_prob + U[0, 1) floors to 1 with probability keep_prob, else to 0
        keep_mask = torch.floor(rand_tensor)
        return torch.mul(torch.div(x, self.keep_prob), keep_mask)

In [None]:
# Squeeze Excitation block
class SEBlock(nn.Module):
    """Squeeze-and-excitation block that rescales the channels of its input.

    The input is average-pooled to a 1x1 map, passed through a 1x1 reduction
    convolution, the activation, a 1x1 restoration convolution and a sigmoid;
    the result is multiplied with the original input.

    Args:
        in_channels: number of input (and output) channels.
        reduced_dim: number of channels after the reduction convolution.
        activation: callable applied between the two convolutions.
    """

    def __init__(self, in_channels, reduced_dim, activation):
        super().__init__()
        self.reduction_layer = nn.Conv2d(in_channels, reduced_dim, kernel_size=1)
        self.restoration_layer = nn.Conv2d(reduced_dim, in_channels, kernel_size=1)
        self.activation = activation

    def forward(self, x):
        """Return ``x`` scaled by the computed per-channel gates."""
        squeezed = F.adaptive_avg_pool2d(x, (1, 1))
        excited = self.activation(self.reduction_layer(squeezed))
        gates = torch.sigmoid(self.restoration_layer(excited))
        return torch.mul(x, gates)

In [None]:
class InvertedResidualBlock(nn.Module):
    """Inverted residual (MBConv) block.

    Pipeline: optional 1x1 expansion convolution -> depth-wise convolution ->
    squeeze-and-excitation -> 1x1 projection convolution (no activation).
    When the input and output channel counts match and the stride is 1, the
    projected output goes through stochastic depth and is added to the input.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        kernel_size: kernel size of the depth-wise convolution.
        stride: stride of the depth-wise convolution.
        expansion_factor: multiplier giving the expanded channel count; the
            expansion convolution is skipped when it equals 1.
        activation: activation used by the expansion, depth-wise and
            squeeze-and-excitation parts.
        bn_epsilon: batch-normalisation epsilon.
        bn_momentum: batch-normalisation momentum.
        se_size: reduced channel count of the squeeze-and-excitation block.
        drop_connect_rate: dropping rate of the stochastic-depth layer.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride,
                 expansion_factor, activation, bn_epsilon, bn_momentum,
                 se_size, drop_connect_rate):
        super().__init__()

        exp_channels = in_channels * expansion_factor
        self.activation = activation
        bn_kwargs = {"bn_epsilon": bn_epsilon, "bn_momentum": bn_momentum}

        # 1x1 expansion convolution, only needed when channels are expanded
        self.expanded_conv = (
            CNNBlock(in_channels=in_channels,
                     out_channels=exp_channels,
                     kernel_size=1,
                     activation=self.activation,
                     **bn_kwargs)
            if expansion_factor != 1 else None
        )

        # depth-wise convolution: one group per channel
        self.deepwise_conv = CNNBlock(in_channels=exp_channels,
                                      out_channels=exp_channels,
                                      kernel_size=kernel_size,
                                      stride=stride,
                                      groups=exp_channels,
                                      activation=self.activation,
                                      **bn_kwargs)

        self.se = SEBlock(in_channels=exp_channels,
                          reduced_dim=se_size,
                          activation=self.activation)

        self.drop_connect = Stochastic_depth(rate=drop_connect_rate)

        # The residual path is only used when the output can be added to the
        # input: same channel count and stride 1.
        self.skip_enabled = in_channels == out_channels and stride == 1

        # 1x1 projection convolution, no activation
        self.projection_conv = CNNBlock(in_channels=exp_channels,
                                        out_channels=out_channels,
                                        kernel_size=1,
                                        activation=None,
                                        **bn_kwargs)

    def forward(self, x):
        """Run the block; add the residual connection when it is enabled."""
        features = x if self.expanded_conv is None else self.expanded_conv(x)
        features = self.deepwise_conv(features)
        features = self.se(features)
        features = self.projection_conv(features)
        if not self.skip_enabled:
            return features
        # Stochastic depth is applied only on the residual path.
        return self.drop_connect(features) + x

In [None]:
# Stack of MBConv layers that make up one stage
class MBConvLayersPerStage(nn.Module):
    """Sequence of ``InvertedResidualBlock`` layers forming one stage.

    Only the first layer uses the given ``stride`` and ``in_channels``; every
    following layer uses stride 1 and takes ``out_channels`` as its input
    channel count.

    Args:
        num_layers: number of layers in the stage.
        in_channels: input channels of the first layer.
        out_channels: output channels of every layer.
        stride: stride of the first layer.
        se_ratio: divisor applied to a layer's input channels to get its
            squeeze-and-excitation size (never below 1).
        drop_connect_rates: sequence indexed per layer with the drop rates.
        kernel_size: kernel size of the depth-wise convolutions.
        expansion_factor: channel expansion factor of the layers.
        activation: activation function.
        bn_epsilon: batch-normalisation epsilon.
        bn_momentum: batch-normalisation momentum.
    """

    def __init__(self, num_layers, in_channels, out_channels, stride,
                 se_ratio, drop_connect_rates, kernel_size, expansion_factor,
                 activation, bn_epsilon, bn_momentum):
        super().__init__()

        self.num_layers = num_layers
        self.layers = nn.ModuleList()

        layer_in_channels, layer_stride = in_channels, stride
        for layer_idx in range(self.num_layers):
            block = InvertedResidualBlock(
                in_channels=layer_in_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
                stride=layer_stride,
                expansion_factor=expansion_factor,
                activation=activation,
                bn_epsilon=bn_epsilon,
                bn_momentum=bn_momentum,
                se_size=max(1, layer_in_channels // se_ratio),
                drop_connect_rate=drop_connect_rates[layer_idx])
            self.layers.append(block)
            # Layers after the first keep the resolution and consume the
            # previous layer's output channels.
            layer_in_channels, layer_stride = out_channels, 1

    def forward(self, x):
        """Apply the layers of the stage in order."""
        for block in self.layers:
            x = block(x)
        return x

# Completely taken from https://github.com/abhuse/pytorch-efficientnet
def round_filters(filters, width_coefficient, depth_divisor=8):
    """Scale ``filters`` by ``width_coefficient`` and round to ``depth_divisor``.

    The scaled value is rounded to the nearest multiple of ``depth_divisor``
    and is never smaller than ``depth_divisor``. If rounding ends up more than
    10% below the scaled value, one more ``depth_divisor`` is added.
    """
    scaled = filters * width_coefficient
    nearest_multiple = int(scaled + depth_divisor / 2) // depth_divisor * depth_divisor
    rounded = max(depth_divisor, nearest_multiple)
    # Make sure that rounding down does not lose more than 10%.
    if rounded < 0.9 * scaled:
        rounded += depth_divisor
    return int(rounded)


In [None]:
class EfficientNet(nn.Module):
    """EfficientNet-style classifier assembled from MBConv stages.

    The network is: initial 3x3 stride-2 convolution -> ``n_stages`` MBConv
    stages -> 1x1 convolution -> global average pooling -> dropout -> fully
    connected classification layer.

    ``b`` selects a row of ``coefficients``, which provides the width and
    depth scaling factors and the dropout rate. The resolution column of that
    table is not used by this class.

    Args:
        b: index into ``coefficients`` (0 for B0, 1 for B1, ...).
        in_channels: number of input channels.
        n_classes: number of output classes.
        activation: activation module shared by all blocks; a new ``Swish()``
            is created when ``None``.
        drop_connect_rate: upper bound of the per-layer drop-connect rates,
            which grow linearly with the layer index.
        bn_epsilon: batch-normalisation epsilon.
        bn_momentum: batch-normalisation momentum.

    Raises:
        IndexError: if ``b`` is not a valid non-negative index into
            ``coefficients``.
    """
    # (width_coefficient  ,depth_coefficient  ,dropout_rate ,in_spatial_shape(resolution))
    coefficients = [
        (1.0              ,1.0                ,0.2          ,224),
        (1.0              ,1.1                ,0.2          ,240),
        (1.1              ,1.2                ,0.3          ,260),
        (1.2              ,1.4                ,0.3          ,300),
        (1.4              ,1.8                ,0.4          ,380),
        (1.6              ,2.2                ,0.4          ,456),
        (1.8              ,2.6                ,0.5          ,528),
        (2.0              ,3.1                ,0.5          ,600),
    ]

    # block_repeat  ,kernel_size  ,stride ,expansion_factor ,input_channels ,output_channels  ,se_ratio
    stage_args = [
        [1          ,3            ,1      ,1                ,32             ,16               ,4],
        [2          ,3            ,2      ,6                ,16             ,24               ,4],
        [2          ,5            ,2      ,6                ,24             ,40               ,4],
        [3          ,3            ,2      ,6                ,40             ,80               ,4],
        [3          ,5            ,1      ,6                ,80             ,112              ,4],
        [4          ,5            ,2      ,6                ,112            ,192              ,4],
        [1          ,3            ,1      ,6                ,192            ,320              ,4],
    ]

    # NOTE(review): stage_args has seven rows but only the first n_stages = 6
    # are built, so the last row (192 -> 320) is never used. Confirm whether
    # this is intentional before changing it: it alters the architecture and
    # the layout of saved checkpoints.
    n_stages = 6 # number of MBConv stages that are built

    def __init__(self,
                 b,
                 in_channels=3,         # Number of input channels.
                 n_classes=1000,        # Number of classes.
                 activation=None,       # Activation function (Swish when None)
                 drop_connect_rate=0.2, # Drop connect rates
                 bn_epsilon=1e-3,       # Batch normalization epsilon
                 bn_momentum=0.01       # Batch normalization momentum
                 ):

        super(EfficientNet, self).__init__()

        # Reject negative values explicitly: plain list indexing would
        # silently pick a variant from the end of the table.
        if not 0 <= b < len(self.coefficients):
            raise IndexError(
                "b must be in the range [0, {}], got {}".format(
                    len(self.coefficients) - 1, b))

        # Build the default activation per instance instead of once at
        # function-definition time, so models do not share a module object.
        if activation is None:
            activation = Swish()

        self.b = b
        self.in_channels = in_channels
        self.activation = activation
        self.drop_connect_rate = drop_connect_rate
        (self.width_coefficient,
         self.depth_coefficient,
         self.dropout_rate) = self.coefficients[self.b][:3]

        # initial convolution
        init_conv_out_channels = round_filters(32, self.width_coefficient)

        self.initial_conv = CNNBlock(
                                   in_channels  =   self.in_channels,
                                   out_channels =   init_conv_out_channels,
                                   kernel_size  =   3,
                                   stride       =   2,
                                   activation   =   self.activation,
                                   bn_epsilon   =   bn_epsilon,
                                   bn_momentum  =   bn_momentum
                                   )

        self.stages = nn.ModuleList()

        # Depth-scaled number of layers of every stage that is built.
        stage_layer_counts = [
            int(ceil(self.depth_coefficient * self.stage_args[idx][0]))
            for idx in range(self.n_stages)
        ]
        total_num_layers_all = sum(stage_layer_counts)

        # Drop-connect rate of each layer: grows linearly with the global
        # layer index, starting at 0 and staying below drop_connect_rate.
        dc_rates = [self.drop_connect_rate * i / total_num_layers_all
                    for i in range(total_num_layers_all)]

        # Global index of the first layer of the stage being built.
        stage_curr_layer = 0
        for idx, stage_num_layers in enumerate(stage_layer_counts):
            (_,
             kernel_size,
             stride,
             expansion_factor,
             base_in_channels,
             base_out_channels,
             stage_se_ratio) = self.stage_args[idx]

            # Baseline channel counts scaled by the width coefficient.
            stage_in_channels = round_filters(base_in_channels, self.width_coefficient)
            stage_out_channels = round_filters(base_out_channels, self.width_coefficient)

            # Drop-connect rates belonging to the layers of this stage.
            stage_dc_rates = dc_rates[stage_curr_layer:stage_curr_layer + stage_num_layers]

            MBConv_stage = MBConvLayersPerStage(
                              num_layers          =   stage_num_layers,
                              in_channels         =   stage_in_channels,
                              out_channels        =   stage_out_channels,
                              stride              =   stride,
                              se_ratio            =   stage_se_ratio,
                              drop_connect_rates  =   stage_dc_rates,
                              kernel_size         =   kernel_size,
                              expansion_factor    =   expansion_factor,
                              activation          =   self.activation,
                              bn_epsilon          =   bn_epsilon,
                              bn_momentum         =   bn_momentum
                              )

            self.stages.append(MBConv_stage)
            # Move to the first layer of the next stage.
            stage_curr_layer += stage_num_layers

        # The final convolution consumes the (already scaled) output channels
        # of the projection layer of the last layer of the last stage.
        in_channels_final_conv = self.stages[-1].layers[-1].projection_conv.out_channels()
        # Baseline of 1280 output channels, scaled by the width coefficient.
        out_channels_final_conv = round_filters(1280, self.width_coefficient)
        # Define the final convolutional layer.
        self.final_conv_layer = CNNBlock(in_channels  =   in_channels_final_conv,
                                   out_channels       =   out_channels_final_conv,
                                   kernel_size        =   1,
                                   activation         =   self.activation,
                                   bn_epsilon         =   bn_epsilon,
                                   bn_momentum        =   bn_momentum)
        # Define the final dropout layer.
        self.dropout_layer = nn.Dropout(p=self.dropout_rate)
        # Define the final average pooling layer.
        self.avgPool_layer = nn.AdaptiveAvgPool2d((1, 1))
        # Define the final fully connected layer for classification.
        self.fc_layer = nn.Linear(out_channels_final_conv, n_classes)

    def forward(self, x):
        """Return the output of the classification layer for the batch ``x``."""
        x = self.initial_conv(x)
        for stage in self.stages:
            x = stage(x)
        x = self.final_conv_layer(x)
        x = self.avgPool_layer(x)
        # Flatten everything except the batch dimension for the linear layer.
        x = torch.flatten(x, 1)
        x = self.dropout_layer(x)
        x = self.fc_layer(x)
        return x

In [None]:
def load_dataset(Dataset, img_size, batch_size=None, num_workers=None):
    """Build the CIFAR data loaders used for training and evaluation.

    Three loaders are created from data stored under ``./data`` (downloaded
    when missing):

    * the training split with augmentation (random crop with padding 4,
      random horizontal flip), shuffled;
    * the same training split without augmentation, not shuffled;
    * the test split without augmentation, not shuffled.

    All three use the same normalisation statistics for the chosen dataset.

    Args:
        Dataset: ``'CIFAR10'`` or ``'CIFAR100'``.
        img_size: output size of the random crop applied to training images.
        batch_size: batch size of all loaders. When ``None`` the module-level
            variable ``batch_size`` is used.
        num_workers: worker count of all loaders. When ``None`` the
            module-level variable ``num_workers`` is used.

    Returns:
        ``(dataloader_train, dataloader_train_val, dataloader_validation,
        n_classes)``.

    Raises:
        ValueError: if ``Dataset`` is not a supported name.
        KeyError: if ``batch_size`` / ``num_workers`` is not passed and no
            module-level variable of that name exists.
    """
    # name -> (number of classes, normalisation mean, normalisation std)
    dataset_specs = {
        'CIFAR10': (10,
                    (0.5, 0.5, 0.5),
                    (0.5, 0.5, 0.5)),
        'CIFAR100': (100,
                     (0.5070754, 0.48655024, 0.44091907),
                     (0.26733398, 0.25643876, 0.2761503)),
    }
    if Dataset not in dataset_specs:
        raise ValueError("Sorry, Wrong dataset entry")
    n_classes, mean, std = dataset_specs[Dataset]

    # Keep working for callers that rely on the module-level settings.
    if batch_size is None:
        batch_size = globals()['batch_size']
    if num_workers is None:
        num_workers = globals()['num_workers']

    # The supported names match the torchvision dataset class names.
    dataset_cls = getattr(datasets, Dataset)

    transform_train = transforms.Compose([
        transforms.RandomCrop(img_size, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ])
    transform_validation = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ])

    def make_dataset(train, transform):
        # One dataset view: a split combined with a transform.
        return dataset_cls(root='./data',
                           train=train,
                           download=True,
                           transform=transform)

    def make_loader(dataset, shuffle):
        # All loaders share the batch size and the worker count.
        return DataLoader(dataset,
                          batch_size=batch_size,
                          shuffle=shuffle,
                          num_workers=num_workers)

    dataloader_train = make_loader(make_dataset(True, transform_train), True)
    dataloader_train_val = make_loader(make_dataset(True, transform_validation), False)
    dataloader_validation = make_loader(make_dataset(False, transform_validation), False)

    return dataloader_train, dataloader_train_val, dataloader_validation, n_classes


# Run Training

In [None]:
# Initialize training parameters.
model_index = 0 # 0 >> EfficientNet-B0, 1 >> EfficientNet-B1, 2 >> EfficientNet-B2 etc
batch_size = 128 # 128 for CIFAR10 and 64 for CIFAR100
max_epoch = 100
# cpu_count() can return None when the count is undetermined; fall back to
# loading data in the main process in that case.
num_workers = psutil.cpu_count() or 0
img_size = 32
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load dataset
dataloader_train ,dataloader_train_val ,dataloader_validation ,n_classes = load_dataset('CIFAR10',img_size)

# Model definition
model = EfficientNet(b=model_index,
                     n_classes=n_classes,
                     )
# Change to CPU/CUDA
model.to(device)

optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss()
# Multiply the learning rate by 0.1 at epochs 30, 60 and 90.
scheduler = lr_scheduler.MultiStepLR(optimizer, milestones=[30,60,90], gamma=0.1)

# Create a dictionary for data collection.
results = {
    "train_loss": [],
    "valid_loss": [],
    "train_acc": [],
    "valid_acc": [],
}
# Time logging start
start_time = datetime.now()
# Training/Testing loop
for epoch in range(0, max_epoch):
    now = datetime.now()
    train_model(model=model,
                dataloader=dataloader_train,
                device=device,
                criterion=criterion,
                optimizer=optimizer)
    # Evaluation on the training split (without augmentation)
    train_accuracy, train_loss = eval_model(model=model,
                                            dataloader=dataloader_train_val,
                                            device=device,
                                            criterion=criterion)
    # Evaluation on the held-out split
    valid_accuracy, valid_loss = eval_model(model=model,
                                            dataloader=dataloader_validation,
                                            device=device,
                                            criterion=criterion)
    # Appending to results dictionary
    results["train_loss"].append(train_loss)
    results["train_acc"].append(train_accuracy)
    results["valid_loss"].append(valid_loss)
    results["valid_acc"].append(valid_accuracy)
    # Step the scheduler once per epoch. MultiStepLR counts epochs itself;
    # the optional argument of step() is an epoch index, not a metric, so
    # passing the accuracy would keep the schedule from ever reaching a
    # milestone.
    scheduler.step()
    # Print data
    s = "Epoch:{}/{} | Loss Training/Validation: {:.4f}/{:.4f}".format(epoch, max_epoch, train_loss, valid_loss)
    s += " | Accuracy Training/Validation: {:.4f}/{:.4f}".format(train_accuracy, valid_accuracy)
    s += " Time taken per epoch: +{}".format(datetime.now() - now)
    print(s)

print("Total elapsed time: {}".format(datetime.now() - start_time))


In [None]:
# Count the trainable parameters: sum of the element counts of every
# parameter tensor that requires a gradient.
model_parameters = (p for p in model.parameters() if p.requires_grad)
params = sum(np.prod(p.size()) for p in model_parameters)
print(params)

In [None]:
# Plot the training and validation loss per epoch.
# The x-axis follows the number of recorded epochs rather than max_epoch, so
# the plot also works when training was stopped early.
epochs = list(range(len(results["train_loss"])))
lines = plt.plot(epochs, results["train_loss"], epochs, results["valid_loss"])
plt.legend(('Train', 'Validation'))
plt.title('Loss chart')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.grid(True)
plt.show()

In [None]:
# Plot the training and validation accuracy per epoch.
# The x-axis follows the number of recorded epochs rather than max_epoch, so
# the plot also works when training was stopped early.
epochs = list(range(len(results["train_acc"])))
lines = plt.plot(epochs, results["train_acc"], epochs, results["valid_acc"])
plt.legend(('Train', 'Validation'))
plt.title('Accuracy chart')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.grid(True)
plt.show()