# Main file for classification task

The code is adapted from the class CNN notebook and from the following tutorial: https://www.kaggle.com/code/songseungwon/cyclegan-tutorial-from-scratch-monet-to-photo/notebook

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os
from PIL import Image
import torch
from torch import nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from collections import defaultdict

In [None]:
# Munch data preprocessing; this only needs to be run once
########################################
# make sure the data are downloaded from 
# https://www.kaggle.com/code/isaienkov/how-to-start-with-munch-s-paintings
# and set fileDir to the folder
########################################
# Monet painting data are from 
# https://www.kaggle.com/code/dimitreoliveira/introduction-to-cyclegan-monet-paintings/notebook
# already cleaned and reshaped

# # This is the directory of the unzipped file you download
# fileDir = "../../../Desktop/NU/Deep_Learning/archive/"
# imgIndex = pd.read_csv(fileDir + "/edvard_munch.csv")
# fileList = imgIndex["filename"]

# # load each individual image, resize and save
# processDir = fileDir + "/munch_processed"
# if not os.path.exists(processDir):
#     os.makedirs(processDir)

In [None]:
# Prepare Pytorch dataset

class MyDataset(torch.utils.data.Dataset):
    """Image dataset mixing Munch paintings (label 1) and Monet paintings (label 0).

    Only file names and labels are kept in memory; each image is opened from
    disk when it is requested through ``__getitem__``.
    """

    def __init__(self, MunchDir, MonetDir, MunchName, MonetName, transform=None, target_transform=None):
        """
        :param MunchDir: directory that contains the Munch image files.
        :param MonetDir: directory that contains the Monet image files.
        :param MunchName: iterable of file names inside ``MunchDir`` (label 1).
        :param MonetName: iterable of file names inside ``MonetDir`` (label 0).
        :param transform: optional callable applied to the loaded RGB image.
        :param target_transform: optional callable applied to the integer label.
        """
        # Store the directories on the instance so that __getitem__ does not
        # depend on module-level variables that happen to share these names.
        self.munch_dir = MunchDir
        self.monet_dir = MonetDir
        # Munch samples come first, followed by the Monet samples.
        self.imgs = [(name, 1) for name in MunchName] + [(name, 0) for name in MonetName]
        self.transform = transform
        self.target_transform = target_transform

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``."""
        fn, label = self.imgs[index]
        img_dir = self.munch_dir if label == 1 else self.monet_dir
        # convert() returns a new, fully loaded image, so the file handle can
        # be released as soon as the with-block ends.
        with Image.open(os.path.join(img_dir, fn)) as raw:
            img = raw.convert('RGB')
        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            label = self.target_transform(label)
        return img, label

    def __len__(self):
        """Return the total number of samples (Munch plus Monet)."""
        return len(self.imgs)


In [None]:
# Training and testing preparation

# Processed Munch images.
# os.listdir returns entries in arbitrary order, so the names are sorted to make
# the seeded split below select the same files on every machine and run.
MunchDir = "/kaggle/input/all-image/archive/munch_processed"
MunchFileName = np.array(sorted(os.listdir(MunchDir)))

# Processed Monet images (sorted for the same reason).
MonetDir = "/kaggle/input/all-image/archive/monet_processed"
MonetFileName = np.array(sorted(os.listdir(MonetDir)))

# Fix the seed so the random split is repeatable.
np.random.seed(100)
# Fraction of each painter's images that goes into the training set.
trainRatio = 0.7

# Draw training indices without replacement, separately for each painter.
trainMunchIdx = np.random.choice(len(MunchFileName), int(trainRatio * len(MunchFileName)), replace=False)
trainMonetIdx = np.random.choice(len(MonetFileName), int(trainRatio * len(MonetFileName)), replace=False)

trainMunch = MunchFileName[trainMunchIdx]
trainMonet = MonetFileName[trainMonetIdx]

# Everything that was not drawn for training becomes the test set.
testMunch = np.delete(MunchFileName, trainMunchIdx)
testMonet = np.delete(MonetFileName, trainMonetIdx)


## ResNet

In [None]:
# ResNet Model
# Architecture follows He, K., Zhang, X., et al., "Deep Residual Learning for Image Recognition" (2015), https://arxiv.org/abs/1512.03385

# Define basic ResNet Blocks: ConvBlock and IdentityBlock
class ConvBlock(nn.Module):
    '''
    Bottleneck residual block whose shortcut is a 1x1 convolution.

    The main path is 1x1 conv -> 3x3 conv -> 1x1 conv, each followed by batch
    normalisation. The projected shortcut is added to the main path before the
    final ReLU.
    '''
    def __init__(self, in_channel:int, filters:list, strides = 1):
        '''
        :param in_channel: number of input channels.
        :param filters: output channels of the three convolutions (length 3).
        :param strides: stride of the first convolution and of the shortcut
                        convolution, default 1.
        '''
        super().__init__()
        reduce_ch, mid_ch, out_ch = filters
        main_path = [
            nn.Conv2d(in_channel, reduce_ch, kernel_size=1, stride=strides, padding=0, bias=False),
            nn.BatchNorm2d(reduce_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(reduce_ch, mid_ch, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(mid_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_ch, out_ch, kernel_size=1, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_ch),
        ]
        self.stage = nn.Sequential(*main_path)
        # Projection shortcut: matches the main path in channels and stride.
        self.shortcut_1 = nn.Conv2d(in_channel, out_ch, kernel_size=1, stride=strides, padding=0, bias=False)
        self.batch_1 = nn.BatchNorm2d(out_ch)
        self.relu_1 = nn.ReLU(inplace=True)

    def forward(self, X):
        '''Return ReLU(main_path(X) + batch_norm(shortcut(X))).'''
        residual = self.batch_1(self.shortcut_1(X))
        return self.relu_1(self.stage(X) + residual)
    
class IndentityBlock(nn.Module):
    '''
    Bottleneck residual block with an identity shortcut.

    Uses the same three-convolution main path as ConvBlock, but the input is
    added to the main-path output unchanged. The block therefore keeps the
    channel count and spatial size of its input.
    '''
    def __init__(self, in_channel:int, filters: list):
        '''
        :param in_channel: number of input channels.
        :param filters: output channels of the three convolutions (length 3);
                        the last entry must equal ``in_channel``.
        :raises ValueError: if ``filters[2]`` differs from ``in_channel``.
        '''
        super(IndentityBlock,self).__init__()
        F1, F2, F3 = filters
        # The identity shortcut is added element-wise to the main-path output.
        # Reject a channel mismatch here instead of failing (or silently
        # broadcasting a single-channel input) later inside forward().
        if in_channel != F3:
            raise ValueError(
                "IndentityBlock needs filters[2] == in_channel, got "
                "in_channel={} and filters[2]={}".format(in_channel, F3)
            )
        self.stage = nn.Sequential(
            nn.Conv2d(in_channel,F1,1,stride=1, bias=False),
            nn.BatchNorm2d(F1),
            nn.ReLU(True),
            nn.Conv2d(F1,F2,3,stride=1, padding=1, bias=False),
            nn.BatchNorm2d(F2),
            nn.ReLU(True),
            nn.Conv2d(F2,F3,1,stride=1, padding=0, bias=False),
            nn.BatchNorm2d(F3),
        )
        self.relu_1 = nn.ReLU(True)

    def forward(self, X):
        '''Return ReLU(main_path(X) + X).'''
        X_shortcut = X
        X = self.stage(X)
        X = X + X_shortcut
        X = self.relu_1(X)
        return X
    
class ResNet(nn.Module):
    '''
    Residual network classifier built from ConvBlock and IndentityBlock.

    Stage 1 is a 7x7 convolution followed by max pooling. Stages 2 to 5 hold
    3, 4, 6 and 3 bottleneck blocks respectively. The final feature map is
    average-pooled, flattened and passed to a single linear layer.
    '''
    # Height and width of the feature map that is flattened for the linear layer.
    FEATURE_GRID = 5

    def __init__(self, n_class: int):
        '''
        :param n_class: output dimension.
        '''
        super(ResNet,self).__init__()
        self.stage1 = nn.Sequential(
            nn.Conv2d(3,64,7,stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.MaxPool2d(3,2,padding=1),
        )
        self.stage2 = nn.Sequential(
            ConvBlock(64, filters=[64, 64, 256], strides=1),
            IndentityBlock(256, [64, 64, 256]),
            IndentityBlock(256, [64, 64, 256]),
        )
        self.stage3 = nn.Sequential(
            ConvBlock(256, filters=[128, 128, 512], strides=2),
            IndentityBlock(512, [128, 128, 512]),
            IndentityBlock(512, [128, 128, 512]),
            IndentityBlock(512, [128, 128, 512]),
        )
        self.stage4 = nn.Sequential(
            ConvBlock(512, filters=[256, 256, 1024], strides=2),
            IndentityBlock(1024, [256, 256, 1024]),
            IndentityBlock(1024, [256, 256, 1024]),
            IndentityBlock(1024, [256, 256, 1024]),
            IndentityBlock(1024, [256, 256, 1024]),
            IndentityBlock(1024, [256, 256, 1024]),
        )
        self.stage5 = nn.Sequential(
            ConvBlock(1024, filters=[512, 512, 2048], strides=2),
            IndentityBlock(2048, [512, 512, 2048]),
            IndentityBlock(2048, [512, 512, 2048]),
        )
        self.pool = nn.AvgPool2d(2,2,padding=1)
        # For a 256x256 input self.pool already produces a 5x5 map, so this
        # layer passes the values through unchanged. For other input sizes it
        # resamples the map to 5x5 so the linear layer below always receives
        # 2048*5*5 features instead of raising a shape error.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((self.FEATURE_GRID, self.FEATURE_GRID))
        self.fc = nn.Sequential(
            nn.Linear(2048 * self.FEATURE_GRID * self.FEATURE_GRID, n_class)
        )

    def forward(self, X):
        '''Return the class logits, shape (batch, n_class), for image batch X.'''
        out = self.stage1(X)
        out = self.stage2(out)
        out = self.stage3(out)
        out = self.stage4(out)
        out = self.stage5(out)
        out = self.pool(out)
        out = self.adaptive_pool(out)
        # Flatten everything except the batch dimension.
        out = out.view(out.size(0),-1)
        out = self.fc(out)
        return out

In [None]:
# Train the ResNet classifier and evaluate it on the held-out set after every epoch.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_size = 256
batchsz = 64
lr = 0.0002
epoches = 10

# Put the model on the same device the batches are moved to below.
model = ResNet(2)
model.to(device)
optim = torch.optim.Adam(model.parameters(), lr=lr)
# NOTE: despite its name this is CrossEntropyLoss applied to the two class logits.
BCE_loss = torch.nn.CrossEntropyLoss()

# Load data
train_data = MyDataset(MunchDir, MonetDir, trainMunch, trainMonet, transform=transforms.ToTensor())
test_data = MyDataset(MunchDir, MonetDir, testMunch, testMonet, transform=transforms.ToTensor())
train_loader = DataLoader(dataset=train_data, batch_size=batchsz, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=batchsz, shuffle = False)

# Per-epoch history: train/test accuracy and loss.
results = defaultdict(list)

for epoch in range(epoches):
    model.train()
    # Sums over all samples of the epoch; divided by the dataset size when reported.
    running_loss = 0.0
    running_acc = 0.0
    for batch_idx, batch_data in enumerate(train_loader,1):
        x, y = batch_data
        x = x.float().to(device)
        y = y.long().to(device)

        out = model(x)
        loss = BCE_loss(out, y)
        # loss is the batch mean, so weight it by the batch size.
        running_loss += loss.item() * y.size(0)
        _,pred = torch.max(out,1)
        num_correct = (pred == y).sum()
        running_acc += num_correct.item()

        optim.zero_grad()
        loss.backward()
        optim.step()

    # Interval of 1: report and evaluate after every epoch.
    if (epoch+1) % 1 == 0:
        print('Train{} epoch, Loss: {:.6f},Acc: {:.6f}'.format(epoch+1,running_loss / (len(train_data)),
                                                               running_acc / (len(train_data))))
        model.eval()
        eval_loss = 0
        eval_acc = 0
        # No gradients are needed for evaluation; skipping autograd bookkeeping
        # avoids keeping the activations of every test batch in memory.
        with torch.no_grad():
            for data in test_loader:
                x,y = data
                x = x.float().to(device)
                y = y.long().to(device)
                out = model(x)
                loss = BCE_loss(out, y)
                eval_loss += loss.item() * y.size(0)
                _,pred = torch.max(out,1)
                num_correct = (pred == y).sum()
                eval_acc += num_correct.item()

        print('Test Loss:{:.6f},Acc: {:.6f}'
            .format(eval_loss/ (len(test_data)),eval_acc * 1.0/(len(test_data))))

        results["train_acc"].append(running_acc / (len(train_data)))
        results["test_acc"].append(eval_acc * 1.0/(len(test_data)))
        results["train_loss"].append(running_loss / (len(train_data)))
        results["test_loss"].append(eval_loss/ (len(test_data)))
        torch.cuda.empty_cache()

# NOTE(review): `plot` is defined in a later cell of this notebook (CNN section).
# That cell has to be executed before this call, otherwise a fresh kernel
# raises NameError here.
plot(results)

## CNN

In [None]:
class CNN(torch.nn.Module):
    """Small CNN: two (conv -> ReLU -> 2x2 max-pool) stages and one linear layer."""

    def __init__(self, linear_layer_size, n_classes=10):
        """
        :param linear_layer_size: number of features left after the second
            pooling step (16 channels times the pooled height and width).
        :param n_classes: number of output logits. Defaults to 10, the value
            that was previously hard-coded.
        """
        super(CNN, self).__init__()
        self.linear_layer_size = linear_layer_size

        self.conv1 = torch.nn.Conv2d(3, 32, kernel_size=2)
        self.conv2 = torch.nn.Conv2d(32, 16, kernel_size=2)
        self.linear = torch.nn.Linear(linear_layer_size, n_classes)

        # Parameter-free layers, created once instead of on every forward call.
        self.relu = torch.nn.ReLU()
        self.maxpool = torch.nn.MaxPool2d(kernel_size=2)

    def forward(self, x):
        """Return the logits, shape (batch, n_classes), for image batch ``x``."""
        batch_size = x.size(0)

        x = self.conv1(x)
        x = self.maxpool(self.relu(x))

        x = self.conv2(x)
        x = self.maxpool(self.relu(x))

        # Flatten the feature maps; fails if linear_layer_size does not match.
        x = x.reshape(batch_size, self.linear_layer_size)
        x = self.linear(x)
        return x

In [None]:
def run_one_epoch(model, optimizer, X, y, train=True):
    """Run one full-batch pass of ``model`` over ``(X, y)``.

    :param model: the network to run.
    :param optimizer: optimizer used for the update; only touched when ``train`` is True.
    :param X: input batch passed to ``model``.
    :param y: integer class labels, one per sample.
    :param train: if True, put the model in training mode and take one
        optimizer step; otherwise put it in evaluation mode and only measure.
    :return: ``(accuracy, loss)`` as detached tensors.
    """
    if train:
        model.train()
        optimizer.zero_grad()
    else:
        model.eval()

    # Gradients are only needed when a backward pass follows.
    with torch.set_grad_enabled(train):
        output = model(X)
        # Drop singleton dimensions but always keep the batch dimension. A plain
        # squeeze() would also remove a batch of size 1 and break argmax(dim=1).
        kept_shape = [output.size(0)] + [size for size in output.shape[1:] if size != 1]
        output = output.reshape(kept_shape)
        loss = torch.nn.CrossEntropyLoss()(output, y)

    acc = torch.sum(torch.argmax(output, dim=1) == y) / y.size(0)

    if train:
        loss.backward()
        optimizer.step()

    # Detach tells torch to stop tracking a tensor's gradients
    return acc.detach(), loss.detach()

In [None]:
def print_model(model):
    """
    Print how many trainable parameters a PyTorch model has.

    Only parameters with ``requires_grad`` set are counted.
    """
    trainable_sizes = [param.numel() for param in model.parameters() if param.requires_grad]
    parameter_total = sum(trainable_sizes)
    print("In total, this network has ", parameter_total, " parameters")

def demo(model, data, n_epochs=100, verbose=False):
    """Train ``model`` with Adam (lr=0.001) for ``n_epochs`` full-batch epochs.

    :param model: the network to train.
    :param data: tuple ``(X_train, X_test, y_train, y_test)``.
    :param n_epochs: number of training epochs.
    :param verbose: if True, print train/test statistics about ten times over
        the run (every epoch when ``n_epochs`` is below 10).
    :return: dict with the lists ``train_acc``, ``test_acc``, ``train_loss``
        and ``test_loss``, one entry per epoch.
    """
    X_train, X_test, y_train, y_test = data
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # n_epochs // 10 is 0 for fewer than 10 epochs, which would make the modulo
    # below raise ZeroDivisionError; report every epoch in that case.
    report_every = max(1, n_epochs // 10)

    results = defaultdict(list)
    for i in range(n_epochs):
        train_acc, train_loss = run_one_epoch(model, optimizer, X_train, y_train, train=True)
        test_acc, test_loss = run_one_epoch(model, optimizer, X_test, y_test, train=False)

        results["train_acc"].append(train_acc)
        results["test_acc"].append(test_acc)
        results["train_loss"].append(train_loss)
        results["test_loss"].append(test_loss)

        if verbose and (i + 1) % report_every == 0:
            train_stats = f"Train loss: {train_loss:.3f} Train accuracy: {100 * train_acc:4.1f}%"
            test_stats = f"Test loss: {test_loss:.3f} Test accuracy: {100 * test_acc:.1f}%"
            print(f"{i + 1:4d} {train_stats} {test_stats}")

    return results


def plot(results):
    """Draw the training history as two side-by-side panels and show the figure.

    :param results: mapping with the per-epoch lists ``train_loss``,
        ``test_loss``, ``train_acc`` and ``test_acc``.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(nrows=1, ncols=2, figsize=(10, 5),
                                          constrained_layout=True)

    # Left panel: loss curves
    loss_ax.set_title("Loss per Epoch")
    loss_train, loss_test = results["train_loss"], results["test_loss"]
    loss_epochs = np.arange(len(loss_train))
    loss_ax.plot(loss_epochs, loss_train, c='r', label='Train Loss')
    loss_ax.plot(loss_epochs, loss_test, c='b', label='Test Loss')
    loss_ax.legend(loc="best")
    # Double the upper limit of the y-axis, leaving empty space above the curves.
    lower, upper = loss_ax.get_ylim()
    loss_ax.set_ylim(lower, 2 * upper)

    # Right panel: accuracy curves
    acc_ax.set_title("Accuracy per Epoch")
    acc_train, acc_test = results["train_acc"], results["test_acc"]
    acc_epochs = np.arange(len(acc_train))
    acc_ax.plot(acc_epochs, acc_train, c='r', label='Train Acc')
    acc_ax.plot(acc_epochs, acc_test, c='b', label='Test Acc')
    acc_ax.legend(loc="best")
    acc_ax.set_ylim(0, 1.1)

    plt.show()

