In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
import os

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim

import torchvision

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

import cv2
from PIL import Image

import albumentations

In [None]:
# Dataset location on Kaggle, plus the label file and image folder inside it.
base_path = "/kaggle/input/cassava-leaf-disease-classification"
train_imges_path = "train_images"
train_data = "train.csv"

In [None]:
# Read the label file (one row per training image) and preview it.
train_csv_path = os.path.join(base_path, train_data)
dfx = pd.read_csv(train_csv_path)
dfx.head()

In [None]:
# Number of images per class label.
dfx['label'].value_counts()

In [None]:
# Hold out 10% of the labelled rows for validation, stratified by label so
# both parts keep the same class proportions; indices are renumbered from 0.
df_train, df_test = (
    split.reset_index(drop=True)
    for split in train_test_split(
        dfx,
        test_size=0.1,
        random_state=42,
        stratify=dfx['label'].values,
    )
)

In [None]:
# (rows, columns) of the training split.
df_train.shape

In [None]:
# (rows, columns) of the held-out split.
df_test.shape

In [None]:
# Build the full image path for every row of the train and validation splits.
# Both splits read from the same image folder.
train_images_dir = os.path.join(base_path, train_imges_path)

train_image_paths = [
    os.path.join(train_images_dir, file_name)
    for file_name in df_train['image_id'].values
]

test_image_paths = [
    os.path.join(train_images_dir, file_name)
    for file_name in df_test['image_id'].values
]

In [None]:
# Preview the first five training image paths.
train_image_paths[:5]

In [None]:
# Label arrays, in the same row order as the image path lists.
train_targets = df_train['label'].values
test_targets = df_test['label'].values

**Dataset preparation**

In [None]:
# Define custom Dataset preparation class.
class CustomImageDataset(Dataset):
    """Dataset that loads one image from disk per item and pairs it with its target.

    Each item is a dict with keys ``"image"`` (a torch tensor) and
    ``'targets'`` (the label wrapped in a torch tensor).
    """

    def __init__(
        self,
        image_paths,
        targets,
        augmentations=None,
        backend='pil',
        channel_first=True,
        gray_scale=False
    ):
        """
        :param image_paths: list of paths to images
        :param targets: numpy array, indexed in the same order as image_paths
        :param augmentations: albumentations augmentations (called as
            ``augmentations(image=...)``), or None to skip augmentation
        :param backend: image reader to use, 'pil' or 'cv2'
        :param channel_first: if True, colour images are returned as
            float32 (C, H, W) instead of (H, W, C)
        :param gray_scale: if True, images are loaded as a single channel
            and returned with a leading channel axis
        """
        self.image_paths = image_paths
        self.targets = targets
        self.augmentation = augmentations
        self.backend = backend
        self.channel_first = channel_first
        self.gray_scale = gray_scale

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_paths)

    def _load_image(self, path):
        """Read ``path`` into a numpy array: (H, W, 3) RGB, or (H, W) when gray_scale.

        :raises FileNotFoundError: if the cv2 backend cannot read the file
        :raises ValueError: if ``self.backend`` is neither 'pil' nor 'cv2'
        """
        if self.backend == 'pil':
            # Normalise the mode so palette/RGBA/greyscale files always give
            # the channel layout the rest of __getitem__ expects, and close
            # the file handle as soon as the pixels are in memory.
            with Image.open(path) as pil_image:
                mode = 'L' if self.gray_scale else 'RGB'
                return np.array(pil_image.convert(mode))

        if self.backend == 'cv2':
            image = cv2.imread(path)
            if image is None:
                # cv2.imread signals an unreadable/missing file with None.
                raise FileNotFoundError(f'Could not read image: {path}')
            conversion = cv2.COLOR_BGR2GRAY if self.gray_scale else cv2.COLOR_BGR2RGB
            return cv2.cvtColor(image, conversion)

        raise ValueError(
            f"Unsupported backend {self.backend!r}; expected 'pil' or 'cv2'"
        )

    def __getitem__(self, item):
        """Load, augment and tensorise the image at index ``item``.

        :return: dict with "image" (tensor) and 'targets' (tensor)
        """
        targets = self.targets[item]
        image = self._load_image(self.image_paths[item])

        if self.augmentation is not None:
            image = self.augmentation(image=image)['image']

        # Colour images: move channels first and convert to float32.
        # Greyscale images keep their (H, W) layout and dtype here and get a
        # channel axis added after tensor conversion.
        if self.channel_first and not self.gray_scale:
            image = np.transpose(image, (2, 0, 1)).astype(np.float32)

        image_tensor = torch.tensor(image)
        if self.gray_scale:
            image_tensor = image_tensor.unsqueeze(0)

        return {
            "image": image_tensor,
            'targets': torch.tensor(targets)
        }
        

