# Implementation of EfficientNet in PyTorch

### EfficientNet Neural Network

In [1]:
    #Set the device to run
# Pick the compute device: prefer the first CUDA GPU, fall back to Apple MPS,
# then to the CPU, so the notebook also runs on machines without an NVIDIA GPU.
# torch is imported here because this cell runs before the main import cell.
import torch

if torch.cuda.is_available():
    device = "cuda:0"
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [2]:
# imports
import torch
import os
from torch import nn
from mycode.GenderClassificationNN import GenderClassificationNN
from mycode.train_step import train_step
from mycode.test_step import test_step
from mycode.GenderDataset import GenderDataset
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor
from typing import Tuple, Dict, List
from torchvision import transforms
from torchvision import datasets
from tqdm.auto import tqdm
from timeit import default_timer as timer 
import matplotlib.pyplot as plt
import clearml

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math

class Swish(nn.Module):
    """Swish activation: multiplies the input by its own sigmoid."""

    def __init__(self):
        super().__init__()

        # Kept as a sub-module so the layer shows up in the model structure.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x * sigmoid(x)`` element-wise."""
        gate = self.sigmoid(x)
        return torch.mul(x, gate)

def _RoundChannels(c, divisor=8, min_value=None):
    if min_value is None:
        min_value = divisor
    new_c = max(min_value, int(c + divisor / 2) // divisor * divisor)
    if new_c < 0.9 * c:
        new_c += divisor
    return new_c

def _RoundRepeats(r):
    return int(math.ceil(r))

def _DropPath(x, drop_prob, training):
    if drop_prob > 0 and training:
        keep_prob = 1 - drop_prob
        if x.is_cuda:
            mask = Variable(torch.cuda.FloatTensor(x.size(0), 1, 1, 1).bernoulli_(keep_prob))
        else:
            mask = Variable(torch.FloatTensor(x.size(0), 1, 1, 1).bernoulli_(keep_prob))
        x.div_(keep_prob)
        x.mul_(mask)

    return x

def _BatchNorm(channels, eps=1e-3, momentum=0.01):
    return nn.BatchNorm2d(channels, eps=eps, momentum=momentum)

def _Conv3x3Bn(in_channels, out_channels, stride):
    """Build a bias-free 3x3 convolution (padding 1) -> batch norm -> Swish stack.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by the convolution.
        stride: stride of the convolution.

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
    norm = _BatchNorm(out_channels)
    activation = Swish()
    return nn.Sequential(conv, norm, activation)

def _Conv1x1Bn(in_channels, out_channels):
    """Build a bias-free 1x1 convolution -> batch norm -> Swish stack.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by the convolution.

    Returns:
        An ``nn.Sequential`` holding the three layers in that order.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, bias=False)
    norm = _BatchNorm(out_channels)
    activation = Swish()
    return nn.Sequential(conv, norm, activation)

