# Importing necessary libraries.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


import torch
import torchvision
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler

# Checking out if GPU is working fine.

In [None]:
if(torch.cuda.is_available()):
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

# Transformation.

Transforming the input images into tensor and using some basic data augmentation techniques.

In [None]:
stats = ((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))

transform = transforms.Compose([
    transforms.Resize((120,120)),
    transforms.ColorJitter(0.05),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize(*stats, inplace=True)
])

In [None]:
input_dir = '/kaggle/input/cell-images-for-detecting-malaria/cell_images/cell_images'
train_set = ImageFolder(input_dir, transform=transform)

In [None]:
print(len(train_set))

# Randomly splitting the dataset into train and test.

In [None]:
test_size = 0.2

num_train = len(train_set)
indices = list(range(num_train))
np.random.shuffle(indices)

test_split = int(np.floor((test_size) * num_train))
test_index, train_index = indices[:test_split - 1], indices[test_split - 1:]

train_sampler = SubsetRandomSampler(train_index)
test_sampler = SubsetRandomSampler(test_index)

train_loader = DataLoader(train_set, sampler=train_sampler, batch_size=128)
test_loader = DataLoader(train_set, sampler=test_sampler, batch_size=64)
print("Images in Test set: {}\nImages in Train set: {}".format(len(test_index), len(train_index)))

In [None]:
classes=['infected','uninfected']

# Helper function to visualize our inputs.

In [None]:
def imshow(img):
    img = img / 2 + 0.5  # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    
images, labels = next(iter(train_loader))

fig = plt.figure(figsize=(25, 15))

for i in range(10):
    ax = fig.add_subplot(2, 5, i+1, xticks=[], yticks=[], title=classes[labels[i]])
    imshow(images[i])
plt.show()

# Defining the base class for our model.

1. train_step : Takes a batch of input and calculates the loss and returns it.
2. validation_step : Takes a batch of input, calculates the loss and also the accuracy and stores both of them in a python dictionary.
3. validation_epoch_end : Takes the output after an epoch, returns the average loss and average accuracy.
4. epoch_end : Just a basic function. We will use this later to print the losses and accuracy after the end of each epoch.

In [None]:
#Defining a helper function which will help us to measure accuracy.
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

class MalariaCellDetectBase(nn.Module):
    def train_step(self, batch):
        images, labels = batch
        out = self(images)
        loss = F.cross_entropy(out, labels)
        return loss
    
    def validation_step(self, batch):
        images, labels = batch
        out = self(images)
        loss = F.cross_entropy(out, labels)
        acc = accuracy(out, labels)
        return {'val_loss': loss.detach(), 'val_acc': acc}
    
    def validation_epoch_end(self, outputs):
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}
    
    def epoch_end(self, epoch, result):
        print("Epoch [{}], train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(epoch, result['train_loss'], result['val_loss'], result['val_acc']))

# Creating our model.

In [None]:
class MalariaNet(MalariaCellDetectBase):
    
    def __init__(self):
        super(MalariaNet, self).__init__()
        
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)  
        )
        
        self.flatten = nn.Flatten()
        
        self.fc1 = nn.Linear(64*15*15, 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 2)
        self.drop = nn.Dropout2d(0.2)
            
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        
        out = self.flatten(out)
        
        out = self.fc1(out)
        out = F.relu(out)
        out = self.drop(out)
        
        out = self.fc2(out)
        out = F.relu(out)
        out = self.drop(out)
        
        out = self.fc3(out)
        
        return out       

In [None]:
model = MalariaNet()
model

# Checking out if the input and output dims are as we expected.

In [None]:
for images, labels in train_loader:
    print(f'shape of images: {images.shape}')
    out = model(images)
    print(f'shape of output: {out.shape}')
    print(f'prediction of first image {out[0]}')
    break

In [None]:
def get_default_device():
    """Pick GPU if available, else CPU"""
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
def to_device(data, device):
    """Move tensor(s) to chosen device"""
    if isinstance(data, (list,tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader():
    """Wrap a dataloader to move data to a device"""
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        """Yield a batch of data after moving it to device"""
        for b in self.dl: 
            yield to_device(b, self.device)

    def __len__(self):
        """Number of batches"""
        return len(self.dl)

In [None]:
device = get_default_device()
device

# Moving the model and data to cuda.

In [None]:
train_loader = DeviceDataLoader(train_loader, device)
test_loader = DeviceDataLoader(test_loader, device)
to_device(model, device)

# Evaluating our model.

In [None]:
@torch.no_grad()
def evaluate(model, val_loader):
    model.eval()
    outputs = [model.validation_step(batch) for batch in val_loader]
    return model.validation_epoch_end(outputs)

def fit(epochs, lr, model, train_loader, val_loader, opt_func=torch.optim.SGD):
    history = []
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):
        model.train()
        train_losses = []
        for batch in train_loader:
            loss = model.train_step(batch)
            train_losses.append(loss)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            
        result = evaluate(model, val_loader)
        result['train_loss'] = torch.stack(train_losses).mean().item()
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [None]:
model = to_device(MalariaNet(), device)

# Checking our accuracy just after creating the model.

We are doing this step so that we can check how well our model improved in the course of time.

In [None]:
evaluate(model, test_loader)

# Defining the optimizer and learning rate.

In [None]:
learning_rate = 0.0001
optimizer = torch.optim.Adam
epochs = 20

In [None]:
history = fit(epochs, learning_rate, model, train_loader, test_loader, opt_func=optimizer)

In [None]:
def plot_accuracies(history):
    accuracies = [x['val_acc'] for x in history]
    plt.plot(accuracies, '-x')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.title('Accuracy vs. No. of epochs')

In [None]:
plot_accuracies(history)

In [None]:
def plot_losses(history):
    train_losses = [x.get('train_loss') for x in history]
    val_losses = [x['val_loss'] for x in history]
    plt.plot(train_losses, '-bx')
    plt.plot(val_losses, '-rx')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Loss vs. No. of epochs');

In [None]:
plot_losses(history)