In [None]:
# import header files
%matplotlib inline
import torch
import torch.nn as nn
import torchvision
from functools import partial
from dataclasses import dataclass
from collections import OrderedDict
import glob
import os
import random
import tensorflow as tf
from tensorflow import keras
import numpy as np
import seaborn as sn
import pandas as pd
from matplotlib import pyplot as plt
from tqdm import tqdm
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
import time
import copy
import tqdm
import torch
import random
from PIL import Image
import torch.optim as optim
from torchvision import models
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset,DataLoader

In [None]:
# load my google drive
def auth_gdrive():
  from google.colab import drive
  if os.path.exists('content/gdrive/My Drive'): return
  drive.mount('/content/gdrive')
def load_gdrive_dataset():
  loader_assets = 'myepill.zip'
  auth_gdrive()

In [None]:
# mount my google drive
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
load_gdrive_dataset()

Mounted at /content/gdrive
Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# unzip dataset
!unzip "/content/gdrive/MyDrive/myepill.zip"

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: myepill/train/2/55154-3495_0_0.jpg  
  inflating: myepill/train/2/55154-3495_0_1.jpg  
  inflating: myepill/train/2/55154-3516_0_0.jpg  
  inflating: myepill/train/2/55154-3516_0_1.jpg  
  inflating: myepill/train/2/55154-3526_0_0.jpg  
  inflating: myepill/train/2/55154-3526_0_1.jpg  
  inflating: myepill/train/2/55154-3580_0_0.jpg  
  inflating: myepill/train/2/55154-3580_0_1.jpg  
  inflating: myepill/train/2/55154-3581_0_0.jpg  
  inflating: myepill/train/2/55154-3581_0_1.jpg  
  inflating: myepill/train/2/55154-3582_0_0.jpg  
  inflating: myepill/train/2/55154-3582_0_1.jpg  
  inflating: myepill/train/2/55154-3621_0_0.jpg  
  inflating: myepill/train/2/55154-3621_0_1.jpg  
  inflating: myepill/train/2/55154-3622_0_0.jpg  
  inflating: myepill/train/2/55154-3622_0_1.jpg  
  inflating: myepill/train/2/55154-3625_0_0.jpg  
  inflating: myepill/train/2/55154-3625_0_1.jpg  
  inflating: myepill/train/2/55154-

