#**I**. Installing package *Groupy* + mount Google Drive

In [0]:
#connecting with drive and computer
import sys
from google.colab import drive
import importlib.util

# Mount Google Drive so that the project folder below is reachable from Colab.
drive.mount('/content/gdrive')

# Extra packages installed before building GrouPy
# (presumably needed by GrouPy's Chainer code path and its nose test-suite -- TODO confirm).
!pip install nose
!pip install chainer

# Clone the PyTorch-enabled GrouPy fork into the project folder on Drive.
# NOTE(review): re-running this cell makes `git clone` fail once the folder exists.
%cd /content/gdrive/My Drive/Deep Learning/Reproduction project
! git clone https://github.com/adambielski/GrouPy.git

# Install GrouPy from the freshly cloned sources.
%cd /content/gdrive/My Drive/Deep Learning/Reproduction project/GrouPy
! python setup.py install

# Run the package's tests (verbose) from inside the GrouPy folder.
!nosetests -v

#**II**. Import packages

In [0]:
#pytorch
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchsummary import summary
from torch.autograd import Variable

#groupy import
from groupy.gconv.pytorch_gconv.splitgconv2d import P4ConvZ2, P4ConvP4, P4MConvZ2, P4MConvP4M

#general
import matplotlib.pyplot as plt
import numpy as np
import time
import math

#some extra
from functools import partial
from dataclasses import dataclass
from collections import OrderedDict

#**III**. Building the model(s): a Pytorch implementation



In [0]:
'''
Here the ResNet class is created, specifically for the p4m group. It needs some
trivial adaptations to work for conventional convolutions or for the other
group (p4) as well. First, BatchNorm3d should be replaced if a conventional
residual network is wanted. Second, all convolutions should be replaced by
either Conv2d or P4ConvP4/P4ConvZ2, depending on the type of network you want
to create. There will come a version in which this is done automatically, but
it is not this day.
'''

