In [1]:
import math
import torch
from torch import nn

# Basic Layers

In [2]:
class ConvBnAct(nn.Module):
  """Layer grouping a convolution, batchnorm, and activation function"""
  def __init__(self, n_in, n_out, kernel_size=3, 
               stride=1, padding=0, groups=1, bias=False,
               bn=True, act=True):
    super().__init__()
    
    self.conv = nn.Conv2d(n_in, n_out, kernel_size=kernel_size,
                          stride=stride, padding=padding,
                          groups=groups, bias=bias)
    self.bn = nn.BatchNorm2d(n_out) if bn else nn.Identity()
    self.act = nn.SiLU() if act else nn.Identity()
  
  def forward(self, x):
    x = self.conv(x)
    x = self.bn(x)
    x = self.act(x)
    return x

In [3]:
class SEBlock(nn.Module):
  """Squeeze-and-excitation block"""
  def __init__(self, n_in, r=24):
    super().__init__()

    self.squeeze = nn.AdaptiveAvgPool2d(1)
    self.excitation = nn.Sequential(nn.Conv2d(n_in, n_in//r, kernel_size=1),
                                    nn.SiLU(),
                                    nn.Conv2d(n_in//r, n_in, kernel_size=1),
                                    nn.Sigmoid())
  
  def forward(self, x):
    y = self.squeeze(x)
    y = self.excitation(y)
    return x * y

In [4]:
class DropSample(nn.Module):
  """Drops each sample in x with probability p during training"""
  def __init__(self, p=0):
    super().__init__()

    self.p = p
  
  def forward(self, x):
    if (not self.p) or (not self.training):
      return x
    
    batch_size = len(x)
    random_tensor = torch.cuda.FloatTensor(batch_size, 1, 1, 1).uniform_()
    bit_mask = self.p<random_tensor

    x = x.div(1-self.p)
    x = x * bit_mask
    return x

In [5]:
class MBConvN(nn.Module):
  """MBConv with an expansion factor of N, plus squeeze-and-excitation"""
  def __init__(self, n_in, n_out, expansion_factor,
               kernel_size=3, stride=1, r=24, p=0):
    super().__init__()

    padding = (kernel_size-1)//2
    expanded = expansion_factor*n_in
    self.skip_connection = (n_in == n_out) and (stride == 1)

    self.expand_pw = nn.Identity() if (expansion_factor == 1) else ConvBnAct(n_in, expanded, kernel_size=1)
    self.depthwise = ConvBnAct(expanded, expanded, kernel_size=kernel_size, 
                               stride=stride, padding=padding, groups=expanded)
    self.se = SEBlock(expanded, r=r)
    self.reduce_pw = ConvBnAct(expanded, n_out, kernel_size=1,
                               act=False)
    self.dropsample = DropSample(p)
  
  def forward(self, x):
    residual = x

    x = self.expand_pw(x)
    x = self.depthwise(x)
    x = self.se(x)
    x = self.reduce_pw(x)

    if self.skip_connection:
      x = self.dropsample(x)
      x = x + residual

    return x

In [6]:
class MBConv1(MBConvN):
  def __init__(self, n_in, n_out, kernel_size=3,
               stride=1, r=24, p=0):
    super().__init__(n_in, n_out, expansion_factor=1,
                     kernel_size=kernel_size, stride=stride,
                     r=r, p=p)

In [7]:
class MBConv6(MBConvN):
  def __init__(self, n_in, n_out, kernel_size=3,
               stride=1, r=24, p=0):
    super().__init__(n_in, n_out, expansion_factor=6,
                     kernel_size=kernel_size, stride=stride,
                     r=r, p=p)

# Scaling Functions

In [8]:
def create_stage(n_in, n_out, num_layers, layer_type=MBConv6, 
                 kernel_size=3, stride=1, r=24, p=0):
  """Creates a Sequential consisting of [num_layers] layer_type"""
  layers = [layer_type(n_in, n_out, kernel_size=kernel_size,
                       stride=stride, r=r, p=p)]
  layers += [layer_type(n_out, n_out, kernel_size=kernel_size,
                        r=r, p=p) for _ in range(num_layers-1)]
  layers = nn.Sequential(*layers)
  return layers

In [9]:
def scale_width(w, w_factor):
  """Scales width given a scale factor"""
  w *= w_factor
  new_w = (int(w+4) // 8) * 8
  new_w = max(8, new_w)
  if new_w < 0.9*w:
     new_w += 8
  return int(new_w)

# EfficientNet

In [10]:
class EfficientNet(nn.Module):
  """Generic EfficientNet that takes in the width and depth scale factors and scales accordingly"""
  def __init__(self, w_factor=1, d_factor=1,
               out_sz=1000):
    super().__init__()

    base_widths = [(32, 16), (16, 24), (24, 40),
                   (40, 80), (80, 112), (112, 192),
                   (192, 320), (320, 1280)]
    base_depths = [1, 2, 2, 3, 3, 4, 1]

    scaled_widths = [(scale_width(w[0], w_factor), scale_width(w[1], w_factor)) 
                     for w in base_widths]
    scaled_depths = [math.ceil(d_factor*d) for d in base_depths]
    
    kernel_sizes = [3, 3, 5, 3, 5, 5, 3]
    strides = [1, 2, 2, 2, 1, 2, 1]
    ps = [0, 0.029, 0.057, 0.086, 0.114, 0.143, 0.171]

    self.stem = ConvBnAct(3, scaled_widths[0][0], stride=2, padding=1)
    
    stages = []
    for i in range(7):
      layer_type = MBConv1 if (i == 0) else MBConv6
      r = 4 if (i == 0) else 24
      stage = create_stage(*scaled_widths[i], scaled_depths[i],
                           layer_type, kernel_size=kernel_sizes[i], 
                           stride=strides[i], r=r, p=ps[i])
      stages.append(stage)
    self.stages = nn.Sequential(*stages)

    self.pre_head = ConvBnAct(*scaled_widths[-1], kernel_size=1)

    self.head = nn.Sequential(nn.AdaptiveAvgPool2d(1),
                              nn.Flatten(),
                              nn.Linear(scaled_widths[-1][1], out_sz))

  def feature_extractor(self, x):
    x = self.stem(x)
    x = self.stages(x)
    x = self.pre_head(x)
    return x

  def forward(self, x):
    x = self.feature_extractor(x)
    x = self.head(x)
    return x

In [11]:
class EfficientNetB0(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 1
    d_factor = 1
    super().__init__(w_factor, d_factor, out_sz)

In [12]:
class EfficientNetB1(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 1
    d_factor = 1.1
    super().__init__(w_factor, d_factor, out_sz)

In [13]:
class EfficientNetB2(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 1.1
    d_factor = 1.2
    super().__init__(w_factor, d_factor, out_sz)

In [14]:
class EfficientNetB3(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 1.2
    d_factor = 1.4
    super().__init__(w_factor, d_factor, out_sz)

In [15]:
class EfficientNetB4(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 1.4
    d_factor = 1.8
    super().__init__(w_factor, d_factor, out_sz)

In [16]:
class EfficientNetB5(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 1.6
    d_factor = 2.2
    super().__init__(w_factor, d_factor, out_sz)

In [17]:
class EfficientNetB6(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 1.8
    d_factor = 2.6
    super().__init__(w_factor, d_factor, out_sz)

In [18]:
class EfficientNetB7(EfficientNet):
  def __init__(self, out_sz=1000):
    w_factor = 2
    d_factor = 3.1
    super().__init__(w_factor, d_factor, out_sz)

In [19]:
import os
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import time

# Thread count, will use half of it to load the data
THREADS = 4
ROOT = '../data/imagenette2'

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
batch_size = 4
classes = [_ for _ in os.listdir(ROOT + '/train')]
net = EfficientNetB7(out_sz=len(classes))




In [20]:
def imshow(img):
    img = img / 2 + 0.5
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


def train(trainloader, testloader, epochs=2):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
    # optimizer = optim.Adam(net.parameters(), lr=0.001)

    start_time = time.perf_counter()
    print('Training start:', time.asctime(time.localtime()))
    for epoch in range(epochs):
        torch.cuda.empty_cache()
        for i, data in enumerate(trainloader, 0):
            # get the inputs
            inputs, labels = data[0].to(device), data[1].to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        print('Epoch:', epoch)
        torch.save(net.state_dict(), PATH)

    print('Finished Training:', time.asctime(time.localtime()))
    end_time = time.perf_counter()
    print('Training time:', end_time - start_time)


def test(testloader):
    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}
    all_correct = 0
    all_preds = 0
    with torch.no_grad():
        for data in testloader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = net(images)
            _, predictions = torch.max(outputs.data, 1)
            for label, prediction in zip(labels, predictions):
                if label == prediction:
                    correct_pred[classes[label]] += 1
                    all_correct += 1
                total_pred[classes[label]] += 1
                all_preds += 1

    print(f"General accuracy for this classifier: {(100 * float(all_correct) / float(all_preds)):.1f}%")
    for classname, correct_count in correct_pred.items():
        accuracy = 100 * float(correct_count) / total_pred[classname]
        print(f"Accuracy for class: {classname} is {accuracy:.1f}%")


def main():
    transform = transforms.Compose([
        transforms.RandomResizedCrop(128),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    trainset = torchvision.datasets.ImageFolder(root=ROOT + '/train', transform=transform)
    _trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=THREADS//2)

    testset = torchvision.datasets.ImageFolder(root=ROOT + '/test', transform=transform)
    _testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=THREADS//2)

    return _trainloader, _testloader


def train_and_save(trainloader, testloader, PATH, epochs=2):
    # Training
    train(trainloader, testloader, epochs)
    # Saving the model
    torch.save(net.state_dict(), PATH)


In [22]:
print('Program starting:', time.asctime(time.localtime()))
net.to(device)
trainloader, testloader = main()

# Saving the model
PATH = '../models/efficient_net_tut.pth'

train_and_save(trainloader, testloader, PATH, epochs=5)

# Loading the model
net.load_state_dict(torch.load(PATH))

# Test over 10000 images
test(testloader)

Program starting: Mon Apr 11 02:59:35 2022
Training start: Mon Apr 11 02:59:35 2022
Epoch: 0
Epoch: 1
Epoch: 2
Epoch: 3
Epoch: 4
Finished Training: Mon Apr 11 03:35:30 2022
Training time: 2155.0323498
General accuracy for this classifier: 17.3%
Accuracy for class: n01440764 is 9.8%
Accuracy for class: n02102040 is 5.3%
Accuracy for class: n02979186 is 22.1%
Accuracy for class: n03000684 is 3.4%
Accuracy for class: n03028079 is 25.2%
Accuracy for class: n03394916 is 19.8%
Accuracy for class: n03417042 is 26.2%
Accuracy for class: n03425413 is 12.4%
Accuracy for class: n03445777 is 15.0%
Accuracy for class: n03888257 is 34.1%