In [None]:
# define transforms
train_transforms = torchvision.transforms.Compose([torchvision.transforms.RandomRotation(30),
                                       torchvision.transforms.Resize((32, 32)),
                                       torchvision.transforms.RandomHorizontalFlip(),
                                       torchvision.transforms.ToTensor(),
                                       torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

In [None]:
# get data
train_data = torchvision.datasets.ImageFolder("/content/myepill/train/", transform=train_transforms)
test_data = torchvision.datasets.ImageFolder("/content/myepill/test/", transform=train_transforms)

In [None]:
# data loader
trainloader = torch.utils.data.DataLoader(train_data, batch_size=128, shuffle=True, num_workers=1, pin_memory=True)
testloader = torch.utils.data.DataLoader(test_data, batch_size=128, shuffle=True, num_workers=1, pin_memory=True)

In [None]:
# define the model 
import torch
import torch.nn as nn
import torch.nn.functional as F


__all__ = ['MobileNetV3', 'mobilenetv3']


def conv_bn(inp, oup, stride, conv_layer=nn.Conv2d, norm_layer=nn.BatchNorm2d, nlin_layer=nn.ReLU):
    return nn.Sequential(
        conv_layer(inp, oup, 3, stride, 1, bias=False),
        norm_layer(oup),
        nlin_layer(inplace=True)
    )


def conv_1x1_bn(inp, oup, conv_layer=nn.Conv2d, norm_layer=nn.BatchNorm2d, nlin_layer=nn.ReLU):
    return nn.Sequential(
        conv_layer(inp, oup, 1, 1, 0, bias=False),
        norm_layer(oup),
        nlin_layer(inplace=True)
    )


class Hswish(nn.Module):
    def __init__(self, inplace=True):
        super(Hswish, self).__init__()
        self.inplace = inplace

    def forward(self, x):
        return x * F.relu6(x + 3., inplace=self.inplace) / 6.


class Hsigmoid(nn.Module):
    def __init__(self, inplace=True):
        super(Hsigmoid, self).__init__()
        self.inplace = inplace

    def forward(self, x):
        return F.relu6(x + 3., inplace=self.inplace) / 6.


class SEModule(nn.Module):
    def __init__(self, channel, reduction=4):
        super(SEModule, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction, channel, bias=False),
            Hsigmoid()
            # nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)


class Identity(nn.Module):
    def __init__(self, channel):
        super(Identity, self).__init__()

    def forward(self, x):
        return x


def make_divisible(x, divisible_by=8):
    import numpy as np
    return int(np.ceil(x * 1. / divisible_by) * divisible_by)


class MobileBottleneck(nn.Module):
    def __init__(self, inp, oup, kernel, stride, exp, se=False, nl='RE'):
        super(MobileBottleneck, self).__init__()
        assert stride in [1, 2]
        assert kernel in [3, 5]
        padding = (kernel - 1) // 2
        self.use_res_connect = stride == 1 and inp == oup

        conv_layer = nn.Conv2d
        norm_layer = nn.BatchNorm2d
        if nl == 'RE':
            nlin_layer = nn.ReLU  # or ReLU6
        elif nl == 'HS':
            nlin_layer = Hswish
        else:
            raise NotImplementedError
        if se:
            SELayer = SEModule
        else:
            SELayer = Identity

        self.conv = nn.Sequential(
            # pw
            conv_layer(inp, exp, 1, 1, 0, bias=False),
            norm_layer(exp),
            nlin_layer(inplace=True),
            # dw
            conv_layer(exp, exp, kernel, stride,
                       padding, groups=exp, bias=False),
            norm_layer(exp),
            SELayer(exp),
            nlin_layer(inplace=True),
            # pw-linear
            conv_layer(exp, oup, 1, 1, 0, bias=False),
            norm_layer(oup),
        )

    def forward(self, x):
        if self.use_res_connect:
            return x + self.conv(x)
        else:
            return self.conv(x)


class MobileNetV3(nn.Module):
    def __init__(self, n_class=1000, input_size=224, mode='small', width_mult=1.0):
        super(MobileNetV3, self).__init__()
        input_channel = 16
        last_channel = 1280
        if mode == 'large':
            # refer to Table 1 in paper
            mobile_setting = [
                # k, exp, c,  se,     nl,  s,
                [3, 16,  16,  False, 'RE', 1],
                [3, 64,  24,  False, 'RE', 2],
                [3, 72,  24,  False, 'RE', 1],
                [5, 72,  40,  True,  'RE', 2],
                [5, 120, 40,  True,  'RE', 1],
                [5, 120, 40,  True,  'RE', 1],
                [3, 240, 80,  False, 'HS', 2],
                [3, 200, 80,  False, 'HS', 1],
                [3, 184, 80,  False, 'HS', 1],
                [3, 184, 80,  False, 'HS', 1],
                [3, 480, 112, True,  'HS', 1],
                [3, 672, 112, True,  'HS', 1],
                # c = 112, paper set it to 160 by error
                [5, 672, 112, True,  'HS', 1],
                [5, 672, 160, True,  'HS', 2],
                [5, 960, 160, True,  'HS', 1],
            ]
        elif mode == 'small':
            # refer to Table 2 in paper
            mobile_setting = [
                # k, exp, c,  se,     nl,  s,
                [3, 16,  16,  True,  'RE', 2],
                [3, 72,  24,  False, 'RE', 2],
                [3, 88,  24,  False, 'RE', 1],
                # stride = 2, paper set it to 1 by error
                [5, 96,  40,  True,  'HS', 2],
                [5, 240, 40,  True,  'HS', 1],
                [5, 240, 40,  True,  'HS', 1],
                [5, 120, 48,  True,  'HS', 1],
                [5, 144, 48,  True,  'HS', 1],
                [5, 288, 96,  True,  'HS', 2],
                [5, 576, 96,  True,  'HS', 1],
                [5, 576, 96,  True,  'HS', 1],
            ]
        else:
            raise NotImplementedError

        # building first layer
        assert input_size % 32 == 0
        # input_channel = make_divisible(input_channel * width_mult)  # first channel is always 16!
        self.last_channel = make_divisible(
            last_channel * width_mult) if width_mult > 1.0 else last_channel
        self.features = [conv_bn(3, input_channel, 2, nlin_layer=Hswish)]

        # building mobile blocks
        for k, exp, c, se, nl, s in mobile_setting:
            output_channel = make_divisible(c * width_mult)
            exp_channel = make_divisible(exp * width_mult)
            self.features.append(MobileBottleneck(
                input_channel, output_channel, k, s, exp_channel, se, nl))
            input_channel = output_channel

        # building last several layers
        if mode == 'large':
            last_conv = make_divisible(960 * width_mult)
            self.features.append(conv_1x1_bn(
                input_channel, last_conv, nlin_layer=Hswish))
            self.features.append(nn.AdaptiveAvgPool2d(1))
            self.features.append(Hswish(inplace=True))
            self.features.append(nn.Conv2d(last_conv, last_channel, 1, 1, 0))
            self.features.append(Hswish(inplace=True))
            self.features.append(nn.Conv2d(last_channel, n_class, 1, 1, 0))
        elif mode == 'small':
            last_conv = make_divisible(576 * width_mult)
            self.features.append(conv_1x1_bn(
                input_channel, last_conv, nlin_layer=Hswish))
            self.features.append(SEModule(last_conv))  # refer to paper Table2
            self.features.append(nn.AdaptiveAvgPool2d(1))
            self.features.append(Hswish(inplace=True))
            self.features.append(conv_1x1_bn(
                last_conv, last_channel, nlin_layer=Hswish))
            self.features.append(conv_1x1_bn(
                last_channel, n_class, nlin_layer=Hswish))
        else:
            raise NotImplementedError

        # make it nn.Sequential
        self.features = nn.Sequential(*self.features)

        self._initialize_weights()

    def forward(self, x):
        x = self.features(x)
        x = x.mean(3).mean(2)
        return x

    def _initialize_weights(self):
        # weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0.0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1.0)
                nn.init.constant_(m.bias, 0.0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0.0)


def mobilenetv3(pretrained=False, **kwargs):
    model = MobileNetV3(**kwargs)
    if pretrained:
        raise NotImplementedError
    return model


In [None]:
# print the model
import math
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MobileNetV3()
model.to(device)

MobileNetV3(
  (features): Sequential(
    (0): Sequential(
      (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): Hswish()
    )
    (1): MobileBottleneck(
      (conv): Sequential(
        (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU(inplace=True)
        (3): Conv2d(16, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=16, bias=False)
        (4): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (5): SEModule(
          (avg_pool): AdaptiveAvgPool2d(output_size=1)
          (fc): Sequential(
            (0): Linear(in_features=16, out_features=4, bias=False)
            (1): ReLU(inplace=True)
            (2): Linear(in_features=4, out_features=16, bias=False)
         

In [None]:
# print summary of the model
from torchvision import models
from torchsummary import summary
summary(model, (3, 32, 32))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 16, 16, 16]             432
       BatchNorm2d-2           [-1, 16, 16, 16]              32
            Hswish-3           [-1, 16, 16, 16]               0
            Conv2d-4           [-1, 16, 16, 16]             256
       BatchNorm2d-5           [-1, 16, 16, 16]              32
              ReLU-6           [-1, 16, 16, 16]               0
            Conv2d-7             [-1, 16, 8, 8]             144
       BatchNorm2d-8             [-1, 16, 8, 8]              32
 AdaptiveAvgPool2d-9             [-1, 16, 1, 1]               0
           Linear-10                    [-1, 4]              64
             ReLU-11                    [-1, 4]               0
           Linear-12                   [-1, 16]              64
         Hsigmoid-13                   [-1, 16]               0
         SEModule-14             [-1, 1

In [None]:
# loss function to be used
criterion = torch.nn.CrossEntropyLoss()
# optimizer to be used
optimizer = torch.optim.SGD(model.parameters(), lr=5e-3, momentum=0.9, weight_decay=5e-4)

In [None]:
# training step...
from torch.utils.tensorboard import SummaryWriter
train_losses = 0.0
train_accuracy = 0
epochs = 50
for epoch in range(epochs):  # loop over the dataset multiple times
    print('Epoch-{0}:'.format(epoch + 1, optimizer.param_groups[0]['lr']))
    for i, data in enumerate(trainloader, 0):
        inputs, labels = data # get the inputs; data is a list of [inputs, labels]
        inputs, labels = inputs.cuda(), labels.cuda() # for using data in GPU
        optimizer.zero_grad() # zero the parameter gradients
        outputs = model(inputs) # forward
        loss = criterion(outputs, labels) # calculate loss
        loss.backward() # backward loss
        optimizer.step() # optimize gradients
        train_losses += loss.item() # save loss
        _, preds = torch.max(outputs, 1) # save prediction
        train_accuracy += torch.sum(preds == labels.data) # save train_accuracy       
        if i % 1000 == 999:    # every 1000 mini-batches...           
            steps = epoch * len(trainloader) + i # calculate steps 
            batch = i*batch_size # calculate batch 
            # Save train_accuracy and loss to Tensorboard
            writer.add_scalar('Training loss by steps', train_losses / batch, steps)
            writer.add_scalar('Training accuracy by steps', train_accuracy / batch, steps)          
    train_losses = 0.0
    train_accuracy = 0    
print('Train is finished...')

Epoch-1:
Epoch-2:
Epoch-3:
Epoch-4:
Epoch-5:
Epoch-6:
Epoch-7:
Epoch-8:
Epoch-9:
Epoch-10:
Epoch-11:
Epoch-12:
Epoch-13:
Epoch-14:
Epoch-15:
Epoch-16:
Epoch-17:
Epoch-18:
Epoch-19:
Epoch-20:
Epoch-21:
Epoch-22:
Epoch-23:
Epoch-24:
Epoch-25:
Epoch-26:
Epoch-27:
Epoch-28:
Epoch-29:
Epoch-30:
Epoch-31:
Epoch-32:
Epoch-33:
Epoch-34:
Epoch-35:
Epoch-36:
Epoch-37:
Epoch-38:
Epoch-39:
Epoch-40:
Epoch-41:
Epoch-42:
Epoch-43:
Epoch-44:
Epoch-45:
Epoch-46:
Epoch-47:
Epoch-48:
Epoch-49:
Epoch-50:
Train is finished...


In [None]:
# test step...
from torch.utils.tensorboard import SummaryWriter
test_losses = 0.0
test_accuracy = 0
epochs = 50
for epoch in range(epochs):  # loop over the dataset multiple times
    print('Epoch-{0}:'.format(epoch + 1, optimizer.param_groups[0]['lr']))
    for i, data in enumerate(testloader, 0):
        inputs, labels = data # get the inputs; data is a list of [inputs, labels]
        inputs, labels = inputs.cuda(), labels.cuda() # for using data in GPU
        optimizer.zero_grad() # zero the parameter gradients      
        outputs = model(inputs) # forward
        loss = criterion(outputs, labels) # calculate loss
        loss.backward() # backward loss
        optimizer.step() # optimize gradients
        test_losses += loss.item() # save loss
        _, preds = torch.max(outputs, 1) # save prediction
        test_accuracy += torch.sum(preds == labels.data) # save test_accuracy        
        if i % 1000 == 999:    # every 1000 mini-batches...           
            steps = epoch * len(testloader) + i # calculate steps 
            batch = i*batch_size # calculate batch 
            print("Test loss {:.5} Test Accuracy {:.5} Steps: {}".format(test_losses / batch, test_accuracy/batch, steps))           
            # Save test_accuracy and loss to Tensorboard
            writer.add_scalar('Test loss by steps', test_losses / batch, steps)
            writer.add_scalar('Test accuracy by steps', test_accuracy / batch, steps)                      
    print("Test Accuracy: {}/{} ({:.5} %) Test Loss: {:.5}".format(test_accuracy, len(testloader), 100. * test_accuracy / len(testloader.dataset), test_losses / len(testloader.dataset)))   
    test_losses = 0.0
    test_accuracy = 0    
print('Test is Finished...')

Epoch-1:
Test Accuracy: 853/125 (85.3 %) Test Loss: 0.050305
Epoch-2:
Test Accuracy: 915/125 (91.5 %) Test Loss: 0.028976
Epoch-3:
Test Accuracy: 937/125 (93.7 %) Test Loss: 0.021938
Epoch-4:
Test Accuracy: 953/125 (95.3 %) Test Loss: 0.018908
Epoch-5:
Test Accuracy: 951/125 (95.1 %) Test Loss: 0.017191
Epoch-6:
Test Accuracy: 963/125 (96.3 %) Test Loss: 0.012348
Epoch-7:
Test Accuracy: 953/125 (95.3 %) Test Loss: 0.015229
Epoch-8:
Test Accuracy: 970/125 (97.0 %) Test Loss: 0.012352
Epoch-9:
Test Accuracy: 974/125 (97.4 %) Test Loss: 0.010989
Epoch-10:
Test Accuracy: 968/125 (96.8 %) Test Loss: 0.012577
Epoch-11:
Test Accuracy: 970/125 (97.0 %) Test Loss: 0.011028
Epoch-12:
Test Accuracy: 978/125 (97.8 %) Test Loss: 0.0087883
Epoch-13:
Test Accuracy: 979/125 (97.9 %) Test Loss: 0.0081127
Epoch-14:
Test Accuracy: 983/125 (98.3 %) Test Loss: 0.0062273
Epoch-15:
Test Accuracy: 975/125 (97.5 %) Test Loss: 0.0084851
Epoch-16:
Test Accuracy: 973/125 (97.3 %) Test Loss: 0.010382
Epoch-17:
Tes