# Import necessary libraries

In [1]:
import pandas as pd 
import random
import os
import cv2
import matplotlib.pyplot as plt
from PIL import Image
from pathlib import Path
import torch
import torch.nn as nn
import torchvision
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Visualize and understand the data

In [2]:
data_path = Path("/kaggle/input/potato-diseases-datasets")

In [3]:
classes = os.listdir(data_path)
class_images = {}
for class_name in classes:
    class_images[class_name] = random.sample(os.listdir(os.path.join(data_path, class_name)), 5)

In [None]:
class_images


In [None]:
fig, ax = plt.subplots(7, 6, figsize=(16, 16))

for i, class_name in enumerate(class_images):
    # Display the class name before each row
    ax[i, 0].text(0.5, 0.5, class_name, fontsize=20, ha="center", va="center", transform=ax[i, 0].transAxes)
    ax[i, 0].axis(False)
    
    for j, img_file in enumerate(class_images[class_name]):
        image = cv2.imread(os.path.join(data_path, class_name, img_file))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        ax[i, j+1].imshow(image)
        ax[i, j+1].axis(False)

# Adjust layout for better spacing
plt.tight_layout()
plt.show()

In [None]:
# Check number of images in each class 
for i in os.listdir(data_path):
    length = len(os.listdir(data_path / i))
    
    print(f"{i}: {length} images")

In [7]:
data_transforms = transforms.Compose([
    transforms.RandomResizedCrop(size=(224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

In [8]:
data_path = Path("/kaggle/input/potato-diseases-datasets")

image_datasets = datasets.ImageFolder(root=data_path, 
                                      transform=data_transforms)

In [9]:
torch.manual_seed(42)
train_size = int(0.8 * len(image_datasets))
test_size = len(image_datasets) - train_size

train_datasets, test_datasets = torch.utils.data.random_split(dataset=image_datasets, lengths=[train_size, test_size])

In [None]:
next(iter(train_datasets))

In [11]:
class_names = image_datasets.classes

In [12]:
torch.manual_seed(42)

BATCH_SIZE = 32
NUM_WORKERS = os.cpu_count()

train_dataloader = DataLoader(dataset=train_datasets,
                              batch_size=BATCH_SIZE,
                              shuffle=True,
                              num_workers=NUM_WORKERS)

test_dataloader = DataLoader(dataset=test_datasets,
                             batch_size=BATCH_SIZE,
                             shuffle=False,
                             num_workers=NUM_WORKERS)

# Create function to create dataloader

In [13]:
def create_dataloader(data_path, transform, batch_size: int, num_workers: int):
    
    image_datasets = datasets.ImageFolder(root=data_path, transform=transform)
    
    class_names = image_datasets.classes
    
    train_size = int(0.8 * len(image_datasets))
    test_size = len(image_datasets) - train_size
    
    train_datasets, test_datasets = torch.utils.data.random_split(dataset=image_datasets, 
                                                                  lengths=[train_size, test_size])
    
    train_dataloader = DataLoader(dataset=train_datasets,
                                  batch_size=batch_size,
                                  shuffle=True,
                                  num_workers=num_workers)

    test_dataloader = DataLoader(dataset=test_datasets,
                                 batch_size=batch_size,
                                 shuffle=False,
                                 num_workers=num_workers)
    
    return train_dataloader, test_dataloader, class_names

In [14]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

# Create train and test loop functions

In [15]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer):
    # Put model in train mode
    model.train()
    
    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0
    
    for batch, (X, y) in enumerate(dataloader):
        # Put data to target device
        X, y = X.to(device), y.to(device)
        
        # 1. Forward pass
        y_pred = model(X)
        
        # 2. Calculate and accumulate loss 
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()
        
        # 3. Optimizer zero grad
        optimizer.zero_grad()
        
        # 4. Loss Backward
        loss.backward()
        
        # 5. Optimizer step
        optimizer.step()
        
        # Calculate and accumulate accuracy 
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item() / len(y_pred)
        
    # Adjust metrics to get average loss and accuracy per batch
    train_loss /= len(dataloader)
    train_acc /= len(dataloader)
    return train_loss, train_acc


In [16]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module):
    
    # Put model in eval mode
    model.eval()
    
    # Setup test_loss and test_acc values
    test_loss, test_acc = 0, 0
    
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through dataloader batches
        for batch, (X, y) in enumerate(dataloader): 
            # Send data to target device
            X, y = X.to(device), y.to(device)
            
            # 1. Forward pass
            test_pred_logits = model(X)
            
            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()
            
            # Calculate and accumulate accuracy
            test_pred_labels = torch.argmax(torch.softmax(test_pred_logits, dim=1), dim=1)
            test_acc += (test_pred_labels == y).sum().item() / len(test_pred_labels)
            
        # Adjust metric to get average loss, accuracy per batch
        test_loss /= len(dataloader)
        test_acc /= len(dataloader)
        return test_loss, test_acc

In [17]:
from tqdm.auto import tqdm

def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = torch.nn.CrossEntropyLoss(),
          epochs: int=5):
    
    # Create empty results dictionary
    results = {"train_loss": [],
               "train_acc": [],
               "test_loss": [],
               "test_acc": []
              }
    
    # 3. Loop through training and testing steps for a number of epochs
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer)
        
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn)
        
        # 4. Print out what's happening
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )

        # 5. Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

    # 6. Return the filled results at the end of the epochs
    return results

In [None]:
# Set random seeds
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Set numper of epochs, batch size and num_workers
NUM_EPOCHS = 100
BATCH_SIZE = 32
NUM_WORKERS = os.cpu_count()

# Resnet model
weights = torchvision.models.ResNet50_Weights.DEFAULT
resnet = torchvision.models.resnet50(weights=weights).to(device)

# Resnet transform
res_transforms = weights.transforms()

# Create dataset and dataloader
train_dataloader, test_dataloader, class_names = create_dataloader(data_path,
                                                                   transform=res_transforms,
                                                                   batch_size=BATCH_SIZE,
                                                                   num_workers=NUM_WORKERS)

# Setup loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=resnet.parameters(), lr=0.001)

# Start the timer
from timeit import default_timer as timer 
start_time = timer()

# Train model_0 
resnet_results = train(model=resnet, 
                        train_dataloader=train_dataloader,
                        test_dataloader=test_dataloader,
                        optimizer=optimizer,
                        loss_fn=loss_fn, 
                        epochs=NUM_EPOCHS)

# End the timer and print out how long it took
end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")

In [19]:
from typing import List, Dict, Tuple

def plot_loss_curves(results: Dict[str, List[float]]):
    """Plots training curves of a results dictionary.

    Args:
        results (dict): dictionary containing list of values, e.g.
            {"train_loss": [...],
             "train_acc": [...],
             "test_loss": [...],
             "test_acc": [...]}
    """
    
    # Get the loss values of the results dictionary (training and test)
    loss = results['train_loss']
    test_loss = results['test_loss']

    # Get the accuracy values of the results dictionary (training and test)
    accuracy = results['train_acc']
    test_accuracy = results['test_acc']

    # Figure out how many epochs there were
    epochs = range(len(results['train_loss']))

    # Setup a plot 
    plt.figure(figsize=(15, 7))

    # Plot loss
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, label='train_loss')
    plt.plot(epochs, test_loss, label='test_loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()

    # Plot accuracy
    plt.subplot(1, 2, 2)
    plt.plot(epochs, accuracy, label='train_accuracy')
    plt.plot(epochs, test_accuracy, label='test_accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epochs')
    plt.legend();

In [None]:
# plot the loss curves of resnet model
plot_loss_curves(resnet_results)