In [78]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import KFold, StratifiedKFold
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader,TensorDataset,random_split,SubsetRandomSampler, ConcatDataset
from torch.nn import functional as F
import torchvision
from torchvision import datasets,transforms
import torchvision.transforms as transforms

import albumentations as A
from albumentations.pytorch import ToTensorV2
import pandas as pd

import cv2


train_dataset = torchvision.datasets.MNIST('classifier_data', train=True, download=True)
test_dataset  = torchvision.datasets.MNIST('classifier_data', train=False, download=True)

transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor()
])

train_dataset.transform=transform
test_dataset.transform=transform

m=len(train_dataset)

In [79]:
class ConcatenatedDataset(Dataset):
    def __init__(
        self, annotations_file, img_dir, transform_positives=None, transform_background=None, target_transform=None
    ):
        self.img_labels = pd.read_csv(annotations_file, header=None)
        self.img_dir = img_dir
        self.transform_positives = transform_positives  
        self.transform_background = transform_background 
        self.target_transform = target_transform

    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        # image = Image.open(img_path)
        image = cv2.imread(img_path)
        
        # By default OpenCV uses BGR color space for color images,
        # so we need to convert the image to RGB color space.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.img_labels.iloc[idx, 1]
        if label == 0 and self.transform_background:
            image = self.transform_background(image=image)["image"]
        if label != 0 and self.transform_positives:
            image = self.transform_positives(image=image)["image"]
        # if self.transform:
        #     image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [80]:
IMAGE_HEIGHT=224
IMAGE_WIDTH=224
transform = A.Compose([
        # A.Resize(height=IMAGE_HEIGHT, width=IMAGE_WIDTH),
        A.RandomCrop(height=IMAGE_HEIGHT, width=IMAGE_WIDTH, always_apply=True),
        A.Rotate(limit=35, p=1.0),
        # A.ToGray(always_apply=True),

        # A.Normalize(),
        ToTensorV2(),
    ])
whole_dataset = ConcatenatedDataset(annotations_file="imgs_merged/labels.csv", img_dir="imgs_merged", transform_background=transform, transform_positives=transform)

In [81]:
class ConvNet(nn.Module):
    def __init__(self,h1=96):
        # We optimize dropout rate in a convolutional neural network.
        super(ConvNet, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=5, stride=1, padding=2)

        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2)
        self.drop1=nn.Dropout2d(p=0.5)   

        self.fc1 = nn.LazyLinear(h1)
        self.drop2=nn.Dropout2d(p=0.1)

        self.fc2 = nn.Linear(h1, 3)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x),kernel_size = 2))

        x = F.relu(F.max_pool2d(self.conv2(x),kernel_size = 2))
        x = self.drop1(x)

        x = x.view(x.size(0),-1)

        x = F.relu(self.fc1(x))
        x = self.drop2(x)

        x = self.fc2(x)

        return x


In [87]:
class LeNet(torch.nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        # 1 input image channel (black & white), 6 output channels, 5x5 square convolution
        # kernel
        self.conv1 = torch.nn.Conv2d(3, 6, 5)
        self.conv2 = torch.nn.Conv2d(6, 16, 3)
        # an affine operation: y = Wx + b
        self.fc1 = torch.nn.LazyLinear(120)  # 6*6 from image dimension
        self.fc2 = torch.nn.Linear(120, 84)
        self.fc3 = torch.nn.Linear(84, 3)

    def forward(self, x):
        # Max pooling over a (2, 2) window
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        # If the size is a square you can only specify a single number
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)

        x = x.view(x.size(0),-1)

        x = F.relu(self.fc1(x))

        x = F.relu(self.fc2(x))

        x = self.fc3(x)

        return x

In [88]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
torch.manual_seed(42)
criterion = nn.CrossEntropyLoss()

dataset = ConcatDataset([train_dataset, test_dataset])

dataset = whole_dataset

num_epochs=20
batch_size=100
k=10
splits=KFold(n_splits=k,shuffle=True,random_state=42)
foldperf={}

