# ResNet18 training

created on 20/06/2023

In [1]:
# import dependencies
import time
import copy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# from sklearn.model_selection import train_test_split

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, ConcatDataset

from torchvision import datasets
from torchvision.transforms import ToTensor
import cv2
from PIL import Image
from torchvision import transforms as T
import glob
import os
import argparse
from tqdm import tqdm
import wandb

## Dataset Preparation

In [139]:
import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import cv2
from PIL import Image
from torchvision import transforms as T
import glob
import os
import numpy as np


class LPCVCDataset(Dataset):
    """LPCVC segmentation dataset backed by numbered PNG files on disk.

    Files are looked up by index below ``datapath`` using this layout::

        train/IMG/train/train_XXXX.png          training images
        train/GT_Updated/train/train_XXXX.png   training masks
        val/LPCVC_Val/IMG/val/val_XXXX.png      validation images
        val/LPCVC_Val/GT/val/val_XXXX.png       validation masks

    where ``XXXX`` is the sample index zero-padded to four digits.

    Args:
        datapath: dataset root directory (with or without a trailing
            separator).
        transform: optional callable invoked as
            ``transform(image=..., mask=...)`` that returns a mapping with
            ``"image"`` and ``"mask"`` entries; ``None`` disables it.
        n_class: number of channels produced by :meth:`onehot`.
        train: read the training split when true, the validation split
            otherwise.
        patch: stored as ``self.patches``; not used by this class.
    """

    def __init__(self, datapath, transform, n_class=14, train=True, patch=False):
        self.datapath = datapath

        self.transform = transform
        self.n_class = n_class
        self.train = train

        self.patches = patch

    def _split_dirs(self):
        """Return ``(image_dir, mask_dir, filename_prefix)`` for the active split."""
        if self.train:
            return (
                os.path.join(self.datapath, 'train', 'IMG', 'train'),
                os.path.join(self.datapath, 'train', 'GT_Updated', 'train'),
                'train_',
            )
        return (
            os.path.join(self.datapath, 'val', 'LPCVC_Val', 'IMG', 'val'),
            os.path.join(self.datapath, 'val', 'LPCVC_Val', 'GT', 'val'),
            'val_',
        )

    @staticmethod
    def _read_png(path):
        """Read ``path`` with OpenCV, raising instead of returning ``None``."""
        pixels = cv2.imread(path)
        if pixels is None:
            # cv2.imread signals a missing/unreadable file with None, which
            # would otherwise fail later with an unrelated-looking error.
            raise FileNotFoundError("could not read image file: " + path)
        return pixels

    def __len__(self):
        """Return the number of ``*.png`` files in the split's image directory."""
        image_dir, _, _ = self._split_dirs()
        return len(glob.glob(os.path.join(image_dir, "*.png")))

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image_tensor, onehot_mask)``.

        The image is converted from OpenCV's BGR order to RGB and passed
        through ``ToTensor``; the mask is expanded by :meth:`onehot`.

        Raises:
            FileNotFoundError: if the image or the mask cannot be read.
        """
        image_dir, mask_dir, prefix = self._split_dirs()
        filename = prefix + str(idx).zfill(4) + '.png'

        # Images come from IMG and masks from GT for BOTH splits; the
        # validation branch previously had the two directories swapped.
        img = self._read_png(os.path.join(image_dir, filename))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        mask = self._read_png(os.path.join(mask_dir, filename))

        if self.transform is not None:
            augmentations = self.transform(image=img, mask=mask)
            img = augmentations["image"]
            mask = augmentations["mask"]

        # Normalize(0, 1) leaves values unchanged; kept to match the
        # original preprocessing pipeline.
        t = T.Compose([T.ToTensor(), T.Normalize(0, 1)])
        img = t(img)
        mask = self.onehot(mask, self.n_class)

        return img, mask

    def onehot(self, img, nb):
        """One-hot encode channel 0 of ``img`` into ``nb`` class planes.

        Args:
            img: array indexed as ``[row, col, channel]``; only channel 0 is
                read and its values are treated as class ids.
            nb: number of classes.

        Returns:
            ``float64`` array of shape ``(rows, cols, nb)`` where plane ``i``
            is 1.0 wherever the class id equals ``i``. Ids outside
            ``range(nb)`` produce an all-zero vector.
        """
        class_ids = np.asarray(img)[:, :, 0]
        return (class_ids[:, :, None] == np.arange(nb)).astype(np.float64)

In [140]:
# Build both splits from the local "dataset/" directory without augmentation.
train_dataset = LPCVCDataset(datapath="dataset/", transform=None, n_class=14, train=True)
val_dataset = LPCVCDataset(datapath="dataset/", transform=None, n_class=14, train=False)

# Training batches are reshuffled every epoch; validation keeps file order.
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)

val_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=1,
    pin_memory=True,
)

In [150]:
# Load the first training sample once and report the shapes of its two parts.
first_image, first_mask = train_dataset[0]
print(first_image.size()) # shape of input
print(first_mask.shape) # shape of output

torch.Size([3, 512, 512])
(512, 512, 14)


## Resnet Definition

In [35]:
# Select the compute device: first CUDA GPU when available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

device(type='cpu')

In [33]:
"""
created on 20/06/2023, building a ResNet-18 from scratch
"""
class Block(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by batch norm.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        identity_downsample: optional module applied to the shortcut branch
            before the addition; ``None`` adds the input unchanged.
        stride: stride of the first convolution (the second always uses 1).
    """

    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        # Shortcut branch: projected only when a downsample module was supplied.
        if self.identity_downsample is None:
            shortcut = x
        else:
            shortcut = self.identity_downsample(x)

        # Main branch: conv-bn-relu followed by conv-bn.
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += shortcut
        return self.relu(out)