In [None]:
def plot_img(image_dict):
    """Show one dataset item as an image, titled with its label when present.

    :param image_dict: dict with an 'image' tensor laid out (C, H, W) holding
        values on a 0-255 scale, and optionally a 'targets' entry
    """
    img_tensor = image_dict['image']
    target = image_dict.get('targets')
    plt.figure(figsize=(5,5))
    # matplotlib needs channel-last data on the CPU; dividing by 255 brings
    # the values into the [0, 1] range it expects for float images.
    img = (img_tensor.detach().cpu().permute(1,2,0)/255).numpy()
    plt.imshow(img)
    if target is not None:
        # Unwrap single-element tensors so the title reads "label: 3"
        # rather than "label: tensor(3)".
        is_scalar_tensor = isinstance(target, torch.Tensor) and target.numel() == 1
        label = target.item() if is_scalar_tensor else target
        plt.title(f'label: {label}')

In [None]:
# Define data augmentation pipelines.
# Training: random crop resized to 180x180 plus random transpose/flips.
# NOTE(review): RandomResizedCrop is called with positional (height, width);
# confirm this matches the installed albumentations version.
data_agumnetation = [
    albumentations.RandomResizedCrop(180,180),
    albumentations.Transpose(p=0.5),
    albumentations.HorizontalFlip(p=0.5),
    albumentations.VerticalFlip(p=0.5)
]
train_aug = albumentations.Compose(data_agumnetation)
# Validation: only a deterministic resize to the training resolution, so the
# held-out score is repeatable and not affected by random crops or flips.
test_aug = albumentations.Compose([albumentations.Resize(180, 180)])

In [None]:
# Build the train and validation datasets. Both read colour images with the
# PIL backend and differ only in paths, labels and augmentation pipeline.
shared_dataset_options = dict(backend='pil', gray_scale=False)

train_dataset = CustomImageDataset(
    train_image_paths,
    train_targets,
    augmentations=train_aug,
    **shared_dataset_options,
)

test_dataset = CustomImageDataset(
    test_image_paths,
    test_targets,
    augmentations=test_aug,
    **shared_dataset_options,
)

In [None]:
# Drop the module-level names; the datasets keep their own references to the
# pipelines. Re-running the dataset cell afterwards needs these redefined.
del train_aug, test_aug

In [None]:
# Show a randomly chosen (augmented) training image.
random_train_index = np.random.randint(len(train_image_paths))
plot_img(train_dataset[random_train_index])

In [None]:
# Show a randomly chosen validation image.
random_test_index = np.random.randint(len(test_image_paths))
plot_img(test_dataset[random_test_index])

**Create and load model**

In [None]:
# Define the leaf disease classification model.
# Uses a pretrained ResNet-18 backbone with a new classification head.
class LeafDeseaseModel(nn.Module):
    """ResNet-18 whose final fully connected layer is replaced for ``num_classes`` outputs."""

    def __init__(self,num_classes,pretrained = True):
        """
        :param num_classes: number of output classes for the new head
        :param pretrained: forwarded to ``torchvision.models.resnet18``
        """
        super().__init__()
        # NOTE(review): newer torchvision releases deprecate `pretrained` in
        # favour of `weights`; confirm against the installed version.
        self.convnet = torchvision.models.resnet18(pretrained = pretrained)
        # Read the head's input width from the backbone rather than
        # hard-coding 512, so the layer stays correct if the backbone changes.
        in_features = self.convnet.fc.in_features
        self.convnet.fc = nn.Linear(in_features,num_classes)

    def forward(self,image,targets=None):
        """Return ``(logits, None)``.

        ``targets`` is accepted but not used; the second tuple element is
        always None.
        """
        outputs = self.convnet(image)
        return outputs, None

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

In [None]:
# One output per distinct label; wrap in DataParallel and move to the device.
num_classes = dfx['label'].nunique()
model = nn.DataParallel(
    LeafDeseaseModel(num_classes=num_classes, pretrained=True)
).to(device)

In [None]:
# Smoke-test the forward pass on a single training sample.
# Index the dataset once: each access re-reads and re-augments the image.
sample = train_dataset[0]
img = sample['image'].to(device)
y = sample['targets'].to(device)
# No gradients are needed for a sanity check.
with torch.no_grad():
    smoke_output = model(img.unsqueeze(0),y.unsqueeze(0))
smoke_output

