In [3]:
# Temporary workaround for kernel crashes: uncomment the two lines below to set KMP_DUPLICATE_LIB_OK.
#import os
#os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
#import torchvision.transforms as transforms
from torch import Tensor
from torchvision.ops import Conv2dNormActivation

import matplotlib.pyplot as plt
import numpy as np
#import mysql.connector as connector

from pathlib import Path
import logging

In [5]:
def _make_divisible(v, divisor, min_value=None): # rounds to nearest multiple of divisor
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

In [6]:
#from ..ops.misc import Conv2dNormActivation
#from ..transforms._presets import ImageClassification
#from ..utils import _log_api_usage_once
#from ._api import register_model, Weights, WeightsEnum
#from ._meta import _IMAGENET_CATEGORIES
#from ._utils import _make_divisible
#, _ovewrite_named_param, handle_legacy_interface

from functools import partial
from typing import Any, Callable, Optional
# Names exported by a star-import of this module.
# NOTE(review): only MobileNetV2 is defined in the cells visible here; confirm that
# MobileNet_V2_Weights and mobilenet_v2 are defined elsewhere, otherwise a
# `from <module> import *` would fail on the missing names.
__all__ = ["MobileNetV2", "MobileNet_V2_Weights", "mobilenet_v2"]

# necessary for backwards compatibility
class InvertedResidual(nn.Module):
    """Inverted residual block used as the building block of MobileNetV2.

    The block is an optional 1x1 expansion convolution, a 3x3 depthwise
    convolution, and a linear (no activation) 1x1 projection followed by a
    normalization layer. The input is added to the output only when the
    stride is 1 and the input and output channel counts are equal.

    Args:
        inp: number of input channels.
        oup: number of output channels.
        stride: stride of the depthwise convolution; expected to be 1 or 2
            (not validated here).
        expand_ratio: multiplier applied to ``inp`` to obtain the hidden
            channel count; when it equals 1 the expansion convolution is
            skipped.
        norm_layer: callable that builds a normalization layer from a channel
            count. Defaults to ``nn.BatchNorm2d`` when omitted or None.
    """

    def __init__(self, inp, oup, stride, expand_ratio, norm_layer=None):
        super().__init__()

        # The parameter now defaults to None so this fallback is reachable
        # without the caller having to pass None explicitly.
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        self.stride = stride
        hidden_dim = int(round(inp * expand_ratio))  # width of the expanded representation
        # A skip connection only makes sense when input and output shapes match.
        self.use_res_connect = self.stride == 1 and inp == oup

        layers = []
        if expand_ratio != 1:
            # Pointwise expansion: inp -> hidden_dim channels.
            layers.append(
                Conv2dNormActivation(
                    inp, hidden_dim, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.ReLU6
                )
            )

        layers.extend(
            [
                # Depthwise convolution: groups == channels, so each channel is filtered separately.
                Conv2dNormActivation(
                    hidden_dim,
                    hidden_dim,
                    kernel_size=3,
                    stride=stride,
                    groups=hidden_dim,
                    norm_layer=norm_layer,
                    activation_layer=nn.ReLU6,
                ),
                # Linear 1x1 projection down to the output width (no activation).
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                norm_layer(oup),
            ]
        )

        self.conv = nn.Sequential(*layers)
        self.out_channels = oup
        self._is_cn = stride > 1  # True when the block downsamples

    def forward(self, x):
        """Apply the block to ``x``, adding ``x`` back when the skip connection is enabled."""
        if self.use_res_connect:
            return x + self.conv(x)
        else:
            return self.conv(x)


class MobileNetV2(nn.Module):
    def __init__(
        self,
        num_classes=1000,
        width_mult=1.0,
        inverted_residual_setting=None,
        round_nearest=8,
        block=None,
        norm_layer=None,
        dropout=0.2,
    ):
        """
        MobileNet V2 main class

        Args:
            num_classes (int): Number of outputs of the final linear layer
            width_mult (float): Width multiplier - scales the number of channels in each layer
            inverted_residual_setting: Network structure, a sequence of ``[t, c, n, s]`` entries
                (expansion factor, output channels, number of repetitions, stride of the first repetition).
                The built-in table is used when None
            round_nearest (int): Round the number of channels in each layer to be a multiple of this number.
                Set to 1 to turn off rounding
            block: Callable building one inverted residual block; ``InvertedResidual`` when None
            norm_layer: Callable building the normalization layer; ``nn.BatchNorm2d`` when None
            dropout (float): Drop probability of the dropout layer in front of the linear classifier
        """
        super().__init__()

        block = InvertedResidual if block is None else block
        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer

        stem_channels = 32  # adjustable
        head_channels = 1280  # adjustable

        if inverted_residual_setting is None:
            # Columns: t - expansion factor, c - output channels of the stage,
            #          n - number of repetitions, s - stride of the first repetition
            inverted_residual_setting = [
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # The setting is not validated here: every entry has to unpack into
        # exactly four values (t, c, n, s) in the loop below.

        # Channel counts after applying the width multiplier. The head is
        # never shrunk below its base width (multiplier clamped to >= 1.0).
        in_channels = _make_divisible(stem_channels * width_mult, round_nearest)
        self.last_channel = _make_divisible(head_channels * max(1.0, width_mult), round_nearest)

        # Stem: RGB input -> in_channels feature maps at half resolution.
        layers = [
            Conv2dNormActivation(
                3, in_channels, kernel_size=3, stride=2, norm_layer=norm_layer, activation_layer=nn.ReLU6
            )
        ]

        # Stages of inverted residual blocks.
        for expansion, base_width, repeats, first_stride in inverted_residual_setting:
            out_channels = _make_divisible(base_width * width_mult, round_nearest)
            for repeat_idx in range(repeats):
                # Only the first block of a stage may change the resolution.
                block_stride = first_stride if repeat_idx == 0 else 1
                layers.append(
                    block(in_channels, out_channels, block_stride, expand_ratio=expansion, norm_layer=norm_layer)
                )
                in_channels = out_channels

        # Final 1x1 convolution up to the head width.
        layers.append(
            Conv2dNormActivation(
                in_channels, self.last_channel, kernel_size=1, norm_layer=norm_layer, activation_layer=nn.ReLU6
            )
        )

        # Full convolutional feature extractor.
        self.features = nn.Sequential(*layers)

        # Classifier head: dropout followed by a linear layer producing num_classes outputs.
        self.classifier = nn.Sequential(
            nn.Dropout(p=dropout),
            nn.Linear(self.last_channel, num_classes),
        )

        # Weight initialization, dispatched on the module type.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode="fan_out")
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
                continue
            if isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
                continue
            if isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.zeros_(module.bias)

    # The pooled tensor is flattened rather than squeezed because the batch
    # size can be 1 and squeeze would drop that dimension too.
    # This method exists since TorchScript doesn't support inheritance, so the
    # superclass implementation needs a name other than `forward` that a
    # subclass can still reach.
    def _forward_impl(self, x):
        feature_maps = self.features(x)  # run the convolutional stack
        pooled = nn.functional.adaptive_avg_pool2d(feature_maps, (1, 1))  # one average per channel
        flat = pooled.flatten(1)  # (batch, channels) for the linear layer
        return self.classifier(flat)

    def forward(self, x):
        return self._forward_impl(x)