class ResNet_18(nn.Module):
    """ResNet-18 style classifier assembled from ``Block`` modules.

    The network is a 7x7 stride-2 stem with max pooling, four stages of two
    blocks each (64, 128, 256, 512 channels), global average pooling and a
    linear layer.

    Args:
        image_channels: number of channels of the input image.
        num_classes: size of the output vector of the final linear layer.
    """

    def __init__(self, image_channels, num_classes):
        super().__init__()
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages; every stage after the first halves the resolution.
        self.layer1 = self.__make_layer(64, 64, stride=1)
        self.layer2 = self.__make_layer(64, 128, stride=2)
        self.layer3 = self.__make_layer(128, 256, stride=2)
        self.layer4 = self.__make_layer(256, 512, stride=2)

        # Classification head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def __make_layer(self, in_channels, out_channels, stride):
        """Build one stage: a (possibly strided) block followed by a plain block."""
        # A shortcut projection is only created for strided stages.
        shortcut = None if stride == 1 else self.identity_downsample(in_channels, out_channels)

        leading = Block(in_channels, out_channels, identity_downsample=shortcut, stride=stride)
        trailing = Block(out_channels, out_channels)
        return nn.Sequential(leading, trailing)

    def forward(self, x):
        """Return the output of the final linear layer, one row per sample."""
        pipeline = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.avgpool,
        )
        for stage in pipeline:
            x = stage(x)

        # Collapse the pooled (N, C, 1, 1) map to (N, C) before the linear layer.
        x = x.view(x.shape[0], -1)
        return self.fc(x)

    def identity_downsample(self, in_channels, out_channels):
        """Return the shortcut projection: 3x3 stride-2 convolution plus batch norm."""
        projection = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1)
        norm = nn.BatchNorm2d(out_channels)
        return nn.Sequential(projection, norm)

In [34]:
# Three-channel input, 14 output classes.
resnet_model = ResNet_18(image_channels=3, num_classes=14)

### Resnet on main branch

In [178]:
import torch
import torch.nn as nn

from torchvision import models