class SqueezeAndExcite(nn.Module):
    """Squeeze-and-excitation gate: rescales each channel by a learned factor.

    The input is averaged over its two spatial dimensions, passed through a
    1x1 "reduce" convolution + Swish and a 1x1 "expand" convolution + sigmoid,
    and the resulting per-channel weights multiply the input.

    Args:
        channels: number of channels of the tensor being gated.
        squeeze_channels: base channel count; the bottleneck width is
            ``squeeze_channels * se_ratio``.
        se_ratio: reduction ratio applied to ``squeeze_channels``.

    Raises:
        ValueError: if ``squeeze_channels * se_ratio`` is not a whole number.
    """

    def __init__(self, channels, squeeze_channels, se_ratio):
        super(SqueezeAndExcite, self).__init__()

        # Coerce to float first: with an integer ``se_ratio`` the product is an
        # int, and ``int.is_integer`` only exists from Python 3.12 onwards.
        squeeze_channels = float(squeeze_channels * se_ratio)
        if not squeeze_channels.is_integer():
            raise ValueError('channels must be divisible by 1/ratio')

        squeeze_channels = int(squeeze_channels)
        self.se_reduce = nn.Conv2d(channels, squeeze_channels, 1, 1, 0, bias=True)
        self.non_linear1 = Swish()
        self.se_expand = nn.Conv2d(squeeze_channels, channels, 1, 1, 0, bias=True)
        self.non_linear2 = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the learned gate."""
        # Squeeze: average over dims 2 and 3, keeping them as size-1 dims.
        y = torch.mean(x, (2, 3), keepdim=True)
        # Excite: bottleneck, then a sigmoid gate in (0, 1) per channel.
        y = self.non_linear1(self.se_reduce(y))
        y = self.non_linear2(self.se_expand(y))
        y = x * y

        return y

class MBConvBlock(nn.Module):
    """Mobile inverted bottleneck block.

    Layers, in order: an optional 1x1 expansion (skipped when
    ``expand_ratio == 1``), a depthwise convolution, an optional
    squeeze-and-excite gate (skipped when ``se_ratio == 0.0``) and a 1x1
    projection without activation.

    Args:
        in_channels: channels of the block input.
        out_channels: channels of the block output.
        kernel_size: kernel size of the depthwise convolution.
        stride: stride of the depthwise convolution.
        expand_ratio: multiplier giving the hidden width
            ``in_channels * expand_ratio``.
        se_ratio: squeeze-and-excite ratio, applied to ``in_channels``.
        drop_path_rate: drop probability handed to ``_DropPath`` on the
            residual branch.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, expand_ratio, se_ratio, drop_path_rate):
        super().__init__()

        hidden = in_channels * expand_ratio
        # A skip connection is only possible when input and output shapes match.
        self.residual_connection = (stride == 1 and in_channels == out_channels)
        self.drop_path_rate = drop_path_rate

        stages = []

        # 1) Point-wise expansion to the hidden width.
        if expand_ratio != 1:
            stages.append(nn.Sequential(
                nn.Conv2d(in_channels, hidden, 1, 1, 0, bias=False),
                _BatchNorm(hidden),
                Swish(),
            ))

        # 2) Depthwise convolution: one filter group per hidden channel.
        stages.append(nn.Sequential(
            nn.Conv2d(
                hidden,
                hidden,
                kernel_size,
                stride,
                kernel_size // 2,
                groups=hidden,
                bias=False,
            ),
            _BatchNorm(hidden),
            Swish(),
        ))

        # 3) Channel re-weighting.
        if se_ratio != 0.0:
            stages.append(SqueezeAndExcite(hidden, in_channels, se_ratio))

        # 4) Point-wise projection to the output width (no activation).
        stages.append(nn.Sequential(
            nn.Conv2d(hidden, out_channels, 1, 1, 0, bias=False),
            _BatchNorm(out_channels),
        ))

        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run the block, adding the input back when a skip connection applies."""
        out = self.conv(x)
        if not self.residual_connection:
            return out
        return x + _DropPath(out, self.drop_path_rate, self.training)

class EfficientNet(nn.Module):
    """EfficientNet image classifier built from ``MBConvBlock`` stages.

    Args:
        param: sequence ``(width_coef, depth_coef, resolution, dropout_rate)``.
            ``resolution`` is not used by the model itself, because the
            forward pass averages over whatever spatial size it receives.
        num_classes: number of outputs of the final linear layer.
        stem_channels: base channel count of the stem convolution, before
            width scaling.
        feature_size: channel count produced by the head convolution.
        drop_connect_rate: maximum drop-path rate; each block gets a share
            proportional to its index among all blocks.
    """

    # Baseline stage table. Never modified: each instance scales its own copy.
    config = [
        #(in_channels, out_channels, kernel_size, stride, expand_ratio, se_ratio, repeats)
        [32,  16,  3, 1, 1, 0.25, 1],
        [16,  24,  3, 2, 6, 0.25, 2],
        [24,  40,  5, 2, 6, 0.25, 2],
        [40,  80,  3, 2, 6, 0.25, 3],
        [80,  112, 5, 1, 6, 0.25, 3],
        [112, 192, 5, 2, 6, 0.25, 4],
        [192, 320, 3, 1, 6, 0.25, 1]
    ]

    def __init__(self, param, num_classes=2, stem_channels=32, feature_size=1280, drop_connect_rate=0.2):
        super(EfficientNet, self).__init__()

        # Copy the class-level table into an instance attribute. Scaling the
        # shared lists in place would compound the width/depth factors every
        # time another scaled model is constructed.
        self.config = [list(conf) for conf in self.config]

        # scaling width
        width_coefficient = param[0]
        if width_coefficient != 1.0:
            stem_channels = _RoundChannels(stem_channels*width_coefficient)
            for conf in self.config:
                conf[0] = _RoundChannels(conf[0]*width_coefficient)
                conf[1] = _RoundChannels(conf[1]*width_coefficient)

        # scaling depth
        depth_coefficient = param[1]
        if depth_coefficient != 1.0:
            for conf in self.config:
                conf[6] = _RoundRepeats(conf[6]*depth_coefficient)

        # stem convolution
        self.stem_conv = _Conv3x3Bn(3, stem_channels, 2)

        # total number of blocks, used to scale the per-block drop rate
        total_blocks = sum(conf[6] for conf in self.config)

        # mobile inverted bottleneck
        blocks = []
        for in_channels, out_channels, kernel_size, stride, expand_ratio, se_ratio, repeats in self.config:
            # drop connect rate based on block index
            drop_rate = drop_connect_rate * (len(blocks) / total_blocks)
            blocks.append(MBConvBlock(in_channels, out_channels, kernel_size, stride, expand_ratio, se_ratio, drop_rate))
            # The remaining repeats of a stage keep the channel count and use stride 1.
            for _ in range(repeats-1):
                drop_rate = drop_connect_rate * (len(blocks) / total_blocks)
                blocks.append(MBConvBlock(out_channels, out_channels, kernel_size, 1, expand_ratio, se_ratio, drop_rate))
        self.blocks = nn.Sequential(*blocks)

        # last several layers
        self.head_conv = _Conv1x1Bn(self.config[-1][1], feature_size)
        self.dropout = nn.Dropout(param[3])
        self.classifier = nn.Linear(feature_size, num_classes)

        self._initialize_weights()

    def forward(self, x):
        """Return the classifier outputs for a batch of images ``x``."""
        x = self.stem_conv(x)
        x = self.blocks(x)
        x = self.head_conv(x)
        # Global average pooling over dims 2 and 3; independent of input size.
        x = torch.mean(x, (2, 3))
        x = self.dropout(x)
        x = self.classifier(x)

        return x

    def _initialize_weights(self):
        """Re-initialise all convolution, batch-norm and linear parameters."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # Normal init with variance 2 / (kernel area * output channels).
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2.0 / n))
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()


