In [None]:
import pandas as pd
import numpy as np
from PIL import Image, ImageOps
import time
import cv2

import matplotlib.pyplot as plt
import matplotlib as mplt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as transforms
from torchvision import models

from sklearn import metrics
from sklearn.model_selection import train_test_split # for splitting the data into train and test samples
from sklearn.metrics import classification_report # for model evaluation metrics
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_score
from sklearn.metrics import confusion_matrix

In [None]:
def predict(model, path):
    with open(path, 'rb') as f:
        img = Image.open(f)
        gray_image = ImageOps.grayscale(img)
        data = np.array(gray_image)
        
        data = torch.tensor(data)
        data = data.unsqueeze(0) # (28, 28) -> (1, 28, 28) [add batch]

    output = model(data.float())
    print(output)
    result = torch.argmax(output)
    return result

In [None]:
def get_accuracy(batch, output, label):
    y_pred = []
    for i in range(batch):
        o = list(output[i])
        idx = o.index(max(o))
        y_pred.append(idx)
    y_true = list(label)
    return accuracy_score(y_true, y_pred)

In [None]:
def test(model, dataloader, GPU):        
    for i, (data, label) in enumerate(dataloader):

        if GPU:
            data, label = data.cuda(), label.cuda()

        # Forward prop.
        output = model(data.float()) 
        # Get loss & accuracy
        loss = criterion(output, label)
        this_epoch_accuracy += get_accuracy(output.shape[0], output.cpu(), label.cpu())
        this_epoch_loss += loss
    
    end = time.time()
    print('[*] Epoch {}: loss = {:.4f}, accuracy = {:.4f}, time = {:.1f}s'.format(epoch+1, this_epoch_loss, this_epoch_accuracy/(i+1), end-start))

    accuracy = accuracy_score(y_true, y_pred)
    matrix = confusion_matrix(y_true, y_pred)
    report = classification_report(y_true, y_pred)
    return accuracy, matrix, report

In [None]:
class CNN(nn.Module):
    def __init__(self): 
        super(CNN, self).__init__()
        self.class_num = 10
        self.input_size = 28
        
        self.block1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, stride=1, padding=0), # (1, 28, 28) -> (16, 24, 24)
            nn.BatchNorm2d(16),
            nn.Dropout2d(p=0.1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)) # (16, 24, 24) -> (16, 12, 12)
        
        self.block2 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=0), # (16, 12, 12) -> (32, 8, 8)
            nn.BatchNorm2d(32),
            nn.Dropout2d(p=0.1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)) # (32, 8, 8) -> (32, 4, 4)

        self.fc = nn.Linear(32 * 4 * 4, self.class_num)

    def forward(self, x):
        out = self.block1(x)
        out = self.block2(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out) # (batch, 32 * 4 * 4) -> (batch, 10)

        return out

In [None]:
class Ensemble(nn.Module):
    def __init__(self, models): 
        super(Ensemble, self).__init__()
        self.class_num = 10
        self.input_size = 28
        
        self.models = models
        self.fc = nn.Linear(self.class_num * len(models), self.class_num)

    def forward(self, x):
        outs = [model(x) for model in self.models]
        out = torch.cat(outs, dim=1)
        out = self.fc(out) # (batch, 32 * 4 * 4) -> (batch, 10)

        return out

In [None]:
# %load dataset.py 
import torch.utils.data as data

from PIL import Image

import os
import os.path

IMG_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif']

def has_file_allowed_extension(filename, extensions):
    """Checks if a file is an allowed extension.

    Args:
        filename (string): path to a file

    Returns:
        bool: True if the filename ends with a known image extension
    """
    filename_lower = filename.lower()
    return any(filename_lower.endswith(ext) for ext in extensions)


def make_dataset(dir):
    images = []
    for root, _, fnames in sorted(os.walk(dir)):
        for fname in sorted(fnames):
            path = os.path.join(root, fname)
            img = pil_loader(path)
            class_num = root.split(dir)[1][1]
            item = (img, int(class_num))
            images.append(item)

    return images