class BottleNeck(nn.Module):
    """Bottleneck residual block: 1x1 reduce, 3x3, 1x1 expand."""

    # Scale factor of the number of output channels
    expansion = 4

    def __init__(self, in_channels, out_channels, stride=1, is_first_block=False):
        """
        Args:
            in_channels: number of input channels
            out_channels: number of channels of the two inner convolutions;
                          the block outputs out_channels * expansion channels
            stride: stride used in (a) the 3x3 convolution and
                    (b) the 1x1 convolution used for downsampling the skip connection
            is_first_block: whether it is the first residual block of the layer;
                            when true a projection is created on the skip connection
        """
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels*self.expansion, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(out_channels*self.expansion)

        self.relu = nn.ReLU()

        # The skip connection of the first block of a layer goes through a
        # 1x1 convolution so that its spatial size and channel count match
        # the main branch before the addition.
        self.downsample = None
        if is_first_block:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels=in_channels, out_channels=out_channels*self.expansion, kernel_size=1, stride=stride, padding=0),
                nn.BatchNorm2d(out_channels*self.expansion),
            )

    def forward(self, x):
        """
        Args:
            x: input
        Returns:
            Residual block output
        """
        # No copy is needed: x is only rebound below, never modified in place,
        # and the in-place add writes into the bn3 output, not into the input.
        identity = x
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))

        x = self.conv3(x)
        x = self.bn3(x)

        # Explicit None test instead of relying on nn.Sequential truthiness.
        if self.downsample is not None:
            identity = self.downsample(identity)

        x += identity
        x = self.relu(x)

        return x


class BasicBlock(nn.Module):
    """Basic residual block: two 3x3 convolutions, each followed by batch norm."""

    # Scale factor of the number of output channels
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, is_first_block=False):
        """
        Args:
            in_channels: number of input channels
            out_channels: number of output channels
            stride: stride used in (a) the first 3x3 convolution and
                    (b) the 1x1 convolution used for downsampling the skip connection
            is_first_block: whether it is the first residual block of the layer
        """
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=stride, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU()

        # The skip connection of a layer's first block needs a 1x1 projection
        # whenever the main branch changes the tensor shape, i.e. when it
        # downsamples (stride != 1) OR changes the channel count. Checking the
        # stride alone left the residual add with mismatched shapes for a
        # stride-1 block that widens the channels.
        self.downsample = None
        if is_first_block and (stride != 1 or in_channels != out_channels):
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride, padding=0),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        """
        Args:
            x: input
        Returns:
            Residual block output
        """
        # No copy is needed: x is only rebound below, never modified in place,
        # and the in-place add writes into the bn2 output, not into the input.
        identity = x
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.bn2(self.conv2(x))

        # Explicit None test instead of relying on nn.Sequential truthiness.
        if self.downsample is not None:
            identity = self.downsample(identity)
        x += identity
        x = self.relu(x)

        return x