In [4]:
    # if __name__ == '__main__':
    #     net_param = {
    #         # 'efficientnet type': (width_coef, depth_coef, resolution, dropout_rate)
    #         'efficientnet-b0': (1.0, 1.0, 224, 0.2),
    #         'efficientnet-b1': (1.0, 1.1, 240, 0.2),
    #         'efficientnet-b2': (1.1, 1.2, 260, 0.3),
    #         'efficientnet-b3': (1.2, 1.4, 300, 0.3),
    #         'efficientnet-b4': (1.4, 1.8, 380, 0.4),
    #         'efficientnet-b5': (1.6, 2.2, 456, 0.4),
    #         'efficientnet-b6': (1.8, 2.6, 528, 0.5),
    #         'efficientnet-b7': (2.0, 3.1, 600, 0.5)
    #     }

    #     param = net_param['efficientnet-b0']
    #     net = EfficientNet(param)
    #     x_image = Variable(torch.randn(1, 3, param[2], param[2]))
    #     y = net(x_image)

### Dataset, Transforms, and Dataloaders

In [5]:
# Dataset locations
train_dir = "dataset_all/train"
test_dir = "dataset_all/test"

# Training pipeline: resize, apply random augmentations, convert to a tensor.
train_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        # The next three augmentations were marked by the author as new and
        # not yet tested.
        transforms.ColorJitter(brightness=0.5),
        transforms.RandomRotation(45),
        transforms.RandomVerticalFlip(p=0.05),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ToTensor(),
    ]
)