In [None]:
# Train the small CNN classifier and evaluate it on the held-out set after every epoch.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_size = 256
batchsz = 64
lr = 0.0002
epoches = 10

# Feature count after CNN's two (conv kernel 2 -> 2x2 max-pool) stages with 16
# output channels: 256 -> 255 -> 127 -> 126 -> 63, i.e. 16 * 63 * 63 = 63504.
pooled_side = ((image_size - 1) // 2 - 1) // 2
# NOTE(review): CNN's final layer emits 10 logits although only the labels 0
# and 1 occur in this dataset; the loss still works but 8 outputs are unused.
model = CNN(linear_layer_size=16 * pooled_side * pooled_side)
# Put the model on the same device the batches are moved to below.
model.to(device)
optim = torch.optim.Adam(model.parameters(), lr=lr)
# NOTE: despite its name this is CrossEntropyLoss applied to the class logits.
BCE_loss = torch.nn.CrossEntropyLoss()

# Load data
train_data = MyDataset(MunchDir, MonetDir, trainMunch, trainMonet, transform=transforms.ToTensor())
test_data = MyDataset(MunchDir, MonetDir, testMunch, testMonet, transform=transforms.ToTensor())
train_loader = DataLoader(dataset=train_data, batch_size=batchsz, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=batchsz, shuffle = False)

# Per-epoch history: train/test accuracy and loss.
results = defaultdict(list)

for epoch in range(epoches):
    model.train()
    # Sums over all samples of the epoch; divided by the dataset size when reported.
    running_loss = 0.0
    running_acc = 0.0
    for batch_idx, batch_data in enumerate(train_loader,1):
        x, y = batch_data
        x = x.float().to(device)
        y = y.long().to(device)

        out = model(x)
        loss = BCE_loss(out, y)
        # loss is the batch mean, so weight it by the batch size.
        running_loss += loss.item() * y.size(0)
        _,pred = torch.max(out,1)
        num_correct = (pred == y).sum()
        running_acc += num_correct.item()

        optim.zero_grad()
        loss.backward()
        optim.step()

    # Interval of 1: report and evaluate after every epoch.
    if (epoch+1) % 1 == 0:
        print('Train{} epoch, Loss: {:.6f},Acc: {:.6f}'.format(epoch+1,running_loss / (len(train_data)),
                                                               running_acc / (len(train_data))))
        model.eval()
        eval_loss = 0
        eval_acc = 0
        # No gradients are needed for evaluation; skipping autograd bookkeeping
        # avoids keeping the activations of every test batch in memory.
        with torch.no_grad():
            for data in test_loader:
                x,y = data
                x = x.float().to(device)
                y = y.long().to(device)
                out = model(x)
                loss = BCE_loss(out, y)
                eval_loss += loss.item() * y.size(0)
                _,pred = torch.max(out,1)
                num_correct = (pred == y).sum()
                eval_acc += num_correct.item()

        print('Test Loss:{:.6f},Acc: {:.6f}'
            .format(eval_loss/ (len(test_data)),eval_acc * 1.0/(len(test_data))))

        results["train_acc"].append(running_acc / (len(train_data)))
        results["test_acc"].append(eval_acc * 1.0/(len(test_data)))
        results["train_loss"].append(running_loss / (len(train_data)))
        results["test_loss"].append(eval_loss/ (len(test_data)))
        torch.cuda.empty_cache()

plot(results)