class ResNet(nn.Module):
    """Configurable ResNet that returns intermediate feature maps and class scores."""

    # Tuple defaults instead of lists: default arguments are shared between
    # calls, so they should not be mutable. Lists are still accepted.
    def __init__(self, ResBlock, n_classes, n_blocks_list=(3, 4, 6, 3), out_channels_list=(64, 128, 256, 512), num_channels=3):
        """
        Args:
            ResBlock: residual block type, BasicBlock for ResNet-18, 34 or
                      BottleNeck for ResNet-50, 101, 152
            n_classes: number of classes for image classification (used in classification head)
            n_blocks_list: number of residual blocks for each conv layer (conv2_x - conv5_x)
            out_channels_list: list of the output channel numbers for conv2_x - conv5_x
            num_channels: the number of channels of input image
        """
        super().__init__()

        # First layer
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=num_channels, out_channels=64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )

        # Create four convolutional layers
        in_channels = 64
        # For the first block of the second layer, do not downsample and use stride=1.
        self.conv2_x = self.CreateLayer(ResBlock, n_blocks_list[0], in_channels, out_channels_list[0], stride=1)

        # For the first blocks of conv3_x - conv5_x layers, perform downsampling using stride=2.
        # Each layer receives the previous layer's channels times ResBlock.expansion
        # (4 for BottleNeck, 1 for BasicBlock as defined in this file).
        self.conv3_x = self.CreateLayer(ResBlock, n_blocks_list[1], out_channels_list[0]*ResBlock.expansion, out_channels_list[1], stride=2)
        self.conv4_x = self.CreateLayer(ResBlock, n_blocks_list[2], out_channels_list[1]*ResBlock.expansion, out_channels_list[2], stride=2)
        self.conv5_x = self.CreateLayer(ResBlock, n_blocks_list[3], out_channels_list[2]*ResBlock.expansion, out_channels_list[3], stride=2)

        # Average pooling (used in classification head)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        # Linear layer for classification (used in classification head)
        self.fc = nn.Linear(out_channels_list[3] * ResBlock.expansion, n_classes)

    def forward(self, x):
        """
        Args:
            x: input image
        Returns:
            C2: feature maps after conv2_x
            C3: feature maps after conv3_x
            C4: feature maps after conv4_x
            C5: feature maps after conv5_x
            y: class scores from the linear head
        """
        x = self.conv1(x)

        # Feature maps
        C2 = self.conv2_x(x)
        C3 = self.conv3_x(C2)
        C4 = self.conv4_x(C3)
        C5 = self.conv5_x(C4)

        # Classification head
        y = self.avgpool(C5)
        y = y.reshape(y.shape[0], -1)
        y = self.fc(y)

        return C2, C3, C4, C5, y

    def CreateLayer(self, ResBlock, n_blocks, in_channels, out_channels, stride=1):
        """
        Create a layer with specified type and number of residual blocks.
        Args:
            ResBlock: residual block type, BasicBlock for ResNet-18, 34 or
                      BottleNeck for ResNet-50, 101, 152
            n_blocks: number of residual blocks
            in_channels: number of input channels
            out_channels: number of output channels
            stride: stride passed to the first residual block of the layer;
                    all following blocks use the block's default stride
        Returns:
            nn.Sequential holding the n_blocks residual blocks
        """
        # The first block takes the layer's stride and is flagged as first;
        # its output has out_channels * ResBlock.expansion channels.
        layer = [ResBlock(in_channels, out_channels, stride=stride, is_first_block=True)] if n_blocks > 0 else []

        # The remaining blocks keep the feature-map size and consume the
        # expanded channel count produced by the block before them.
        layer.extend(
            ResBlock(out_channels*ResBlock.expansion, out_channels)
            for _ in range(1, n_blocks)
        )

        return nn.Sequential(*layer)


def GetFeatureMapsFromResnet(net, x):
    """
    Run the stem and the four residual stages of a torchvision-style ResNet.

    Args:
        net: network exposing conv1, bn1, relu, maxpool and layer1 - layer4
             (the attribute names used by torchvision's ResNet).
        x: input image
    Returns:
        C2: feature maps after layer1
        C3: feature maps after layer2
        C4: feature maps after layer3
        C5: feature maps after layer4
    """
    # Stem: each stage feeds the next.
    activations = x
    for stem_name in ("conv1", "bn1", "relu", "maxpool"):
        activations = getattr(net, stem_name)(activations)

    # Residual stages: keep the output of every stage.
    feature_maps = []
    for stage_name in ("layer1", "layer2", "layer3", "layer4"):
        activations = getattr(net, stage_name)(activations)
        feature_maps.append(activations)

    return tuple(feature_maps)


# Shape comparison between the custom ResNet and the torchvision ResNet.
# (This cell runs at import/cell-execution time; an
#  `if __name__ == "__main__":` guard was left disabled by the author.)

### Custom version ###
# Alternatives for other depths:
#   ResNet34 : ResNet(BasicBlock, 1000)
#   ResNet50 : ResNet(BottleNeck, 1000)
#   ResNet101: ResNet(BottleNeck, 1000, n_blocks_list=[3, 4, 23, 3])
#   ResNet152: ResNet(BottleNeck, 1000, n_blocks_list=[3, 8, 36, 3])
# ResNet18
net = ResNet(ResBlock=BasicBlock, n_classes=1000, n_blocks_list=[2, 2, 2, 2])

