In [1]:
"""
Creates a MobileNetV3 Model as defined in:
Andrew Howard, Mark Sandler, Grace Chu, Liang-Chieh Chen, Bo Chen, Mingxing Tan, Weijun Wang, Yukun Zhu, Ruoming Pang, Vijay Vasudevan, Quoc V. Le, Hartwig Adam. (2019).
Searching for MobileNetV3
arXiv preprint arXiv:1905.02244.
"""

import torch.nn as nn
import math


__all__ = ['mobilenetv3_large', 'mobilenetv3_small']


def _make_divisible(v, divisor, min_value=None):
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

class h_sigmoid(nn.Module):
    def __init__(self, inplace=True):
        super(h_sigmoid, self).__init__()
        self.relu = nn.ReLU6(inplace=inplace)

    def forward(self, x):
        return self.relu(x + 3) / 6

class h_swish(nn.Module):
    def __init__(self, inplace=True):
        super(h_swish, self).__init__()
        self.sigmoid = h_sigmoid(inplace=inplace)

    def forward(self, x):
        return x * self.sigmoid(x)


class SELayer(nn.Module):
    def __init__(self, channel, reduction=4):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
                nn.Linear(channel, _make_divisible(channel // reduction, 8)),
                nn.ReLU(inplace=True),
                nn.Linear(_make_divisible(channel // reduction, 8), channel),
                h_sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y




In [2]:
def conv_3x3(inp, oup, stride, batch_norm = True):
    layers = [
        nn.Conv2d(inp, oup, kernel_size=3, stride=1,
                  padding=1, bias=False),
        h_swish()
    ]#
    if batch_norm:
        layers.insert(1, nn.BatchNorm2d(oup))

    return nn.Sequential(*layers)

def conv_1x1(inp, oup, batch_norm = True):
    layers = [
        nn.Conv2d(inp, oup, kernel_size=1, stride=1,
                  padding=0, bias=False),
        h_swish()
    ]
    if batch_norm:
        layers.insert(1, nn.BatchNorm2d(oup))

    return nn.Sequential(*layers)

In [3]:
def od_conv_1x1(inp, oup, stride = 1, kernel_num = 4,
                temperature = 60,
                batch_norm = True):
    return nn.Sequential(
        ODConvBN(inp, oup, kernel_size = 1, stride = stride,
            kernel_num = kernel_num, temperature = temperature) \
                if batch_norm == True else \
        ODConv2d(inp, oup, kernel_size = 1, stride = stride,
            kernel_num = kernel_num, temperature = temperature),

        h_swish()
    )

def od_conv_3x3(inp, oup, stride = 1,
                kernel_num = 4, temperature = 60,
                batch_norm = True):
    return nn.Sequential(
         ODConvBN(inp, oup, kernel_size = 3, stride = stride,
            kernel_num = kernel_num, temperature = temperature) \
                if batch_norm == True else \
        ODConv2d(inp, oup, kernel_size = 3, stride = stride,
            kernel_num = kernel_num, temperature = temperature),

        h_swish()
    )

In [4]:
class InvertedResidualOD(nn.Module):
    def __init__(self, inp, hidden_dim, oup, kernel_size,
                 stride, use_se, use_hs,
                 kernel_num = 4, temperature = 60.0):
        super(InvertedResidualOD, self).__init__()
        assert stride in [1, 2]

        self.identity = stride == 1 and inp == oup
        print("Using OmniDimensional")
        if inp == hidden_dim:
            self.conv = nn.Sequential(
                # dw
                ODConvBN(hidden_dim, hidden_dim, kernel_size, stride,
                         groups=hidden_dim, kernel_num = kernel_num,
                         temperature = temperature),
                h_swish() if use_hs else nn.ReLU(inplace=True),
                # Squeeze-and-Excite
                SELayer(hidden_dim) if use_se else nn.Identity(),
                # pw-linear
                ODConv2d(hidden_dim, oup, 1, 1, kernel_num = kernel_num,
                         temperature = temperature),
                nn.BatchNorm2d(oup),
            )
        else:
            self.conv = nn.Sequential(
                # pw
                ODConvBN(inp, hidden_dim, kernel_size = 1, stride = 1,
                         kernel_num = kernel_num, temperature = temperature),
                h_swish() if use_hs else nn.ReLU(inplace=True),
                # dw
                ODConvBN(hidden_dim, hidden_dim, kernel_size,
                         stride, groups=hidden_dim, kernel_num = kernel_num,
                         temperature = temperature),
                # Squeeze-and-Excite
                SELayer(hidden_dim) if use_se else nn.Identity(),
                h_swish() if use_hs else nn.ReLU(inplace=True),
                # pw-linear
                ODConv2d(hidden_dim, oup, 1, 1, kernel_num = kernel_num,
                         temperature = temperature),
                nn.BatchNorm2d(oup),
            )

    def forward(self, x):
        if self.identity:
            return x + self.conv(x)
        else:
            return self.conv(x)


class InvertedResidual(nn.Module):
    def __init__(self, inp, hidden_dim, oup, kernel_size, stride, use_se, use_hs):
        super(InvertedResidual, self).__init__()
        assert stride in [1, 2]

        self.identity = stride == 1 and inp == oup

        print("Using Normal")
        if inp == hidden_dim:
            self.conv = nn.Sequential(
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, kernel_size, stride, (kernel_size - 1) // 2, groups=hidden_dim, bias=False),
                nn.BatchNorm2d(hidden_dim),
                h_swish() if use_hs else nn.ReLU(inplace=True),
                # Squeeze-and-Excite
                SELayer(hidden_dim) if use_se else nn.Identity(),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )
        else:
            self.conv = nn.Sequential(
                # pw
                nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
                nn.BatchNorm2d(hidden_dim),
                h_swish() if use_hs else nn.ReLU(inplace=True),
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, kernel_size, stride, (kernel_size - 1) // 2, groups=hidden_dim, bias=False),
                nn.BatchNorm2d(hidden_dim),
                # Squeeze-and-Excite
                SELayer(hidden_dim) if use_se else nn.Identity(),
                h_swish() if use_hs else nn.ReLU(inplace=True),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )

    def forward(self, x):
        if self.identity:
            return x + self.conv(x)
        else:
            return self.conv(x)


In [5]:
class MobileNetV3(nn.Module):
    def __init__(self, cfgs, mode, kernel_num,
                 temperature, od_bottleneck = 0, 
                 od_outside = 0, num_classes = 10, width_mult=1.,
                 use_od = False, drop_rate = 0.2):
        super(MobileNetV3, self).__init__()
        # setting of inverted residual blocks
        self.cfgs = cfgs
        self.use_od = use_od
        assert mode in ['large', 'small']

        num_od = int(od_outside)
        od_bottleneck = int(od_bottleneck)
        
        self.num_od = num_od
        # building first layer
        input_channel = _make_divisible(16 * width_mult, 8)

        layers = []
        if num_od > 0:
            print("Using OD")
            layers.append(od_conv_3x3(3, input_channel, stride = 2,
                                      kernel_num = kernel_num,
                                      temperature = temperature))
        else:
            print("Using Normal")
            layers.append(conv_3x3(3, input_channel, stride = 2))
        
        # building inverted residual blocks
        

        i = 0
        for k, t, c, use_se, use_hs, s in self.cfgs:
            output_channel = _make_divisible(c * width_mult, 8)
            exp_size = _make_divisible(input_channel * t, 8)
            block = InvertedResidual if (use_od == False or i >= od_bottleneck) \
                        else InvertedResidualOD
            
            if use_od == False or i >= od_bottleneck:
                layers.append(block(input_channel, exp_size, output_channel,
                                    k, s, use_se, use_hs))
            else:
                layers.append(block(input_channel, exp_size, output_channel,
                                    k, s, use_se, use_hs, kernel_num = kernel_num,
                                    temperature = temperature))
                i += 1
            
            input_channel = output_channel
        self.features = nn.Sequential(*layers)

        # building last several layers
        if num_od >= 2:
            print("Using OD")
            self.conv = od_conv_1x1(input_channel, exp_size,
                                    kernel_num = kernel_num,
                                    temperature = temperature)
        else:
            print("Using Normal")
            self.conv = conv_1x1(input_channel, exp_size)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        output_channel = {'large': 1280, 'small': 1024}

        output_channel = _make_divisible(output_channel[mode] * width_mult, 8) if width_mult > 1.0 else output_channel[mode]

        omni_layers = []
        not_omni_layers = []
        temp_od = num_od - 2

        self.list_layers = []
        for i in range(2):
            if temp_od > 0:
                self.list_layers.append("OD")
            else:
                self.list_layers.append("Normal")

            temp_od -= 1


        input_channel = exp_size
        for layer in self.list_layers:
            if layer == "OD":
                print("Using OD")
                omni_layers.append(od_conv_1x1(input_channel, output_channel,
                                          kernel_num = kernel_num,
                                          temperature = temperature))
            else:
                print("Using Normal")
                not_omni_layers.append(nn.Linear(input_channel, output_channel))

            input_channel = output_channel
            output_channel = num_classes

        self.omni_layers = nn.Sequential(*omni_layers)
        self.normal_layers = nn.Sequential(*not_omni_layers)
        self._initialize_weights()

    def forward(self, x):
        x = self.features(x)
        x = self.conv(x)
        x = self.avgpool(x)
        x = self.omni_layers(x)
        x = x.view(x.size(0), -1)
        x = self.normal_layers(x)

        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()

    def net_update_temperature(self, temperature):
        for modules in self.modules():
            if hasattr(modules, "update_temperature"):
                modules.update_temperature(temperature)

    def display_temperature(self):
        for modules in self.modules():
            if hasattr(modules, "get_temperature"):
                return modules.get_temperature()



In [6]:
def mobilenetv3_large(**kwargs):
    """
    Constructs a MobileNetV3-Large model
    """
    cfgs = [
        # k, t,   c,  SE, HS, s
        [3,   1,  16, 0, 0, 1],
        [3,   4,  24, 0, 0, 2],
        [3,   3,  24, 0, 0, 1],
        [5,   3,  40, 1, 0, 2],
        [5,   3,  40, 1, 0, 1],
        [5,   3,  40, 1, 0, 1],
        [3,   6,  80, 0, 1, 2],
        [3, 2.5,  80, 0, 1, 1],
        [3, 2.3,  80, 0, 1, 1],
        [3, 2.3,  80, 0, 1, 1],
        [3,   6, 112, 1, 1, 1],
        [3,   6, 112, 1, 1, 1],
        [5,   6, 160, 1, 1, 2],
        [5,   6, 160, 1, 1, 1],
        [5,   6, 160, 1, 1, 1]
    ]
    return MobileNetV3(cfgs, mode='large', **kwargs)


def mobilenetv3_small(**kwargs):
    """
    Constructs a MobileNetV3-Small model
    """
    cfgs = [
        # k,   t,  c, SE, HS, s
        [3,    1,  16, 1, 0, 2],
        [3,  4.5,  24, 0, 0, 2],
        [3, 3.67,  24, 0, 0, 1],
        [5,    4,  40, 1, 1, 2],
        [5,    6,  40, 1, 1, 1],
        [5,    6,  40, 1, 1, 1],
        [5,    3,  48, 1, 1, 1],
        [5,    3,  48, 1, 1, 1],
        [5,    6,  96, 1, 1, 2],
        [5,    6,  96, 1, 1, 1],
        [5,    6,  96, 1, 1, 1],
    ]

    return MobileNetV3(cfgs, mode='small', **kwargs)

In [7]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [8]:
import torch
import torch.nn as nn
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split
import numpy as np

In [None]:
def load_data(data_dir, download = True):

  transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
  ])

  train_data = datasets.CIFAR10(
      root = data_dir, train = True,
      download = download, transform = transform
  )

  test_data = datasets.CIFAR10(
      root = data_dir, train = False,
      download = download, transform = transform
  )

  return (train_data, test_data)

train_data, test_data = load_data('./data/cifar10')

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar10/cifar-10-python.tar.gz


  4%|▍         | 7241728/170498071 [02:45<1:32:36, 29379.81it/s]

In [None]:
from argparse import Namespace


torch.manual_seed(42)
args = Namespace(
    data_dir = './data/cifar10',
    device = 'cuda' if torch.cuda.is_available() else 'cpu',
    batch_size = 128,
    num_workers = 2,
)

In [None]:
import logging
import os
from tqdm.notebook import tqdm

def check_logging_directory(path):
  parent_directory = os.path.dirname(path)
  if not os.path.exists(parent_directory):
    os.makedirs(parent_directory)
    print("Create new directory")

logging_path = './logging/bayesian_omni_cifar10.log'
check_logging_directory(logging_path)

logging.basicConfig(filename=logging_path, level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')