# Evaluation pipeline: no augmentation, only resize and tensor conversion.
test_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
)

# Datasets come from the project's custom dataset class.
train_data = GenderDataset(targ_dir=train_dir, transform=train_transforms)
test_data = GenderDataset(targ_dir=test_dir, transform=test_transforms)

BATCH_SIZE = 256

# Training batches are reshuffled every epoch; evaluation keeps a fixed order.
train_dataloader = DataLoader(
    dataset=train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
)

test_dataloader = DataLoader(
    dataset=test_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
)

In [6]:
def plot_loss_curves(results: Dict[str, List[float]]):
    """Plot loss and accuracy curves and save them to ``models/results.png``.

    Args:
        results (dict): dictionary containing list of values, e.g.
            {"train_loss": [...],
            "train_acc": [...],
            "test_loss": [...],
            "test_acc": [...]}
    """
    # Loss values (training and test)
    loss = results['train_loss']
    test_loss = results['test_loss']

    # Accuracy values (training and test)
    accuracy = results['train_acc']
    test_accuracy = results['test_acc']

    # One point per recorded epoch
    epochs = range(len(loss))

    # Two side-by-side panels
    plt.figure(figsize=(15, 7))

    # Left panel: loss
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, label='train_loss')
    plt.plot(epochs, test_loss, label='test_loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    # Right panel: accuracy
    plt.subplot(1, 2, 2)
    plt.plot(epochs, accuracy, label='train_accuracy')
    plt.plot(epochs, test_accuracy, label='test_accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epochs')
    plt.legend()

    # savefig does not create missing directories, so make sure it exists.
    os.makedirs('models', exist_ok=True)
    plt.savefig('models/results.png')

In [7]:
def train(model: torch.nn.Module, 
        train_dataloader: torch.utils.data.DataLoader, 
        test_dataloader: torch.utils.data.DataLoader, 
        optimizer: torch.optim.Optimizer,
        loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
        epochs: int = 5):
    """Run the train/test loop and collect per-epoch metrics.

    Each epoch calls the project's ``train_step`` and then ``test_step``;
    both are unpacked here as a ``(loss, accuracy)`` pair.

    Args:
        model: network to optimise.
        train_dataloader: batches passed to ``train_step``.
        test_dataloader: batches passed to ``test_step``.
        optimizer: optimizer passed to ``train_step``.
        loss_fn: loss passed to both steps.
        epochs: number of epochs to run.

    Returns:
        A dict with keys ``train_loss``, ``train_acc``, ``test_loss`` and
        ``test_acc``, each a list with one entry per epoch.
    """
    metric_names = ("train_loss", "train_acc", "test_loss", "test_acc")
    history = {name: [] for name in metric_names}

    for epoch in tqdm(range(epochs), leave=True):
        # One optimisation pass, then one evaluation pass.
        train_loss, train_acc = train_step(
            model=model,
            dataloader=train_dataloader,
            loss_fn=loss_fn,
            optimizer=optimizer,
        )
        test_loss, test_acc = test_step(
            model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn,
        )

        # Progress report for this epoch.
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}\n"
        )

        # Record the metrics in the same order as the keys above.
        epoch_values = (train_loss, train_acc, test_loss, test_acc)
        for name, value in zip(metric_names, epoch_values):
            history[name].append(value)

    return history