# One random 3-channel 512x800 image.
x = torch.randn(1, 3, 512, 800, dtype=torch.float32)
C2, C3, C4, C5, _ = net(x)

### torchvision version ###
# Alternatives: models.resnet34 / resnet50 / resnet101 / resnet152 (pretrained=False)
net_tv = models.resnet18(pretrained=False)
C2_tv, C3_tv, C4_tv, C5_tv = GetFeatureMapsFromResnet(net=net_tv, x=x)

# Disabled report comparing the feature-map shapes of both networks:
# print("Verifying the feature map shapes of customed ResNet and ResNet from torchvision")
# print(f"C2.shape of customed ResNet: {C2.shape}")
# print(f"C2.shape of torchvision ResNet: {C2_tv.shape}")
# print(f"C3.shape of customed ResNet: {C3.shape}")
# print(f"C3.shape of torchvision ResNet: {C3_tv.shape}")
# print(f"C4.shape of customed ResNet: {C4.shape}")
# print(f"C4.shape of torchvision ResNet: {C4_tv.shape}")
# print(f"C5.shape of customed ResNet: {C5.shape}")
# print(f"C5.shape of torchvision ResNet: {C5_tv.shape}")
#
# print("Done!")


In [None]:
# ResNet-18 layout (two BasicBlocks per layer) with a 14-way linear head.
resnet_model = ResNet(ResBlock=BasicBlock, n_classes=14, n_blocks_list=[2, 2, 2, 2])

## Training

In [163]:
# Place the model on the selected device, then report whether its first
# parameter lives on a CUDA device.
resnet_model = resnet_model.to(device)
next(iter(resnet_model.parameters())).is_cuda

False

In [179]:
import argparse

In [180]:
# Training settings, declared as (flags, options) pairs and registered in order.
_CLI_OPTIONS = (
    (('--batch-size',), dict(type=int, default=32, metavar='N',
                             help='input batch size for training (default: 32)')),
    (('--epochs',), dict(type=int, default=100, metavar='N',
                         help='number of epochs to train (default: 100)')),
    (('--lr',), dict(type=float, default=0.1, metavar='LR',
                     help='learning rate (default: 0.1)')),
    (('--weight_decay',), dict(type=float, default=0.0001)),
    # Use default="cuda:0" here to train on the first GPU.
    (('--dev',), dict(default="cpu")),
    (('--momentum-sgd',), dict(type=float, default=0.9, metavar='M',
                               help='Momentum')),
    (('--datapath',), dict(default='dataset/')),
)

parser = argparse.ArgumentParser(description='Information Removal at the bottleneck in Deep Neural Networks')
for _flags, _options in _CLI_OPTIONS:
    parser.add_argument(*_flags, **_options)

# An empty argument list is parsed so that only the defaults are used
# (the notebook kernel's own command line is ignored).
args = parser.parse_args("")
args

Namespace(batch_size=32, datapath='dataset/', dev='cpu', epochs=100, lr=0.1, momentum_sgd=0.9, weight_decay=0.0001)

In [177]:
# Diagnostic cell: configure device, loss and optimizer on `args`, then push a
# single training batch through the model and print the types and shapes
# involved. The loop stops after the first batch (`break`).
from tqdm import tqdm
iteration = 0
args.device = torch.device(args.dev)
if args.dev != "cpu":
    # Make the requested GPU the current CUDA device.
    torch.cuda.set_device(args.device)
model = resnet_model.to(args.device)
# Loss and optimizer are stored on the args namespace so later code can
# reach them through `args`.
args.criterion = torch.nn.BCEWithLogitsLoss().to(args.device)
args.optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum_sgd, weight_decay=args.weight_decay)




