In [1]:
import os
import functools
import itertools
from typing import Union, Tuple, Iterable

from pathlib import Path
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torchvision.transforms import transforms
import torchvision.datasets
from torch.utils.data import DataLoader

import torchmetrics

In [2]:
# setting
data_root_path = Path('/eagle/projects/candle_aesp/siebenschuh/PT/')

# input
# - pre-processing
# - rotation
absDeg = 5
# - horizontal flip
p_hFlip = 0.2
# model
# - optimizer
b_size     = 512
opt        = 'Adam'
l_rate     = 0.001
moment     = 0.2
n_epochs   = 12
# # of workers
n_workers  = 5

In [3]:
def getOutputDim(inputDim:Tuple[int], nClasses:int, cnn_channels:Iterable[int], cnn_kernels, pool_kernels, cnn_pad):
    '''Computes image shape post convolution/pooling/batchnorm layers
    Input image (C, H, W) --> CNN output tensor (C, H, W)
    '''
    
    assert len(inputDim)==3, "inputDim must have exactly 3 int entries (C,H,W)"
    c, h, w = inputDim
    
    def dim_out(f, k, k_p, p):
        return ((f-k+1 + 2*p) // k_p) 
    
    w_loc, h_loc = w, h
    for k, k_p, p in zip(cnn_kernels, pool_kernels, cnn_pad):
        h_loc, w_loc = dim_out(h_loc, k, k_p, p), dim_out(w_loc, k, k_p, p)
    
    return cnn_channels[-1], w_loc, h_loc

In [4]:
# load & pre-process data
# - - - - - - - - - - - -
# - transformation
TraFo = transforms.Compose(transforms=[
    transforms.PILToTensor(),
    transforms.ConvertImageDtype(torch.float),
    transforms.RandomRotation((-1.*absDeg, absDeg)),
    transforms.RandomHorizontalFlip(p_hFlip)
])

# - load data
dset_train = torchvision.datasets.CIFAR10(root=data_root_path, train=True, download=True, transform=TraFo)
dset_test  = torchvision.datasets.CIFAR10(root=data_root_path, train=False, download=True, transform=TraFo)

# - input dimension
inputDim = tuple(dset_train[0][0].shape)

# - preprocess
loader_train = DataLoader(dataset=dset_train, batch_size=b_size, shuffle=True, num_workers=n_workers, pin_memory=True)
loader_test  = DataLoader(dataset=dset_test, batch_size=b_size, shuffle=True,  num_workers=n_workers, pin_memory=True)

Files already downloaded and verified
Files already downloaded and verified


In [34]:
# model helpers
def conv_block(in_c, out_c, p, k_cnn, k_pool, *args, **kwargs):
    return nn.Sequential(nn.Conv2d(in_c, out_c, kernel_size = k_cnn, padding=p, *args, **kwargs),
                         nn.BatchNorm2d(out_c),
                         nn.MaxPool2d(k_pool),
                         nn.ReLU())

def downsample(in_c:int, bias:bool=False):
    '''
    Downsampling 
    '''
    return nn.Sequential(nn.Conv2d(in_c, 2*in_c, kernel_size=1, stride=2, padding=0, bias=bias),
                         nn.BatchNorm2d(2*in_c))
    
def basic_block(in_c:int, upscale:bool, stride:int, bias:bool=False, *args, **kwargs):
    '''
    Residual NN block with standard 3x3 convolutions (projection via convolution layer to maintain dimension of input)
    '''
    if(upscale):
        out_c = 2*in_c
    else:
        out_c = in_c
    return nn.Sequential(nn.Conv2d(in_c, out_c, kernel_size=3, stride=stride, padding=1, bias=bias),
                         nn.BatchNorm2d(out_c),
                         nn.ReLU(),
                         nn.Conv2d(out_c, out_c, kernel_size=3, stride=1, padding=1, bias=bias),
                         nn.BatchNorm2d(out_c))

def resnet_layer(in_c:int, downsampleFlag:bool=True, bias:bool=False, *args, **kwargs):
    '''
    Singular ResNet-layer (downsampling variant)
    '''
    
    if(downsampleFlag):
        return nn.Sequential(*[basic_block(in_c=in_c, upscale=True, stride=2, bias=bias, *args, **kwargs), 
                               downsample(in_c=in_c, bias=bias),
                               basic_block(in_c=2*in_c, upscale=False, stride=1, bias=bias, *args, **kwargs)])
    else:
        return nn.Sequential(*[basic_block(in_c=in_c, upscale=False, stride=1, bias=bias, *args, **kwargs), 
                               basic_block(in_c=in_c, upscale=False, stride=1, bias=bias, *args, **kwargs)])

def res_resizing_block(in_c, *args, **kwargs):
    '''
    Residual NN block with standard 3x3 convolutions (projection via convolution layer to maintain dimension of input)
    '''
    return nn.Sequential(nn.Conv2d(in_c, 2*in_c, kernel_size=3, stride = 2, padding = 1),
                         nn.BatchNorm2d(2*in_c),
                         nn.ReLU())

def fc_block(in_f, out_f, pDrop=0.05, lastLayerFlag:bool=False, *args, **kwargs):
    # last layer
    if(lastLayerFlag):
        return nn.Sequential(nn.Linear(in_f, out_f, *args, **kwargs))
    
    return nn.Sequential(nn.Linear(in_f, out_f, *args, **kwargs),
                         nn.Dropout(pDrop),
                         nn.ReLU())

# model
class NN(nn.Module):
    def __init__(self, 
                 cnn_channels:Iterable[int], 
                 fc_channels:Iterable[int], 
                 cnn_kernels:int|Iterable[int],
                 inputDim:tuple, 
                 nClasses:int,
                 pool_kernels:int|Iterable[int]=2,
                 cnn_pad:int|Iterable[int]=0,
                 pDrop:float=0.05):
        
        super().__init__()
        
        self.cnn_channels  = cnn_channels
        self.fc_channels   = fc_channels
        self.cnn_kernels   = cnn_kernels
        self.inputDim      = inputDim
        self.nClasses      = nClasses
        self.pool_kernels  = pool_kernels
        self.cnn_pad       = cnn_pad
        self.pDrop         = pDrop
        
        # convert input
        cnn_depth = len(self.cnn_channels)
        if(isinstance(self.cnn_kernels, int)):
            self.cnn_kernels = [self.cnn_kernels]*cnn_depth
        if(isinstance(self.pool_kernels, int)):
            self.pool_kernels = [self.pool_kernels]*cnn_depth
        if(isinstance(self.cnn_pad, int)):
            self.cnn_pad = [self.cnn_pad]*cnn_depth
        
        # CNN
        cnn_channels_aug = [inputDim[0]] + cnn_channels
        conv_blocks = [conv_block(in_c, out_c, p, k_cnn, k_pool,) for (in_c, out_c, p, k_cnn, k_pool) in zip(cnn_channels_aug, cnn_channels_aug[1:], self.cnn_pad, self.cnn_kernels, self.pool_kernels)]
        self.conv = nn.Sequential(*conv_blocks)

        # intermediate dimension
        x_dim = getOutputDim(self.inputDim, nClasses=self.nClasses, cnn_channels=self.cnn_channels, 
                             cnn_kernels=self.cnn_kernels, pool_kernels=self.pool_kernels, 
                             cnn_pad=self.cnn_pad)
        feat_in  = np.prod(x_dim)

        # FC
        if(len(fc_channels) > 0):
            fc_channels_aug = [feat_in]+fc_channels+[self.nClasses]
        else:
            fc_channels_aug = [feat_in]+[self.nClasses]
        fc_blocks = [fc_block(in_f, out_f, lastLayerFlag=j==len(fc_channels_aug)-2) for j, (in_f, out_f) in enumerate(zip(fc_channels_aug, fc_channels_aug[1:]))]
        self.fc = nn.Sequential(*fc_blocks)
        
    def forward(self, x):
        # CNN
        x = self.conv(x)
        # flatten
        x = x.view(x.size(0), -1) # flat
        
        # FC
        x = self.fc(x)
        
        return x
    
    
class ResNet(nn.Module):
    def __init__(self, 
                 depth:int,
                 nClasses:int,
                 inputDim:Tuple[int]):
        
        super().__init__()
        
        self.depth    = depth
        self.nClasses = nClasses
        self.inputDim = inputDim

        # residual CNN blocks
        conv_blocks = list(itertools.chain(*[[res_proj_block(64*2**(d))]+[res_resizing_block(64*2**(d))] for d in range(depth)]))
        c0 = nn.Sequential(*[nn.Conv2d(3, 64, kernel_size=3, stride=1)])
        c1 = nn.Sequential(*conv_blocks)
        # avg pooling
        p1 = nn.Sequential(nn.AdaptiveAvgPool2d(output_size=(1,1)))
        self.conv = nn.Sequential(*[c0, c1, p1])

        # intermediate dimension
        x_dim = self.conv(torch.zeros((1, *inputDim))).shape
        self.feat_in = np.prod(x_dim)

        # FC
        self.fc = nn.Sequential(nn.Linear(self.feat_in, self.nClasses))
        
    def forward(self, x):
        # CNN
        x = self.conv(x)
        # flatten
        x = x.view(x.size(0), -1) # flat
        
        # FC
        x = self.fc(x)
        
        return x

In [35]:
#nn.Sequential(*[basic_block(in_c=in_c, upscale=True, stride=2, bias=bias, *args, **kwargs), 
#                           downsample(in_c=in_c, bias=bias),
#                           basic_block(in_c=2*in_c, upscale=False, stride=1, bias=bias, *args, **kwargs)])

x = torch.rand((16, 32, 32, 32))

l1 = basic_block(in_c=32, upscale=False, stride=2)
l2 = basic_block(in_c=32, upscale=False, stride=2)
#ds = downsample(in_c=6, bias=False)
l2(l1(x)).shape

torch.Size([16, 32, 8, 8])

In [36]:
x = torch.rand((16, 3,32,32))
layer = resnet_layer(in_c=3, downsampleFlag=False)
layer(x).shape

torch.Size([16, 3, 32, 32])

In [39]:
layer = resnet_layer(in_c=64, downsampleFlag=True)
layer

Sequential(
  (0): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (1): Sequential(
    (0): Conv2d(64, 128, kernel_size=(1, 1), stride=(2, 2), bias=False)
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (2): Sequential(
    (0): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
)

In [None]:
# Device
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Model
net = ResNet(depth = 4, nClasses=10, inputDim=(3,32,32))

# -> GPU
net = net.to(device)

In [None]:
net.feat_in

In [None]:
# Device
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Model Inputs
inputDim = (3, 32, 32)

# Inputs
cnn_channels:Iterable[int]    = [8,   32, 64, 128, 256]
fc_channels:Iterable[int]     = [512, 256, 128]
cnn_kernels:int|Iterable[int] = [3, 3, 2, 2, 2, 2]
cnn_pad:int|Iterable[int]     = 2
nClasses:int                  = 10

# Model
net = NN(cnn_channels = cnn_channels, 
         fc_channels = fc_channels, 
         cnn_kernels  = cnn_kernels, 
         cnn_pad      = cnn_pad,
         pDrop        = 0.075, 
         inputDim     = inputDim, 
         nClasses     = 10)

# -> GPU
net = net.to(device)

In [None]:
# Optimizer
optimizer = optim.Adam(params=net.parameters(), lr=0.00001, betas=(0.9, 0.999), weight_decay=0.01)
criterion = nn.CrossEntropyLoss()

# Metric
accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=10)
accuracy = accuracy.to(device)

In [None]:
# accuracy
acc = torchmetrics.Accuracy(task='multiclass', num_classes=10)

# 
acc_local    = 0.0
running_loss = 0.0
loss_list    = []
train_acc, test_acc = [], []

# 1st loop: epochs
for i in range(100):
    print(f'\nEpoch: {i}')
    
    # 2nd loop: train batches
    for j, batch in enumerate(loader_train):
        # unpack
        inputs, labels = batch
        
        # -> device 13s
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        # zero grad
        optimizer.zero_grad()
        
        # backprop
        outputs = net(inputs)                  # forward
        loss    = criterion(outputs, labels)   # loss
        loss.backward()                        # backward 
        optimizer.step()
        
        # loss
        running_loss += loss.item()
        loss_list.append(loss.item())
        
        # print
        if(j%100==99):
            print(f'Loss : {loss:.2f}')
            
    # track
    train_acc.append(acc(outputs.argmax(dim=1).cpu(), labels.cpu()))
    
    
    # validation
    with torch.set_grad_enabled(False):
        test_acc_loc = []
        for batch in loader_test:
            inputs, labels = batch
            
            # -> device 13s
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            # predict
            preds = net(inputs).argmax(dim=1)
    
            # accuracy
            test_acc_loc.append(acc(preds.cpu(), labels.cpu()))
        # append
        test_acc.append(np.mean(test_acc_loc))

print('\nFinished\n')

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
test_acc[-1]

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
test_acc[-1]

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
# CIFAR10
# net = NN(cnn_depth=3, cnn_k=3, fc_depth=3, cnn_mult=4, pDrop=0.075, inputDim=inputDim, nClasses=10)

plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()

In [None]:
plt.plot(train_acc)
plt.plot(test_acc)
plt.show()