In [8]:
# Scaling settings per model variant:
# name -> (width_coef, depth_coef, resolution, dropout_rate)
net_param = {
    'efficientnet-b0': (1.0, 1.0, 224, 0.2),
    'efficientnet-b1': (1.0, 1.1, 240, 0.2),
    'efficientnet-b2': (1.1, 1.2, 260, 0.3),
    'efficientnet-b3': (1.2, 1.4, 300, 0.3),
    'efficientnet-b4': (1.4, 1.8, 380, 0.4),
    'efficientnet-b5': (1.6, 2.2, 456, 0.4),
    'efficientnet-b6': (1.8, 2.6, 528, 0.5),
    'efficientnet-b7': (2.0, 3.1, 600, 0.5),
}

# Smoke test: build the B0 variant and push one random image through it.
param = net_param['efficientnet-b0']
net = EfficientNet(param)
# A plain tensor is enough here; wrapping it in Variable returns a tensor anyway.
x_image = torch.randn(1, 3, param[2], param[2])
y = net(x_image)

In [9]:
# Seed the RNGs BEFORE the model is built so that the random weight
# initialisation is reproducible from run to run.
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Define the model (EfficientNet-B0 settings) and move it to the target device.
# Alternative baseline: model = GenderClassificationNN().to(device)
param = net_param['efficientnet-b0']
model = EfficientNet(param).to(device)

# Loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

# Re-seed right before training so shuffling, augmentation and dropout start
# from the same RNG state regardless of how many random numbers model
# construction consumed.
torch.manual_seed(42)
torch.cuda.manual_seed(42)

In [11]:
# Set number of epochs
NUM_EPOCHS = 15

# Create the output directory up front, so a missing folder is caught before
# the long training run rather than after it.
os.makedirs("models", exist_ok=True)

start_time = timer()

model_results = train(model=model, 
                        train_dataloader=train_dataloader,
                        test_dataloader=test_dataloader,
                        optimizer=optimizer,
                        loss_fn=loss_fn, 
                        epochs=NUM_EPOCHS
                    )
end_time = timer()
print(f"\nTotal training time: {end_time-start_time:.3f} seconds")

# Save the trained weights first: if plotting fails afterwards, the result of
# the training run is already on disk.
torch.save(model.state_dict(), "models/model.pth")
print("Saved PyTorch Model State to models/model.pth")

# Plot the results (the plot is also written to models/results.png)
plot_loss_curves(model_results)

  0%|          | 0/15 [00:00<?, ?it/s]

27.18GB batch 184/184: 100%|██████████| 184/184 [02:18<00:00,  1.33it/s]


Epoch: 1 | train_loss: 0.1723 | train_acc: 0.9339 | test_loss: 0.3230 | test_acc: 0.8925



27.18GB batch 184/184: 100%|██████████| 184/184 [02:14<00:00,  1.37it/s]


Epoch: 2 | train_loss: 0.1468 | train_acc: 0.9467 | test_loss: 0.1075 | test_acc: 0.9626



27.18GB batch 184/184: 100%|██████████| 184/184 [02:14<00:00,  1.37it/s]


Epoch: 3 | train_loss: 0.1301 | train_acc: 0.9520 | test_loss: 0.1010 | test_acc: 0.9649



27.18GB batch 184/184: 100%|██████████| 184/184 [02:13<00:00,  1.37it/s]


Epoch: 4 | train_loss: 0.1259 | train_acc: 0.9551 | test_loss: 0.1956 | test_acc: 0.9386



27.18GB batch 184/184: 100%|██████████| 184/184 [02:13<00:00,  1.38it/s]


Epoch: 5 | train_loss: 0.1191 | train_acc: 0.9567 | test_loss: 0.0874 | test_acc: 0.9687



27.18GB batch 184/184: 100%|██████████| 184/184 [02:14<00:00,  1.36it/s]


Epoch: 6 | train_loss: 0.1154 | train_acc: 0.9595 | test_loss: 0.1296 | test_acc: 0.9556





In [15]:
# Release cached GPU memory held by PyTorch's allocator on GPU 0.
# Guarded so the cell is a no-op instead of an error on CPU-only machines.
if torch.cuda.is_available():
    torch.cuda.set_device(0)
    torch.cuda.empty_cache()