for data in tqdm(train_loader):
    iteration+=1

    # data[0] is the image batch, data[1] the one-hot mask batch.
    inputs, labels = data[0].to(args.device), data[1].to(args.device)

    with torch.cuda.amp.autocast():
        outputs=model(inputs)
        # In the recorded run below, `outputs` is a tuple of length 5, which
        # matches ResNet.forward returning (C2, C3, C4, C5, y).
        print("type outputs: ", type(outputs))
        print("tuple len: ", len(outputs))
        print(outputs[0].size())
        print("type outputs[1]: ", type(outputs[1]))
        print(outputs[1].size())
        print("type labels: ", type(labels))
        print(labels.size())

        # print("output size: ", outputs.size())
        # print("labels size: ", labels.size())
        # NOTE(review): outputs[1] is the second tuple element (C3 in
        # ResNet.forward), not the class scores, which are the last element.
        # The recorded run fails here with a ValueError because the target
        # and input sizes differ.
        loss = args.criterion(outputs[1],labels)
    break


  0%|          | 0/32 [00:09<?, ?it/s]

type outputs:  <class 'tuple'>
tuple len:  5
torch.Size([32, 64, 128, 128])
type outputs[1]:  <class 'torch.Tensor'>
torch.Size([32, 128, 64, 64])
type labels:  <class 'torch.Tensor'>
torch.Size([32, 512, 512, 14])





ValueError: Target size (torch.Size([32, 512, 512, 14])) must be the same as input size (torch.Size([32, 128, 64, 64]))

### passed train and test function


