In [None]:
%matplotlib inline

In [None]:
#General python packages
from typing import List,Optional,Tuple,Union
import logging
from functools import partial
import pandas as pd
import numpy as np
import os
from PIL import Image
from matplotlib import pyplot as plt
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
import time
import copy

In [None]:
#Pytorch packages.
import torch
import torch.nn as nn
from torch.optim import Adam
from torchvision.models.resnet import BasicBlock
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision.models.resnet import ResNet
from torch import Tensor
from torchvision import transforms
from torch.autograd import Variable

In [None]:
# Path locations of image data.
DATA_FOLDER = '../input'
LABELS = f'{DATA_FOLDER}/train_labels.csv'
TRAIN_IMAGES_FOLDER = f'{DATA_FOLDER}/train'
TEST_IMAGES_FOLDER = f'{DATA_FOLDER}/test'

# Device selection. `cuda` keeps its historical name because other cells call
# `.to(cuda)`, but it now resolves to the CPU when no GPU is available so those
# calls no longer crash on CPU-only machines.
USE_GPU = torch.cuda.is_available()
cuda = torch.device('cuda' if USE_GPU else 'cpu')

In [None]:
# Load the label table from LABELS and preview its first rows.
train_labels = pd.read_csv(LABELS)
train_labels.head()

In [None]:
# Train/validation split: 20% held out, shuffled with a fixed seed and
# stratified on the label column.
train_x, valid_x, train_y, valid_y = train_test_split(
    train_labels['id'],
    train_labels['label'],
    test_size=0.2,
    shuffle=True,
    random_state=9,
    stratify=train_labels['label'],
)


In [None]:
# Sanity check: sizes of the two splits and the container type of the ids.
print('train x : ', train_x.shape)
print('train y : ', train_y.shape)

print('valid x : ', valid_x.shape)
print('valid y : ', valid_y.shape)
print('type : ', type(valid_x))

In [None]:
# Labels become (n, 1) column arrays.
train_y = train_y.values.reshape(-1, 1)
valid_y = valid_y.values.reshape(-1, 1)

# Image ids become full paths to the corresponding .tif files.
train_x = [os.path.join(TRAIN_IMAGES_FOLDER, f'{image_id}.tif') for image_id in train_x.values]
valid_x = [os.path.join(TRAIN_IMAGES_FOLDER, f'{image_id}.tif') for image_id in valid_x.values]

In [None]:
def pil2tensor(image,dtype:np.dtype):
    """Convert a PIL-style `image` array into a torch-style image tensor.

    The input is read with `np.asarray`; a 2-D array gets a trailing axis of
    size 1. Axes (0, 1, 2) are then reordered to (2, 0, 1), i.e. the last axis
    moves to the front, and the result is cast to `dtype` (without copying
    when the dtype already matches) and wrapped as a tensor.
    """
    pixels = np.asarray(image)
    if pixels.ndim == 2:
        # Two-dimensional input: append a trailing axis of length one.
        pixels = pixels[:, :, np.newaxis]
    # Single permutation equal to transposing by (1, 0, 2) and then (2, 1, 0).
    last_axis_first = pixels.transpose(2, 0, 1)
    return torch.from_numpy(last_axis_first.astype(dtype, copy=False))

In [None]:
# Data loading in PyTorch is done by subclassing the Dataset class and
# defining the methods (__len__ and __getitem__) used to index the data.
class ImageDataset(Dataset):
    """Dataset over a list of image file paths.

    Each item is opened on access and converted to a float32 tensor with
    `pil2tensor`.
    """
    def __init__(self,ImagePaths:List):
        # Paths are stored as given; nothing is opened until an item is requested.
        self.ImagePaths=ImagePaths
    def __len__(self) -> int:
        """Return the number of image paths."""
        return len(self.ImagePaths)
    def __getitem__(self,index:int) ->Tensor:
        """Open the image at `index` and return it as a float32 tensor."""
        # The context manager closes the underlying file once the pixels have
        # been converted, so iterating over many images does not pile up open
        # file handles.
        with Image.open(self.ImagePaths[index]) as img:
            return pil2tensor(img,np.float32)
# Similarly creating for labels
class LabelDataset(Dataset):
    """Dataset view over an indexable collection of labels."""

    def __init__(self,labels:List):
        # Kept by reference; items are returned exactly as stored.
        self.labels = labels

    def __len__(self) -> int:
        """Return how many labels are held."""
        count = len(self.labels)
        return count

    def __getitem__(self,index:int):
        """Return the label stored at position `index`."""
        label = self.labels[index]
        return label
    
#Combining inputs and labels in a single pytorch Dataset for enabling dataloaders.
class VisionDataset(Dataset):
    """Pairs an input dataset with a label dataset, item by item."""

    def __init__(self,x:Dataset,y:Dataset):
        self.x, self.y = x, y

    def __len__(self) ->int:
        """Return the length of the input dataset `x`."""
        return len(self.x)

    def __getitem__(self,index:int) ->Tuple:
        """Return the `(x[index], y[index])` pair."""
        sample = self.x[index]
        target = self.y[index]
        return (sample, target)
    

In [None]:
# Pair each image dataset with its label dataset (train and validation).
train_dataset = VisionDataset(
    x=ImageDataset(train_x),
    y=LabelDataset(train_y),
)
valid_dataset = VisionDataset(
    x=ImageDataset(valid_x),
    y=LabelDataset(valid_y),
)

In [None]:
# Batch-wise loaders: the training loader shuffles, the validation loader does not.
shuffle = True
batch_size = 256
num_workers = 0
train_dataloader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers
)
valid_dataloader = DataLoader(
    valid_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
)