# ------------------------ CLASS RESIDUAL BLOCK --------------------------------
class ResidualBlock(nn.Module):
    """Generic residual unit computing ``relu(blocks(x) + shortcut(x))``.

    ``blocks`` (main path) and ``shortcut`` (skip path) both start out as
    ``nn.Identity`` and are meant to be replaced by subclasses.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels the block is meant to produce.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.in_channels, self.out_channels = in_channels, out_channels
        self.blocks = nn.Identity()    # main path, overwritten by subclasses
        self.shortcut = nn.Identity()  # skip path, overwritten by subclasses

    def forward(self, x):
        """Return ``relu(blocks(x) + residual)`` without modifying ``x``."""
        # The skip path is only transformed when the channel counts differ.
        residual = self.shortcut(x) if self.should_apply_shortcut else x
        out = self.blocks(x)

        # Out-of-place addition: with an identity main path ``out`` *is* the
        # caller's tensor, so ``out += residual`` would silently overwrite the
        # input (and in-place edits can also invalidate autograd buffers).
        out = out + residual
        return F.relu(out)

    @property
    def should_apply_shortcut(self):
        """True when input and output channel counts differ."""
        return self.in_channels != self.out_channels


# ----------------------- CLASS RESNET RESIDUAL BLOCK --------------------------
class ResNetResidualBlock(ResidualBlock):
    """Residual block whose skip path is a 1x1 convolution followed by BatchNorm3d.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: base channel count of the block.
        expansion: multiplier applied to ``out_channels`` to obtain the real
            number of output channels (see ``expanded_channels``).
        downsampling: stride used by the shortcut convolution.
        conv: convolution class used to build the block, including the
            shortcut projection.
        *args, **kwargs: accepted and ignored, so that callers can pass a
            common set of options to different block types.
    """

    def __init__(self, in_channels, out_channels, expansion=1, downsampling=1,
                 conv=P4MConvP4M, *args, **kwargs):
        super().__init__(in_channels, out_channels)
        self.expansion, self.downsampling, self.conv = expansion, downsampling, conv

        if self.should_apply_shortcut:
            # Project the skip path with the *configured* convolution type so
            # that it stays shape-compatible with the main path when ``conv``
            # is not the default.
            self.shortcut = nn.Sequential(OrderedDict({
                'conv': self.conv(self.in_channels,
                                  self.expanded_channels, kernel_size=1,
                                  stride=self.downsampling, bias=False),
                'bn': nn.BatchNorm3d(self.expanded_channels),
            }))
        else:
            # No projection needed; forward() then adds the input unchanged.
            self.shortcut = None

    @property
    def expanded_channels(self):
        """Number of channels the block actually outputs."""
        return self.out_channels * self.expansion

    @property
    def should_apply_shortcut(self):
        """True when the input channels differ from the expanded output channels."""
        return self.in_channels != self.expanded_channels



# --------------------- PRE CONVOLUTION SEQUENTIAL -----------------------------
def bn_conv(in_channels, out_channels, conv, activation = nn.ReLU, *args, **kwargs):
    """Build a ``conv -> BatchNorm3d`` pair as a named ``nn.Sequential``.

    The convolution is created with ``kernel_size=3`` and ``padding=1``; any
    extra positional/keyword arguments are forwarded to ``conv``. The
    ``activation`` argument is accepted but not used here.
    """
    conv_layer = conv(in_channels, out_channels, *args,
                      kernel_size=3, padding=1, **kwargs)
    norm_layer = nn.BatchNorm3d(out_channels)

    stages = OrderedDict()
    stages['conv'] = conv_layer
    stages['bn'] = norm_layer
    return nn.Sequential(stages)
    

# ------------------------- CLASS BASIC BLOCK ----------------------------------
class ResNetBasicBlock(ResNetResidualBlock):
    """Basic residual block: two 3x3 conv+BN stages with a ReLU in between."""

    expansion = 1

    def __init__(self, in_channels, out_channels, *args, **kwargs):
        super().__init__(in_channels, out_channels, *args, **kwargs)

        # Only the first stage carries the stride, i.e. performs the downsampling.
        first_stage = bn_conv(self.in_channels, self.out_channels,
                              conv=self.conv, bias=False,
                              stride=self.downsampling)
        second_stage = bn_conv(self.out_channels, self.expanded_channels,
                               conv=self.conv, bias=False)

        self.blocks = nn.Sequential(first_stage, nn.ReLU(), second_stage)


# ------------------------- CLASS RESNET LAYER ---------------------------------
class ResNetLayer(nn.Module):
    """One ResNet stage: ``n`` blocks of type ``block`` stacked sequentially."""

    def __init__(self, in_channels, out_channels, block=ResNetBasicBlock, n=1, *args, **kwargs):
        super().__init__()
        # 'We perform downsampling directly by convolutional layers that have a stride of 2.'
        # A stage that changes the channel count also halves the resolution.
        stride = 1 if in_channels == out_channels else 2

        # The first block adapts channels (and possibly resolution) ...
        stacked = [block(in_channels, out_channels, *args, **kwargs, downsampling=stride)]
        # ... the remaining n-1 blocks keep the shape unchanged.
        for _ in range(n - 1):
            stacked.append(block(out_channels * block.expansion,
                                 out_channels, downsampling=1, *args, **kwargs))

        self.blocks = nn.Sequential(*stacked)

    def forward(self, x):
        """Run the input through all blocks of the stage."""
        return self.blocks(x)


# ------------------------- CLASS RESNET ENCODER -------------------------------
class ResNetEncoder(nn.Module):
    """Convolutional part of the network: initial convolution plus residual stages.

    Args:
        in_channels: channels of the input image.
        blocks_sizes: channel count of every stage; the first entry is also
            the output width of the initial convolution.
        depths: number of blocks per stage (same length as ``blocks_sizes``).
        activation: activation class used after the initial convolution and
            forwarded to the stages.
        block: residual block class used inside the stages.
        *args, **kwargs: forwarded to every ``ResNetLayer``.
    """

    def __init__(self, in_channels=3, blocks_sizes=(11, 23, 45), depths=(7, 7, 7),
                 activation=nn.ReLU, block=ResNetBasicBlock, *args, **kwargs):
        super().__init__()

        # Immutable defaults plus a private copy: the stored attribute can no
        # longer alias (and thereby corrupt) a shared default or caller list.
        blocks_sizes = list(blocks_sizes)
        depths = list(depths)
        self.blocks_sizes = blocks_sizes

        # Lifting convolution (Z2 -> p4m) with 7x7 kernel, stride 1, padding 3.
        self.initconv = nn.Sequential(OrderedDict({
            'conv'    : P4MConvZ2(in_channels, self.blocks_sizes[0],
                        kernel_size=7, stride = 1, padding=3, bias=False),
            'bn'      : nn.BatchNorm3d(self.blocks_sizes[0]),
            'act'     : activation()})
        )

        # Consecutive (input width, output width) pairs for stages 2..N.
        self.in_out_block_sizes = list(zip(blocks_sizes, blocks_sizes[1:]))
        self.blocks = nn.ModuleList([
            # First stage keeps the width produced by the initial convolution.
            ResNetLayer(blocks_sizes[0], blocks_sizes[0], n=depths[0], activation=activation,
                        block=block,  *args, **kwargs),
            *[ResNetLayer(stage_in * block.expansion,
                          stage_out, n=stage_depth, activation=activation,
                          block=block, *args, **kwargs)
              for (stage_in, stage_out), stage_depth
              in zip(self.in_out_block_sizes, depths[1:])]
        ])

    def forward(self, x):
        """Apply the initial convolution followed by every stage in order."""
        x = self.initconv(x)
        for block in self.blocks:
            x = block(x)
        return x


# ------------------------- CLASS RESNET DECODER -------------------------------
class ResNetDecoder(nn.Module):
    """
    Classification head (tail) of the ResNet: global average pooling over the
    two trailing spatial dimensions followed by a single linear layer that
    maps ``in_features`` to ``n_classes`` scores.
    """
    def __init__(self, in_features, n_classes, *args, **kwargs):
        super().__init__()
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.decoder = nn.Linear(in_features, n_classes)

    def forward(self, x):
        pooled = self.avgpool(x)
        # collapse everything but the batch dimension
        flat = pooled.flatten(start_dim=1)
        return self.decoder(flat)


# ------------------------------ CLASS RESNET ----------------------------------
class ResNet(nn.Module):
    '''
    Full residual network: the encoder (initial convolution and residual
    stages) followed by the decoder (pooling + linear classifier).

    ``choice`` is accepted but not used; extra positional/keyword arguments
    are forwarded to ``ResNetEncoder``.
    '''
    def __init__(self, in_channels, n_classes, choice = 1, *args, **kwargs):
        super().__init__()

        self.encoder = ResNetEncoder(in_channels, *args, **kwargs)

        # The classifier width is derived from the very last residual block.
        # The factor 8 has to equal the size of dimension 2 of the encoder
        # output, which forward() folds into the channel dimension
        # (presumably the 8 elements of the p4m group -- TODO confirm).
        last_block = self.encoder.blocks[-1].blocks[-1]
        self.decoder = ResNetDecoder(last_block.expanded_channels*8, n_classes)

    def forward(self, x):
        features = self.encoder(x)
        # The encoder output is 5-D; merge dimensions 1 and 2 so that the
        # decoder receives an ordinary 4-D feature map.
        batch, channels, group, height, width = features.size()
        merged = features.view(batch, channels*group, height, width)
        return self.decoder(merged)


# ----------------------------- DEFINE RESNET44 --------------------------------
# here, the model is created
def resnet44_P4M(in_channels, n_classes):
    """Create the p4m ResNet with three stages of seven basic blocks each."""
    config = dict(block=ResNetBasicBlock, depths=[7, 7, 7])
    return ResNet(in_channels, n_classes, **config)

#**IV**. Loading a model

In [0]:
'''
It is sometimes preferred to load a model or one of its checkpoints, either to
continue training or to evaluate some of its properties. This small piece of
code can come in handy. It assumes the model was saved earlier through
state_dict(), which only stores the values of the parameters instead of the
model as a whole (this simply takes less space and time). Therefore, first
build the network and then load the parameter values with load_state_dict().
'''
# Loading model
name_model = input('Name model?')  # name under which the model was saved (without extension)
directory = '/content/gdrive/My Drive/Deep Learning/Reproduction project/Saved Networks/Preliminary/'  # put your own directory here
path_model = directory + name_model + '.pth'
model = ResNet(3, 10, block=ResNetBasicBlock, depths=[7, 7, 7])

# map_location='cpu': the freshly built model lives on the CPU, and a checkpoint
# written from a GPU could otherwise not be opened on a runtime without CUDA.
# load_state_dict() returns the missing/unexpected keys, which shows whether
# all weights were loaded.
result = model.load_state_dict(torch.load(path_model, map_location='cpu'))

# Print results: if both lists are empty, the model has been loaded in splendid fashion.
print('')
print('=== Weights loaded ===')
print('Missing keys:   ', result.missing_keys)
print('Unexpected keys:', result.unexpected_keys)

#**V**. Loading the data

In [6]:
'''
Data loading: builds the CIFAR-10 training and test sets and their loaders.
'''

# Per-channel mean and standard deviation handed to transforms.Normalize.
_channel_mean = (0.4914, 0.4822, 0.4465)
_channel_std = (0.2023, 0.1994, 0.2010)

# Training pipeline: tensor conversion + normalisation
# (extra augmentations can be inserted into this list).
transform_train = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_channel_mean, _channel_std),
])

# Test pipeline: identical preprocessing, no augmentation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_channel_mean, _channel_std),
])

# Loader settings shared by both splits
batch_size  = 64
num_workers = 2

# Training split (downloaded to ./data when missing), reshuffled every epoch
trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

# Test split, iterated in a fixed order
testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batch_size, shuffle=False, num_workers=num_workers)


Files already downloaded and verified
Files already downloaded and verified


#**VI**. Evaluation of the models

In [0]:
'''
The model is trained, evaluated and saved here. The function train_network() 
does the job for you. It automatically saves the model and its train, test and 
loss curves so far. Furthermore, during training you will get a live update 
every epoch of the scores so far.
'''

# ------------------------- TRAINING THE NETWORK -------------------------------
def train_network(net, train_loader, test_loader, device, title,
                  lr = 0.05, momentum = 0.9, gamma = 0.1, 
                  n_epochs = 300, epoch_start = 0,
                  criterion = nn.CrossEntropyLoss().cuda()):
    """
    Train ``net`` with SGD, evaluate it after every epoch and checkpoint it.

    Args:
        net: the PyTorch model to train; it must already live on ``device``.
        train_loader: iterable yielding ``(inputs, labels)`` training batches.
        test_loader: iterable yielding ``(inputs, labels)`` test batches.
        device: torch device the batches are moved to.
        title: prefix of the file names used for the checkpoints.
        lr: initial learning rate.
        momentum: SGD momentum.
        gamma: factor the learning rate is multiplied with after every 50th
            epoch (epochs 50, 100, ...; no more decay after epoch index 300).
        n_epochs: index of the last epoch (exclusive); also the length of the
            recorded curves.
        epoch_start: index of the first epoch, non-zero when resuming.
        criterion: loss function called as ``criterion(outputs, labels)``.

    Side effects:
        Prints one status line per epoch. After every 50th epoch the weights
        of ``net`` ('<name>.pth') and the stacked loss/train/test curves
        ('<name>_start:<epoch_start>.npy') are written to the hard-coded
        Google Drive directory below.
    """

    print('training on', device)

    optimizer = optim.SGD(net.parameters(), lr=lr, momentum=momentum)

    # Per-epoch history of loss, training accuracy and test accuracy
    loss_curve   = np.zeros(n_epochs)
    train_curve  = np.zeros(n_epochs)
    test_curve   = np.zeros(n_epochs)

    save_dir = '/content/gdrive/My Drive/Deep Learning/Reproduction project/Saved Networks/Preliminary/'

    # ---------------------------- START TRAINING ------------------------------
    for epoch in range(epoch_start, n_epochs):

        net.train()
        n, start = 0, time.time()

        train_l_sum = torch.tensor([0.0], dtype=torch.float32, device=device)

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            with torch.no_grad():
                train_l_sum += loss.float()
                n += outputs.shape[0]

        # Every 50th epoch is a milestone: learning-rate decay and checkpoint.
        milestone = epoch % 50 == 49

        # update learning rate
        if milestone and epoch <= 300:
            lr *= gamma
            # Adjust the existing optimizer (which owns the parameters of
            # `net`) instead of building a new one: this keeps the momentum
            # buffers and cannot accidentally pick up another global model.
            for group in optimizer.param_groups:
                group['lr'] = lr
            print('We got ourselves a change in learning rate:')
            print('Epoch: '+ str(epoch))
            print('Learning rate changed to: ' + str(lr))

        # ------------------- START VALIDATION and SAVING ----------------------
        # Sum of the batch losses divided by the number of samples seen;
        # converted to a Python float so it can be stored in a NumPy array.
        loss_value = (train_l_sum / n).item()

        stop1 = time.time()
        train_acc  = evaluate_accuracy(train_loader, net, device)
        stop2 = time.time()
        test_acc   = evaluate_accuracy(test_loader, net, device)

        loss_curve[epoch]  = loss_value
        train_curve[epoch] = train_acc
        test_curve[epoch]  = test_acc

        # print updated scores (times: training - train evaluation - test evaluation)
        print('epoch %d [#]  |  loss %.4f [-]  |  train acc %.3f [-]  |  test acc %.3f [-]  |  time %.1f-%.1f-%.1f [s]'\
        % (epoch + 1, loss_value, train_acc, test_acc, stop1-start, stop2-stop1, time.time()-stop2))

        # save every 50 epochs
        if milestone:
            name_to_save = title + '__test=' + str(test_acc) + '%_train=' + str(train_acc) + '%_epoch=' + str(epoch+1)

            # model: save the network that was actually trained
            torch.save(net.state_dict(), save_dir + name_to_save + '.pth')

            # learning curves (rows: loss, train accuracy, test accuracy);
            # np.save appends the '.npy' extension
            curves = np.array([loss_curve, train_curve, test_curve])
            np.save(save_dir + name_to_save + '_start:' + str(epoch_start), curves)

    print('Finished Training')



# ------------------------------ VALIDATION ------------------------------------
def evaluate_accuracy(data_iter, net, device=torch.device('cpu')):
    """Return the fraction of samples in ``data_iter`` that ``net`` classifies correctly.

    The network is switched to evaluation mode (and left in it); the
    predicted class is the arg-max over dimension 1 of the network output.
    """
    net.eval()  # evaluation behaviour for Dropout, BatchNorm etc.
    correct = torch.zeros(1, dtype=torch.float32, device=device)
    seen = 0
    with torch.no_grad():
        for batch, targets in data_iter:
            # Move the batch to the evaluation device.
            batch = batch.to(device)
            targets = targets.to(device).long()
            predictions = net(batch).argmax(dim=1)
            correct += (predictions == targets).sum()
            seen += targets.shape[0]
    return correct.item() / seen

# ------------------------- IN THE HOPE FOR GPU --------------------------------
def try_gpu():
    """Return ``torch.device('cuda:0')`` when CUDA is available, otherwise the CPU device."""
    device_name = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)


# ----------------------------------- MAIN -------------------------------------

def main():
    """Build the p4m ResNet44 and train it on the globally defined data loaders."""
    # get GPU (falls back to the CPU when CUDA is unavailable)
    device = try_gpu()

    # define model and move it to the selected device; an unconditional
    # .cuda() would crash on a runtime without a GPU
    model = resnet44_P4M(3,10)
    model.to(device)

    # define the starting epoch, only non-zero if you start at a checkpoint
    epoch_start = 0

    # The batch size entered here only ends up in the title used for saving;
    # the batch size actually used is the one of the data loaders.
    bs = input('What is the batch size:')
    title = 'ResNet44Groupy10_P4M_bs:' + bs +'_SGD_every50epochs'

    # train the network
    train_network(model, trainloader, testloader, device, title, n_epochs = 300, epoch_start = epoch_start)

if __name__ == "__main__": 
    main()

#**VII**. Extra: creating learning curves

In [0]:
'''
Loads the saved learning curves of all three networks for CIFAR10 and
CIFAR10+, from which two plots are created. Please fill in your own directory
and the paths where the curves of the networks can be found.
'''

#------------------------------- LOADING DATA ----------------------------------
directory = '/content/gdrive/My Drive/Deep Learning/Reproduction project/Saved Networks/Preliminary/'

# File names (N: conventional convolutions, P4 and P4M: group equivariant convolutions)
# CIFAR10
path10_N    = directory + 'ResNet44Normal10_bs:128_SGD_every50epochs__test=0.8646%_train=1.0%_epoch=300_start:0.npy' 
path10_P4   = directory + 'ResNet44Groupy10_bs:128_SGD_every50epochs__test=0.8509%_train=0.99722%_epoch=250_start:0.npy'
path10_P4M  = directory + 'ResNet44Groupy10_P4M_bs:64_SGD_every50epochs__test=0.909%_train=0.99998%_epoch=50_start:0.npy'

# CIFAR10+
path10p_N    = directory + 'ResNet44Normal_bs:64_SGD_every50epochs__test=0.9136%_train=0.9999%_epoch=300_start:0.npy'
path10p_P4   = directory + 'ResNet44Groupy_bs:64_SGD_every50epochs__test=0.9288%_train=0.9999%_epoch=300_start:0.npy'
path10p_P4M  = directory + 'ResNet44Groupy_P4M_bs:64_SGD_every50epochs__test=0.9191%_train=0.99998%_epoch=220_start:0.npy'

# Read the stored curve arrays
curve10_N    = np.load(path10_N)
curve10_P4   = np.load(path10_P4)
curve10_P4M  = np.load(path10_P4M)
curve10p_N   = np.load(path10p_N)
curve10p_P4  = np.load(path10p_P4)
curve10p_P4M = np.load(path10p_P4M)

# Group the runs: outer level = data set (10, 10+), inner level = network (N, P4, P4M)
_runs_per_dataset = (
    (curve10_N, curve10_P4, curve10_P4M),
    (curve10p_N, curve10p_P4, curve10p_P4M),
)

# Row 1 of every file is taken as the training curve, row 2 as the test curve.
# Resulting axes: data set | network | epoch
train_curve = np.array([[run[1, :] for run in runs] for runs in _runs_per_dataset])
test_curve  = np.array([[run[2, :] for run in runs] for runs in _runs_per_dataset])

# Best test value per run, expressed as an error percentage
best_score = 100*(1 - test_curve.max(axis = 2))

#---------------------------- PLOTTING THE RESULT ------------------------------
def plot_setting(fontfamily = 'Tahoma', 
                 weight = 'normal', 
                 fontsize = 16, 
                 figsize = [14, 14]):
    """Apply the global matplotlib figure size and font settings and toggle the grid of the current axes."""
    # default size of figures created from now on
    plt.rcParams['figure.figsize'] = figsize

    # font settings for all text in the figures
    plt.rc('font', size=fontsize, family=fontfamily, weight=weight)

    plt.grid()

def plot_learning_curves_all(train_curve, test_curve, batch_size, best_score, stop = (100, 200)):
    """Plot training and test error curves of all networks, one figure per data set.

    Args:
        train_curve: array indexed as [data set, network, epoch] with training
            accuracies as fractions.
        test_curve: array with the same layout holding test accuracies.
        batch_size: batch size as a *string*; only used in the legend text.
        best_score: array indexed as [data set, network] with the value shown
            as 'best score' in the legend.
        stop: for each data set, the number of epochs to show.
    """
    plot_setting(fontfamily = 'sans-serif')

    # legend names of the convolution types, in the order of the network axis
    legends = ['Z2', 'P4', 'P4M']

    # one figure per data set: figure 1 = CIFAR10, figure 2 = CIFAR10+
    titles = ['Learning curves: CIFAR 10', 'Learning curves: CIFAR 10+']

    for dataset, title in enumerate(titles):
        plt.figure(num=dataset + 1)

        for i, legend in enumerate(legends):
            n_shown = stop[dataset]
            # accuracies are converted to error percentages
            plt.plot(100*(1-train_curve[dataset, i, :n_shown]), '--',
                     label = 'Training | ' + legend + ' | batch size= ' + batch_size)
            plt.plot(100*(1-test_curve[dataset, i, :n_shown]), '-', linewidth = 2,
                     label = 'Testing  | ' + legend + ' | batch size= ' + batch_size
                     + ' | best score=' + str(best_score[dataset, i]) + '%')

        plt.xlabel('Epoch [#]')
        plt.ylabel('Error [%]')
        plt.legend()
        plt.title(title)
        # Switch the grid on explicitly for every figure; relying on the
        # plt.grid() toggle inside plot_setting() only affects whichever
        # figure is current at that moment.
        plt.grid(True)

plot_learning_curves_all(train_curve, test_curve, '64', best_score.round(decimals = 2), stop = [200, 200])