class DatasetFolder(data.Dataset):
    def __init__(self, root, transform=None, target_transform=None):
        # classes, class_to_idx = find_classes(root)
        samples = make_dataset(root)
        if len(samples) == 0:
            raise(RuntimeError("Found 0 files in subfolders of: " + root + "\n"
                               "Supported extensions are: " + ",".join(extensions)))

        self.root = root
        self.samples = samples

        self.transform = transforms.ToTensor()
        self.target_transform = target_transform

    def __getitem__(self, index):
        """
        Args:
            index (int): Index

        Returns:
            tuple: (sample, target) where target is class_index of the target class.
        """
        sample, target = self.samples[index]
        #sample = pil_loader(path)
        #sample = torch.tensor(sample)
        if self.transform is not None:
            sample = self.transform(sample)
        if self.target_transform is not None:
            target = self.target_transform(target)

        return sample, target

    def __len__(self):
        return len(self.samples)

def pil_loader(path):
    img = cv2.imread(path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    
    return img

In [None]:
train_on_GPU = False

if torch.cuda.is_available():
    train_on_GPU = True
    my_device = torch.device('cuda')
else:
    my_device = torch.device('cpu')
print('using device: {}'.format(my_device))

In [None]:
""" Dataset Construction """
model_num = 3
batch_size = 512
train_dir = []
train_dir.append('../input/2022-cv-final/training data/training data')
train_dir.append('../input/2022-cv-final/training data 2/training data 2')
train_dir.append('../input/2022-cv-final/training data 3/training data 3')
valid_dir = '../input/2022-cv-final/validation data/validation data'

Dataloaders = []
for i in range(model_num):
    Dataloaders.append(DataLoader(dataset=DatasetFolder(train_dir[i]), batch_size=batch_size, shuffle=True))
    print('Dataset {} complete!'.format(i))
#myDataloader = DataLoader(dataset=DatasetFolder(root_dir), batch_size=batch_size, shuffle=True)
validDataloader = DataLoader(dataset=DatasetFolder(valid_dir), batch_size=128, shuffle=True)

In [None]:
""" Model Construction """
models = []
optims = []
for i in range(model_num):
    models.append(CNN().float())
    optims.append(torch.optim.Adam(models[i].parameters(), lr=0.002))

criterion = nn.CrossEntropyLoss()

In [None]:
""" Training """
# Set hyperparameters
epochs_num = 40
save_freq = 10
valid_freq = 5
load = False

for m_idx in range(model_num):
    
    if train_on_GPU:
        models[m_idx] = models[m_idx].cuda()
        
    best_accuracy = 0
    best_param = {}
    
    ### Load pretrained model ###
    if(load): 
        models[m_idx].load_state_dict(torch.load('../input/ml-hw5-train/k_model_weights_300.pth'))
        print('Load model success!')

    ### Main training process ###
    print("-----Start training!-----")
    start = time.time()
    for epoch in range(epochs_num):
        this_epoch_loss = 0
        this_epoch_accuracy = 0

        for i, (data, label) in enumerate(Dataloaders[0]):

            if train_on_GPU:
                data, label = data.cuda(), label.cuda()

            # Clear gradient
            optims[m_idx].zero_grad()

            # Forward prop.
            output = models[m_idx](data.float()) 

            # Get loss & accuracy
            loss = criterion(output, label)
            this_epoch_accuracy += get_accuracy(output.shape[0], output.cpu(), label.cpu())
            this_epoch_loss += loss

            # Backward prop.
            loss.backward()

            # Update parameters
            optims[m_idx].step()

        end = time.time()
        print('[*] Epoch {}: loss = {:.4f}, accuracy = {:.4f}, time = {:.1f}s'.format(epoch+1, this_epoch_loss, this_epoch_accuracy/(i+1), end-start))

        ### Validation ###
        if ((epoch+1) % valid_freq == 0):
            valid_loss = 0
            valid_accuracy = 0
            for i, (data, label) in enumerate(validDataloader):

                if train_on_GPU:
                    data, label = data.cuda(), label.cuda()

                output = models[m_idx](data.float()) 
                loss = criterion(output, label)
                valid_accuracy += get_accuracy(output.shape[0], output.cpu(), label.cpu())
                valid_loss += loss

            end = time.time()
            print('[-] Validation: loss = {:.4f}, accuracy = {:.4f}, time = {:.1f}s'.format(valid_loss, valid_accuracy/(i+1), end-start))
            
            if(valid_accuracy/(i+1) > best_accuracy):
                best_accuracy = valid_accuracy/(i+1)
                best_param = models[m_idx].state_dict()
                # save
                name = 'model_' + str(m_idx) + '_weights' + '.pth'
                torch.save(models[m_idx].state_dict(), name)
                print('Save model success!')

        """### Save model ###
        if ((epoch+1) % save_freq == 0):
            name = 'model_weights_' + str(epoch+1) + '.pth'
            torch.save(TrainModel.state_dict(), name)
            print('Save model success!')"""
        
    ### Choose best param ###
    models[m_idx].load_state_dict(best_param)
    print('-------- Model {} training complete! --------'.format(m_idx))

In [None]:
""" Ensemble Model Construction """
MainModel = Ensemble(models).float()
optimizer = torch.optim.Adam(MainModel.parameters(), lr=0.002)
criterion = nn.CrossEntropyLoss()

In [None]:
""" Training """
# Set hyperparameters
epochs_num = 40
save_freq = 10
valid_freq = 5
load = False

if train_on_GPU:
    MainModel = MainModel.cuda()

best_accuracy = 0
best_param = {}

### Load pretrained model ###
if(load): 
    MainModel.load_state_dict(torch.load('../input/ml-hw5-train/k_model_weights_300.pth'))
    print('Load model success!')

### Main training process ###
print("-----Start training!-----")
start = time.time()
for epoch in range(epochs_num):
    this_epoch_loss = 0
    this_epoch_accuracy = 0

    for i, (data, label) in enumerate(Dataloaders[0]):

        if train_on_GPU:
            data, label = data.cuda(), label.cuda()

        # Clear gradient
        optimizer.zero_grad()

        # Forward prop.
        output = MainModel(data.float()) 

        # Get loss & accuracy
        loss = criterion(output, label)
        this_epoch_accuracy += get_accuracy(output.shape[0], output.cpu(), label.cpu())
        this_epoch_loss += loss

        # Backward prop.
        loss.backward()

        # Update parameters
        optimizer.step()

    end = time.time()
    print('[*] Epoch {}: loss = {:.4f}, accuracy = {:.4f}, time = {:.1f}s'.format(epoch+1, this_epoch_loss, this_epoch_accuracy/(i+1), end-start))

    ### Validation ###
    if ((epoch+1) % valid_freq == 0):
        valid_loss = 0
        valid_accuracy = 0
        for i, (data, label) in enumerate(validDataloader):

            if train_on_GPU:
                data, label = data.cuda(), label.cuda()

            output = MainModel(data) 
            loss = criterion(output, label)
            valid_accuracy += get_accuracy(output.shape[0], output.cpu(), label.cpu())
            valid_loss += loss

        end = time.time()
        print('[-] Validation: loss = {:.4f}, accuracy = {:.4f}, time = {:.1f}s'.format(valid_loss, valid_accuracy/(i+1), end-start))

        if(valid_accuracy/(i+1) > best_accuracy):
            best_accuracy = valid_accuracy/(i+1)
            best_param = MainModel.state_dict()
            # save
            name = 'model_main_weights.pth'
            torch.save(MainModel.state_dict(), name)
            print('Save model success!')

    """### Save model ###
    if ((epoch+1) % save_freq == 0):
        name = 'model_weights_' + str(epoch+1) + '.pth'
        torch.save(TrainModel.state_dict(), name)
        print('Save model success!')"""
        
### Choose best param ###
MainModel.load_state_dict(best_param)
print('-------- Main Model training complete! --------')