In [None]:
#Model Architecture to be used.
def resnet9(output_dim:int=1) ->nn.Module:
    """Build a ResNet with one BasicBlock per stage and a fresh linear head.

    The average pool is replaced by `nn.AdaptiveAvgPool2d(1)` and the final
    fully connected layer by one with `output_dim` outputs. The model is moved
    to the `cuda` device when `USE_GPU` is true, otherwise returned as built.
    """
    net = ResNet(BasicBlock, [1, 1, 1, 1])
    net.avgpool = nn.AdaptiveAvgPool2d(1)
    # Reuse the width of the stock head for the replacement classifier.
    net.fc = nn.Linear(net.fc.in_features, output_dim)
    if USE_GPU:
        return net.to(cuda)
    return net

In [None]:
# Two outputs, one per class; the bare name below displays the architecture.
resnet_basic = resnet9(output_dim=2)
resnet_basic

In [None]:
# Optimiser (Adam over all model parameters) and the classification loss.
lr = 1e-3
optimizer = Adam(params=resnet_basic.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

In [None]:
def T(tensor):
    """Return `tensor` as a `torch.FloatTensor`.

    Existing tensors are cast with `.type(torch.FloatTensor)`; anything else
    is passed to the `torch.FloatTensor` constructor.
    """
    if torch.is_tensor(tensor):
        return tensor.type(torch.FloatTensor)
    return torch.FloatTensor(tensor)

In [None]:
# train function for training the network
def train_model(model,dataloaders,criterion,optimizer,num_epochs=1):
    """Train `model` and restore the weights of its best validation epoch.

    Args:
        model: network to optimise. Batches are moved to the device of its
            first parameter (CPU if it has no parameters), so the function
            works with or without a GPU.
        dataloaders: mapping with 'train' and 'val' entries. Each entry is
            iterated for `(inputs, labels)` batches and must expose a
            `.dataset` whose length is used to average loss and accuracy.
            Labels are squeezed on dim 1, so a trailing size-1 axis is
            expected.
        criterion: loss called as `criterion(outputs, targets)` with integer
            class targets.
        optimizer: optimiser stepped once per training batch.
        num_epochs: number of train+validation passes.

    Returns:
        `(model, val_acc_history)`: the model with the best-validation-accuracy
        weights loaded, and a list holding one 0-dim accuracy tensor per epoch.
    """
    since = time.time()

    # Follow the model's own device instead of a hard-coded CUDA device.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    val_acc_history = []
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train(False) is the same as eval()

            running_loss = 0.0
            # A tensor accumulator keeps `.double()` valid even if the loader
            # yields no batches.
            running_corrects = torch.zeros((), dtype=torch.long, device=device)

            for inputs, labels in dataloaders[phase]:
                inputs = torch.as_tensor(inputs, dtype=torch.float32, device=device)
                targets = torch.as_tensor(labels, device=device).squeeze(1).long()

                if is_train:
                    optimizer.zero_grad()

                # Gradients are only tracked while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, targets)
                    _, preds = torch.max(outputs, 1)
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Weight the batch-mean loss by batch size so the epoch average
                # stays correct when the last batch is smaller.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == targets)

            n_samples = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / n_samples
            epoch_acc = running_corrects.double() / n_samples
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'val':
                # Snapshot the weights whenever validation accuracy improves.
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                val_acc_history.append(epoch_acc)
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

In [None]:
# Training length and the phase -> loader mapping expected by train_model.
num_epochs = 12
dataloaders_dict = {
    'train': train_dataloader,
    'val': valid_dataloader,
}

In [None]:
# Run training; keep the returned model and the per-epoch validation accuracies.
model_ft, hist = train_model(
    resnet_basic,
    dataloaders_dict,
    criterion,
    optimizer,
    num_epochs=num_epochs,
)

In [None]:
# Test set: every file in the test folder, served in listing order (no shuffle)
# so predictions line up with `test_x`.
test_x = [
    os.path.join(TEST_IMAGES_FOLDER, f'{file_name}')
    for file_name in os.listdir(TEST_IMAGES_FOLDER)
]
test_dataset = ImageDataset(test_x)
test_dataloader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers
)

In [None]:
def predictions(model:nn.Module,test_dataloader:DataLoader):
    """Return the predicted class index for every sample in `test_dataloader`.

    Args:
        model: network whose output has the class scores on dim 1. Inference
            runs on the device of its first parameter (CPU if it has none).
        test_dataloader: iterable yielding input batches (no labels).

    Returns:
        1-D NumPy array of argmax class indices in loader order; an empty
        int64 array when the loader yields no batches.
    """
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # Inference must run in eval mode (affects dropout / batch-norm layers);
    # the previous mode is restored afterwards so the caller's model is not
    # left in a different state.
    was_training = model.training
    model.eval()
    total_preds = []
    try:
        # No gradients are needed here; skipping them avoids building the
        # autograd graph for every batch.
        with torch.no_grad():
            for x in test_dataloader:
                out = model(torch.as_tensor(x, dtype=torch.float32, device=device))
                _, preds = torch.max(out, 1)
                total_preds.append(preds)
    finally:
        model.train(was_training)

    if not total_preds:
        # torch.cat raises on an empty list.
        return np.empty(0, dtype=np.int64)
    return torch.cat(total_preds).cpu().numpy()

In [None]:
# Predicted class index for every test image, in `test_x` order.
predicts = predictions(model=model_ft, test_dataloader=test_dataloader)

In [None]:
# Image id = file name up to its first dot; pair each id with its prediction.
testfilenames = [os.path.basename(path).split('.')[0] for path in test_x]
submission = pd.DataFrame({'id': testfilenames, 'label': predicts})

In [None]:
# Write the submission file without the DataFrame index column.
submission.to_csv('submission.csv', index=False)