In [1]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F

# OneDCNN

In [2]:
from model import *

model = OneDCNN(n_channels=64, hidden=128, n_classes=17)
sum([p.numel() for p in model.parameters()])

153873

In [80]:
class OneDCNN(torch.nn.Module):
    def __init__(self, n_channels=16, hidden=16, n_classes=17, eps=2e-5, use_bias=False, dp_rate=0.25):
        super(OneDCNN, self).__init__()

        # Conv Layers
        self.c1 = torch.nn.Conv1d(in_channels=6,              out_channels=n_channels,      kernel_size=8,  stride=1, padding='same', dilation=1, bias=use_bias)
        self.c2 = torch.nn.Conv1d(in_channels=n_channels,     out_channels=n_channels * 2,  kernel_size=4,  stride=1, padding='same', dilation=1, bias=use_bias)
        self.c3 = torch.nn.Conv1d(in_channels=n_channels * 2, out_channels=n_channels * 4,  kernel_size=2,  stride=1, padding='same', dilation=1, bias=use_bias)
        
        # Fully connected
        self.f1 = torch.nn.Linear(in_features=n_channels * 4, out_features=hidden,    bias=True)
        self.f2 = torch.nn.Linear(in_features=hidden,         out_features=hidden,    bias=True)
        self.f3 = torch.nn.Linear(in_features=hidden,         out_features=n_classes, bias=True)

        # BatchNorm Layers
        self.bn1 = torch.nn.BatchNorm1d(num_features=n_channels,      eps=eps)
        self.bn2 = torch.nn.BatchNorm1d(num_features=n_channels * 2,  eps=eps)
        self.bn3 = torch.nn.BatchNorm1d(num_features=n_channels * 4,  eps=eps)

        # Pooling
        self.pool = torch.max_pool1d
        # self.pool = nn.MaxPool1d(kernel_size=3, stride=1, padding='same')

        # DropOut
        self.dropout = torch.nn.Dropout(p=dp_rate)

        # Initialization
        for m in self.modules():
            if isinstance(m, nn.Conv1d) or isinstance(m, nn.Linear):
                m.weight.data.normal_(0, torch.prod(torch.Tensor(list(m.weight.shape)[1:]))**(-1/2))
                if use_bias: m.bias.data.fill_(0.0)

    def forward(self, x):
        # Conv-layers
        out = self.bn1(torch.relu(self.c1(x)))
        print("1 ", out.shape)
        # out = self.pool(out, kernel_size=8, stride=8, padding=0)
        out = self.pool(out, kernel_size=4, stride=2, padding=1)
        print("1 ", out.shape)
        out = self.bn2(torch.relu(self.c2(out)))
        print("2 ", out.shape)
        out = self.pool(out, kernel_size=4, stride=2, padding=1)
        print("2 ", out.shape)
        out = self.bn3(torch.relu(self.c3(out)))
        print("3 ", out.shape)
        out = self.pool(out, kernel_size=4, stride=4, padding=1)
        print("3 ", out.shape)

        # Fully connected lyrs
        out = out.view(out.size(0), -1)
        out = self.dropout(self.f1(out))
        out = self.dropout(self.f2(out))
        out = self.f3(out)

        return out

In [81]:
model = OneDCNN(n_channels=64, hidden=128, n_classes=17)
sum([p.numel() for p in model.parameters()])

153873

In [82]:
x = torch.ones((128, 6, 16))
y = model(x)
y.shape

1  torch.Size([128, 64, 16])
1  torch.Size([128, 64, 8])
2  torch.Size([128, 128, 8])
2  torch.Size([128, 128, 4])
3  torch.Size([128, 256, 4])
3  torch.Size([128, 256, 1])


torch.Size([128, 17])

# ResNet50

In [21]:
class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1, heads=4, mhsa=False, resolution=None):
        super(Bottleneck, self).__init__()

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        if not mhsa:
            self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, padding=1, stride=stride, bias=False)
        else:
            self.conv2 = nn.ModuleList()
            self.conv2.append(MHSA(planes, width=int(resolution[0]), height=int(resolution[1]), heads=heads))
            if stride == 2:
                self.conv2.append(nn.AvgPool2d(2, 2))
            self.conv2 = nn.Sequential(*self.conv2)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion * planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion * planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=1000, resolution=(224, 224), heads=4, is_botnet=False):
        super(ResNet, self).__init__()
        self.in_planes = 64
        self.resolution = list(resolution)

        self.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        if self.conv1.stride[0] == 2:
            self.resolution[0] /= 2
        if self.conv1.stride[1] == 2:
            self.resolution[1] /= 2
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) # for ImageNet
        if self.maxpool.stride == 2:
            self.resolution[0] /= 2
            self.resolution[1] /= 2

        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        if is_botnet == False:
            self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2, heads=heads, mhsa=False)
        else:
            self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=1, heads=heads, mhsa=True)
            
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Sequential(
            nn.Dropout(0.3), # All architecture deeper than ResNet-200 dropout_rate: 0.2
            nn.Linear(512 * block.expansion, num_classes)
        )

    def _make_layer(self, block, planes, num_blocks, stride=1, heads=4, mhsa=False):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for idx, stride in enumerate(strides):
            layers.append(block(self.in_planes, planes, stride, heads, mhsa, self.resolution))
            if stride == 2:
                self.resolution[0] /= 2
                self.resolution[1] /= 2
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.maxpool(out) # for ImageNet

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)

        out = self.avgpool(out)
        out = torch.flatten(out, 1)
        out = self.fc(out)
        return out


def ResNet50(num_classes=17, resolution=(16, 16), heads=4):
    return ResNet(Bottleneck, [3, 4, 6, 3], num_classes=num_classes, resolution=resolution, heads=heads, is_botnet=False)


resnet = ResNet50(num_classes=17, resolution=(16,16), heads=4)
sum([p.numel() for p in resnet.parameters()])

23540433

In [23]:
x = torch.ones((128, 1, 16, 16))
y = resnet(x)
y.shape

torch.Size([128, 17])