In [143]:
def train(model, args, train_loader):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: network to optimise; switched to training mode.
        args: namespace providing ``device``, ``criterion`` and ``optimizer``.
        train_loader: iterable of batches whose element 0 is the input and
            element 1 the target.

    Returns:
        ``(accuracy_percent, mean_loss)`` where the mean is taken over the
        number of batches. Both values are also appended to the module-level
        lists ``train_accu`` and ``train_losses``.
    """
    # NOTE(review): assumes model(inputs) is a single tensor with class scores
    # on dim 1 — confirm; ResNet.forward in this file returns a tuple.
    model.train()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    for batch in tqdm(train_loader):
        inputs = batch[0].to(args.device)
        labels = batch[1].to(args.device)

        # Forward pass and loss under autocast.
        with torch.cuda.amp.autocast():
            outputs = model(inputs)
            loss = args.criterion(outputs, labels)

        # Backward pass and parameter update.
        args.optimizer.zero_grad()
        loss.backward()
        args.optimizer.step()

        loss_sum += loss.item()

        # Index of the largest score along dim 1 is the predicted class.
        predicted = outputs.max(1)[1]
        n_seen += labels.size(0)
        n_correct += predicted.eq(labels).sum().item()

    train_loss = loss_sum / len(train_loader)
    accu = 100. * n_correct / n_seen

    # Keep a per-epoch history in the module-level lists.
    train_accu.append(accu)
    train_losses.append(train_loss)

    print('Train Loss: %.3f | Accuracy: %.3f'%(train_loss,accu))
    return accu, train_loss

def test(model, args, val_loader):
    """Evaluate ``model`` on ``val_loader`` without updating its weights.

    Args:
        model: network to evaluate; switched to evaluation mode.
        args: namespace providing ``device`` and ``criterion``.
        val_loader: iterable of batches whose element 0 is the input and
            element 1 the target.

    Returns:
        ``(accuracy_percent, mean_loss)`` where the mean is taken over the
        number of batches. Both values are also appended to the module-level
        lists ``eval_accu`` and ``eval_losses``.
    """
    # NOTE(review): assumes model(inputs) is a single tensor with class scores
    # on dim 1 — confirm; ResNet.forward in this file returns a tuple.
    model.eval()
    running_loss = 0
    correct = 0
    total = 0

    # Evaluation only: no gradients are recorded and no optimizer step is
    # taken. The previous zero_grad/backward/step calls would have trained on
    # the validation data, and backward() cannot run under no_grad().
    with torch.no_grad():
        for data in tqdm(val_loader):
            inputs, labels = data[0].to(args.device), data[1].to(args.device)

            with torch.cuda.amp.autocast():
                outputs = model(inputs)
                loss = args.criterion(outputs, labels)

            running_loss += loss.item()

            # Index of the largest score along dim 1 is the predicted class.
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    test_loss = running_loss / len(val_loader)
    accu = 100. * correct / total

    # Keep a per-epoch history in the module-level lists.
    eval_accu.append(accu)
    eval_losses.append(test_loss)

    print('Test Loss: %.3f | Accuracy: %.3f'%(test_loss,accu))
    return(accu, test_loss)


# Training script: parse the default settings, build the data loaders, loss
# and optimizer, start a Weights & Biases run, then alternate train() and
# test() for args.epochs epochs, logging the metrics of every epoch.

# Training settings
parser = argparse.ArgumentParser(description='Information Removal at the bottleneck in Deep Neural Networks')
parser.add_argument('--batch-size', type=int, default=32, metavar='N',
                    help='input batch size for training (default: 32)')
parser.add_argument('--epochs', type=int, default=100, metavar='N',
                    help='number of epochs to train (default: 100)')
parser.add_argument('--lr', type=float, default=0.1, metavar='LR',
                    help='learning rate (default: 0.1)')
parser.add_argument('--weight_decay', type=float, default=0.0001)
# parser.add_argument('--dev', default="cuda:0")
parser.add_argument('--dev', default="cpu")
parser.add_argument('--momentum-sgd', type=float, default=0.9, metavar='M',
                    help='Momentum')
parser.add_argument('--datapath', default='dataset/')
# An empty argument list is parsed, so every option takes its default value.
args = parser.parse_args("")

args.device = torch.device(args.dev)
if args.dev != "cpu":
    # Make the requested GPU the current CUDA device.
    torch.cuda.set_device(args.device)

model = resnet_model.to(args.device)

train_dataset = LPCVCDataset(datapath=args.datapath,transform=None,  n_class=14, train=True)
# train_dataset = drone_dataset_train
train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=args.batch_size,
        shuffle=True,
        # num_workers=2,
        # 0 workers: batches are loaded in the main process.
        num_workers=0,
        pin_memory=True
)

# train_loader = train_loader

val_dataset = LPCVCDataset(datapath=args.datapath, transform=None,  n_class=14, train=False)
# val_dataset = drone_dataset_test
val_loader = torch.utils.data.DataLoader(
        dataset=val_dataset,
        batch_size=args.batch_size,
        shuffle=False,
        # num_workers=1,
        # 0 workers: batches are loaded in the main process.
        num_workers=0,
        pin_memory=True
)

# val_loader = val_loader

# Loss and optimizer are stored on the args namespace; train() and test()
# read them from there.
args.criterion = torch.nn.BCEWithLogitsLoss().to(args.device)
args.optimizer = torch.optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum_sgd, weight_decay=args.weight_decay)

# Per-epoch metric histories; train() and test() append to these
# module-level lists.
train_accu = []
train_losses = []

eval_accu = []
eval_losses = []

# Start the experiment-tracking run and record the hyper-parameters.
# wandb.init(project="LPCVC", entity='lpcvc')
wandb.init(project="LPCVC")
wandb.run.name = "resnet18_train"
wandb.config.epochs = args.epochs
wandb.config.batch_size = args.batch_size
wandb.config.learning_rate = args.lr
wandb.config.weight_decay = args.weight_decay
wandb.config.momentum = args.momentum_sgd
# NOTE(review): the two entries below store Dataset objects, not plain
# values, in the run config — confirm this is what wandb should record.
wandb.config.train_dataset = train_dataset
wandb.config.test_dataset = val_dataset
# wandb.config.train_targets = train_dataset.targets


# Epochs are numbered from 1; each epoch trains, evaluates, then logs.
for epoch in range(1, args.epochs+1):
    print('\nEpoch : %d'%epoch)
    train_acc, train_loss = train(model, args, train_loader)
    test_acc, test_loss = test(model, args, val_loader)
    wandb.log(
        {"train_acc": train_acc, "train_loss": train_loss,
        "test_acc": test_acc, "test_loss": test_loss})

wandb.finish()

VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.016933333330477276, max=1.0…


Epoch : 1


  0%|          | 0/32 [00:06<?, ?it/s]


ValueError: Target size (torch.Size([32, 512, 512, 14])) must be the same as input size (torch.Size([32, 14]))