In [None]:
# Training data loader.
# Reshuffle every epoch so batches are not presented in the same order each
# time.
trainloader = DataLoader(train_dataset,batch_size=32,shuffle=True)

**Train model**

In [None]:
# Train the model for a fixed number of epochs (10 by default).
def train_model(train_dl,mdl,epochs=10,lr=1e-3):
    """Train ``mdl`` with Adam and cross-entropy loss and return it.

    :param train_dl: iterable of batches, each a dict with 'image' and
        'targets' tensors
    :param mdl: model whose call returns a tuple with the logits first
    :param epochs: number of passes over ``train_dl``
    :param lr: Adam learning rate
    :return: the same ``mdl`` object, trained in place and left in training mode
    """
    # Make sure dropout / batch-norm layers are in training mode, whatever
    # mode the model was left in by earlier code.
    mdl.train()
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(mdl.parameters(), lr=lr)
    # Send batches to wherever the model's parameters live instead of
    # depending on a module-level device variable.
    model_device = next(mdl.parameters()).device
    for epoch in range(epochs):
        running_loss, num_batches = 0.0, 0
        for data in train_dl:
            inputs = data['image'].to(model_device)
            label = data['targets'].to(model_device)
            optimizer.zero_grad()
            out = mdl(inputs)
            loss = loss_fn(out[0],label)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1
        if num_batches:
            # Report the mean batch loss so training progress is visible.
            print(f'epoch {epoch + 1}/{epochs} - loss: {running_loss / num_batches:.4f}')
    return mdl

# Run training with the default settings and keep the trained model.
model = train_model(train_dl=trainloader, mdl=model)

In [None]:
# Validation loader: one image per batch, in dataset order.
testloader = DataLoader(dataset=test_dataset, batch_size=1)

**Evaluation**

In [None]:
# Evaluate the model on validation data.
def evaluate_model(test_dl,mdl):
    """Return the classification accuracy of ``mdl`` over ``test_dl``.

    :param test_dl: iterable of batches, each a dict with 'image' and
        'targets' tensors; any batch size is supported
    :param mdl: model whose call returns a tuple with the logits first
    :return: accuracy as computed by ``accuracy_score``
    :raises ValueError: if ``test_dl`` yields no batches
    """
    # Send batches to wherever the model's parameters live.
    first_param = next(mdl.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    predictions,actuals = [],[]
    # Evaluate in eval mode (fixed batch-norm statistics, no dropout) and
    # restore the previous mode afterwards so later training is unaffected.
    was_training = mdl.training
    mdl.eval()
    try:
        with torch.no_grad():
            for data in test_dl:
                inputs = data['image'].to(model_device)
                logits = mdl(inputs)[0]
                # One prediction per sample in the batch, not just the first.
                predictions.append(torch.argmax(logits, dim=1).cpu().numpy())
                actuals.append(data['targets'].cpu().numpy().reshape(-1))
    finally:
        mdl.train(was_training)

    if not predictions:
        raise ValueError('test_dl produced no batches to evaluate')
    predictions, actuals = np.concatenate(predictions), np.concatenate(actuals)
    # calculate accuracy
    acc = accuracy_score(actuals, predictions)
    return acc

In [None]:
# Accuracy of the trained model on the held-out split.
acc = evaluate_model(test_dl=testloader, mdl=model)

In [None]:
# Display the validation accuracy.
acc

In [None]:
# Predict the class index for one image (or a batch of images).
def predict(img, mdl):
    """Return the predicted class index/indices for ``img``.

    :param img: batched input tensor accepted by ``mdl``
    :param mdl: model whose call returns a tuple with the logits first
    :return: numpy array of class indices; zero-dimensional for a batch of
        one image, shape (batch,) for larger batches
    """
    # Predict in eval mode without gradient tracking, then restore the
    # model's previous mode.
    was_training = mdl.training
    mdl.eval()
    try:
        with torch.no_grad():
            yhat = mdl(img)
    finally:
        mdl.train(was_training)
    # Argmax over the class axis so each sample gets its own prediction;
    # squeeze() collapses a batch of one to a scalar.
    class_index = torch.argmax(yhat[0], dim=-1).squeeze()
    # retrieve numpy array
    return class_index.cpu().detach().numpy()

In [None]:
# Predict one validation sample and compare with its true label.
# Index the dataset once so the image and label come from the same access
# and the image is decoded only one time.
sample = test_dataset[100]
img = sample['image'].unsqueeze(0).to(device)
y = sample['targets']
yhat = predict(img,model)
print('Predicted class = %d' %  yhat)
print('Actual class = %d' %  y)