In [15]:
# Makes the display take up more of the screen
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [1]:
# export
import torch.nn as nn
import re
import timm 
from  torch import randn

In [2]:
timm.__version__

'0.4.12'

In [3]:

arch='tf_mobilenetv3_small_075'#'efficientnet_b3a'
input_channels = 4
num_outputs = 7

In [4]:
# export
def _is_pool_type(l): return re.search(r'Pool[123]d$', l.__class__.__name__)

def has_pool_type(m):
    "Return `True` if `m` is a pooling layer or has one in its children"
    if _is_pool_type(m): return True
    for l in m.children():
        if has_pool_type(l): return True
    return False

In [5]:
# export
def _get_first_layer(m):
    "Access first layer of a model"
    c,p,n = m,None,None  # child, parent, name
    for n in next(m.named_parameters())[0].split('.')[:-1]:
        p,c=c,getattr(c,n)
    return c,p,n

In [6]:
# export
def _update_first_layer(model, n_in, pretrained):
    "Change first layer based on number of input channels"
    if n_in == 3: return
    first_layer, parent, name = _get_first_layer(model)
    assert isinstance(first_layer, nn.Conv2d), f'Change of input channels only supported with Conv2d, found {first_layer.__class__.__name__}'
    assert getattr(first_layer, 'in_channels') == 3, f'Unexpected number of input channels, found {getattr(first_layer, "in_channels")} while expecting 3'
    params = {attr:getattr(first_layer, attr) for attr in 'out_channels kernel_size stride padding dilation groups padding_mode'.split()}
    params['bias'] = getattr(first_layer, 'bias') is not None
    params['in_channels'] = n_in
    new_layer = nn.Conv2d(**params)
    if pretrained:
        _load_pretrained_weights(new_layer, first_layer)
    setattr(parent, name, new_layer)

In [7]:
# export
def _load_pretrained_weights(new_layer, previous_layer):
    "Load pretrained weights based on number of input channels"
    n_in = getattr(new_layer, 'in_channels')
    if n_in==1:
        # we take the sum
        new_layer.weight.data = previous_layer.weight.data.sum(dim=1, keepdim=True)
    elif n_in==2:
        # we take first 2 channels + 50%
        new_layer.weight.data = previous_layer.weight.data[:,:2] * 1.5
    else:
        # keep 3 channels weights and set others to null
        new_layer.weight.data[:,:3] = previous_layer.weight.data
        new_layer.weight.data[:,3:].zero_()

In [8]:
# Export
def freezeCNNLayers(model):
    "This leaves the head with gradients but sets the layers before the AdaptiveAvgPool2d to not update"
    # there is probably a more elequent way to do this
    for child in model.children():
        try: #'AdaptiveAvgPool2d' not subscriptable if pooling changes this may not work
            _ = child[0]
            for param in child.parameters(): param.requires_grad = False
        except: pass #    

In [9]:
# export
def create_timm_body(arch:str, pretrained=True, cut=None, n_in=3):
    "Creates a body from any model in the `timm` library."
    model = timm.create_model(arch, pretrained=pretrained, num_classes=0, global_pool='')
    _update_first_layer(model, n_in, pretrained)
    if cut is None:
        ll = list(enumerate(model.children()))
        cut = next(i for i,o in reversed(ll) if has_pool_type(o))
    if isinstance(cut, int): return nn.Sequential(*list(model.children())[:cut])
    elif callable(cut): return cut(model)
    else: raise NamedError("cut must be either integer or function")

In [10]:

body = create_timm_body(arch, pretrained=True, cut=None, n_in=input_channels)


x = randn(1, input_channels, 224, 224);  #expected image size (mobileNet requires 224x224)


In [11]:
x.shape

torch.Size([1, 4, 224, 224])

In [12]:
body(x).shape

torch.Size([1, 432, 7, 7])

In [13]:
# export
def ModelMaker(arch='tf_mobilenetv3_small_075', input_channels = 4, num_outputs = 7, dropout=0.2):
    "Creates a custom pretrained CNN"
    body = create_timm_body(arch=arch, pretrained=True, cut=None, n_in=input_channels)
    x = randn(1, input_channels, 224, 224);  #expected image size (mobileNet requires 224x224)    
    num_in_features=body(x).shape[1]
    
    model=nn.Sequential(body,
        nn.AdaptiveAvgPool2d(1),       
        nn.Flatten(),
        nn.BatchNorm1d(num_in_features), # nn.LayerNorm may be better see: lesson 31 in Actor/Critic Phil Tabor course
        nn.Linear(in_features=num_in_features,out_features=512, bias=False), 
        nn.ReLU(),
        nn.BatchNorm1d(512),
        nn.Dropout(dropout),
        nn.Linear(in_features=512, out_features=num_outputs, bias=False)
    )
    #freezeCNNLayers(model)
    
    return model

In [14]:
model=ModelMaker(arch='tf_mobilenetv3_small_075', input_channels = 4, num_outputs = 7)

In [15]:
#for child in model.children():
        
 #   for param in child.parameters(): 
  #      print(param.requires_grad) 

In [16]:
freezeCNNLayers(model)

In [17]:
#model

In [18]:
x.shape

torch.Size([1, 4, 224, 224])

In [19]:
# test to ensure model produces results
model.eval()
actions=model(x)
model.train()
actions

tensor([[ 0.1573, -0.1663,  0.2694, -0.1124, -0.3671,  0.1126, -0.1601]],
       grad_fn=<MmBackward>)

In [20]:
x = randn(1, input_channels, 480, 480)
model.eval()
actions=model(x)
model.train()
actions

tensor([[ 0.2824, -0.1124,  0.2885, -0.1344, -0.4955,  0.0785, -0.1877]],
       grad_fn=<MmBackward>)

In [23]:
!python notebook2script.py NeuralNetwork.ipynb

Converted NeuralNetwork.ipynb to nbdev/nb_NeuralNetwork.py


In [24]:
import datetime as dt
end=dt.datetime.now()
print(f'Finished: {end.strftime("%A %B %d, %Y")} at {end.strftime("%H:%M")}')

Finished: Sunday February 20, 2022 at 09:22