In [89]:
def train_epoch(model,device,dataloader,loss_fn,optimizer):
    train_loss,train_correct=0.0,0
    model.train()
    for images, labels in dataloader:

        images,labels = images.to(device),labels.to(device)
        images = images.float()
        optimizer.zero_grad()
        
        output = model(images)
        # print("output:", output.size())
        # print("output:", output)

        # print("labels:", labels.size())
        # print("labels:", labels.float())
        
        loss = loss_fn(output,labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * images.size(0)
        scores, predictions = torch.max(output.data, 1)
        train_correct += (predictions == labels).sum().item()

    return train_loss,train_correct
  
def valid_epoch(model,device,dataloader,loss_fn):
    valid_loss, val_correct = 0.0, 0
    model.eval()
    for images, labels in dataloader:

        images,labels = images.to(device),labels.to(device)
        images = images.float()
        output = model(images)
        loss=loss_fn(output,labels)
        valid_loss+=loss.item()*images.size(0)
        scores, predictions = torch.max(output.data,1)
        val_correct+=(predictions == labels).sum().item()

    return valid_loss,val_correct


In [90]:
df = whole_dataset.img_labels.reset_index()
splitter = StratifiedKFold(n_splits=4, shuffle=True, random_state=0)

splits = []
for train_idx, test_idx in splitter.split(df["index"], df[1]):
    splits.append((train_idx, test_idx))

In [91]:
for fold, (train_idx, val_idx) in enumerate(splits):

    # print(train_idx, val_idx)
    print("Fold {}".format(fold + 1))

    train_sampler = SubsetRandomSampler(train_idx)
    test_sampler = SubsetRandomSampler(val_idx)
    train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
    test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model = ConvNet()
    model = LeNet()
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.002)

    history = {"train_loss": [], "test_loss": [], "train_acc": [], "test_acc": []}

    for epoch in range(num_epochs):
        train_loss, train_correct = train_epoch(
            model, device, train_loader, criterion, optimizer
        )
        test_loss, test_correct = valid_epoch(model, device, test_loader, criterion)

        train_loss = train_loss / len(train_loader.sampler)
        train_acc = train_correct / len(train_loader.sampler) * 100
        test_loss = test_loss / len(test_loader.sampler)
        test_acc = test_correct / len(test_loader.sampler) * 100

        print(
            "Epoch:{}/{} AVG Training Loss:{:.3f} AVG Test Loss:{:.3f} AVG Training Acc {:.2f} % AVG Test Acc {:.2f} %".format(
                epoch + 1, num_epochs, train_loss, test_loss, train_acc, test_acc
            )
        )
        history["train_loss"].append(train_loss)
        history["test_loss"].append(test_loss)
        history["train_acc"].append(train_acc)
        history["test_acc"].append(test_acc)

    foldperf["fold{}".format(fold + 1)] = history


Fold 1




Epoch:1/20 AVG Training Loss:3.450 AVG Test Loss:56.558 AVG Training Acc 22.22 % AVG Test Acc 33.33 %
Epoch:2/20 AVG Training Loss:50.500 AVG Test Loss:126.293 AVG Training Acc 31.75 % AVG Test Acc 42.86 %
Epoch:3/20 AVG Training Loss:119.114 AVG Test Loss:81.853 AVG Training Acc 46.03 % AVG Test Acc 42.86 %
Epoch:4/20 AVG Training Loss:72.547 AVG Test Loss:12.893 AVG Training Acc 46.03 % AVG Test Acc 42.86 %
Epoch:5/20 AVG Training Loss:12.607 AVG Test Loss:3.296 AVG Training Acc 46.03 % AVG Test Acc 23.81 %
Epoch:6/20 AVG Training Loss:3.343 AVG Test Loss:1.905 AVG Training Acc 22.22 % AVG Test Acc 47.62 %
Epoch:7/20 AVG Training Loss:1.985 AVG Test Loss:1.262 AVG Training Acc 44.44 % AVG Test Acc 33.33 %
Epoch:8/20 AVG Training Loss:1.306 AVG Test Loss:1.319 AVG Training Acc 26.98 % AVG Test Acc 52.38 %
Epoch:9/20 AVG Training Loss:1.292 AVG Test Loss:1.000 AVG Training Acc 55.56 % AVG Test Acc 76.19 %
Epoch:10/20 AVG Training Loss:1.062 AVG Test Loss:1.179 AVG Training Acc 69.84 % 

In [93]:
testl_f,tl_f,testa_f,ta_f=[],[],[],[]
k=4
for f in range(1,k+1):

     tl_f.append(np.mean(foldperf['fold{}'.format(f)]['train_loss']))
     testl_f.append(np.mean(foldperf['fold{}'.format(f)]['test_loss']))

     ta_f.append(np.mean(foldperf['fold{}'.format(f)]['train_acc']))
     testa_f.append(np.mean(foldperf['fold{}'.format(f)]['test_acc']))

print('Performance of {} fold cross validation'.format(k))
print("Average Training Loss: {:.3f} \t Average Test Loss: {:.3f} \t Average Training Acc: {:.2f} \t Average Test Acc: {:.2f}".format(np.mean(tl_f),np.mean(testl_f),np.mean(ta_f),np.mean(testa_f)))     


Performance of 4 fold cross validation
Average Training Loss: 9.583 	 Average Test Loss: 9.853 	 Average Training Acc: 63.39 	 Average Test Acc: 66.49
