In [None]:
!pip install mlflow torchinfo torch torchvision torchsummary opencv-python tqdm Pillow scikit-learn

## Import libraries

In [None]:
# Basic data manipulations
import pandas as pd
import numpy as np


# Handling images
from PIL import Image
import matplotlib.pyplot as plt

# Handling paths

import time

# Pytorch essentials
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import models
from torchvision.datasets import ImageFolder
import torchsummary



# Pytorch essentials for datasets.
from torch.utils.data.sampler import SubsetRandomSampler
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

# Pytorch way of data augmentation.
import torchvision
from torchvision import datasets, models, transforms, utils
from torchvision.transforms import v2

import cv2
import os
from glob import glob
from tqdm import tqdm
import shutil
from sklearn.model_selection import train_test_split

#PyTorch Monitoring to MLflow
import mlflow

Accessing the Dataset from Google Drive

In [None]:
# Colab-only step: mount Google Drive at /content/drive so that the dataset
# folder referenced by root_path (under /content/drive/MyDrive) is readable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Root folder of the dataset. The dataframe cell below expects one sub-folder
# per class label (H1, H2, H3, H5, H6), each containing .jpg images.
root_path = '/content/drive/MyDrive/SDS-myconet/DeFungi/'
# Alternative root for a local Windows checkout (disabled):
#root_path = 'E:\SDS\SDS-CP031-myconet\DeFungi'

## Create dataframe

In [None]:
# Alternative root when running on Kaggle (disabled):
#root_path = '/kaggle/input/defungi/'

# Maps each class folder name to the integer class id used as training target.
label_dict = {
    "H1":0,
    "H2":1,
    "H3":2,
    "H5":3,
    "H6":4,
}

# Collect one record per image and build the frame once; concatenating a
# one-row frame per image inside the loop is quadratic in the number of images.
# The glob result is sorted because the filesystem order is arbitrary, and the
# seeded train/val/test split later on is only reproducible for a fixed row order.
records = [
    {"path": jpg_path, "label": label, "class_id": class_id}
    for label, class_id in label_dict.items()
    for jpg_path in sorted(glob(os.path.join(root_path, label, "*.jpg")))
]

df = pd.DataFrame(records, columns=["path", "label", "class_id"]).astype(
    {"path": str, "label": str, "class_id": int}
)

# Fail fast with a clear message instead of a confusing error further down.
if df.empty:
    raise FileNotFoundError(
        f"No .jpg images found under {root_path!r}; expected sub-folders {list(label_dict)}"
    )

Display DataFrame

In [None]:
# Preview the first rows to check the path / label / class_id columns.
df.head()

## Show Class Distribution

**Class distribution in histogram**

In [None]:
# One bar per class. A bar chart of value counts is used instead of plt.hist:
# hist() splits the 5 categorical labels into 10 numeric bins by default, which
# produces unevenly placed bars, and it gives no title or axis labels.
fig, ax = plt.subplots(figsize=(8, 5))
df['label'].value_counts().sort_index().plot.bar(ax=ax, rot=0)
ax.set(title='Images per class', xlabel='Class label', ylabel='Number of images');

**Handling Class Imbalance**

Since our dataset is imbalanced and we want to use a weighted loss function, we will use the weight argument in PyTorch's loss functions like nn.CrossEntropyLoss. This allows us to assign different weights to the loss for each class, giving more importance to the minority classes.


In [None]:
# Inverse-frequency class weights: weight_c = N / (num_classes * count_c),
# so rarer classes get a proportionally larger weight in the loss.
class_counts = df['class_id'].value_counts().sort_index()
total_samples = len(df)
class_weights = total_samples / (len(class_counts) * class_counts)

# Convert to a PyTorch tensor and move it to the training device.
# Bug fix: is_available must be *called*. The bare function object is always
# truthy, which selected "cuda" even on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class_weights_tensor = torch.tensor(class_weights.values, dtype=torch.float32).to(device)
print("Class Weights:", class_weights_tensor)

## Display sample images from Dataset

In [None]:
# Draw `show_imgs` random row labels (sampled independently, so repeats are
# possible) and display the images on a grid with 5 columns, titled by class.
show_imgs = 15
idx = np.random.randint(0, len(df), size=show_imgs)
fig, axes = plt.subplots(show_imgs // 5, 5, figsize=(15, 10))
axes = axes.flatten()
for ax, row_label in zip(axes, idx):
    sample = df.loc[row_label]
    ax.imshow(plt.imread(sample['path']))
    ax.set_title(sample['label'])
    ax.set_axis_off()

## Create Image Transformations

**Image Transformations Explained**
The code defines three sets of transformations using torchvision.transforms.v2.Compose:

train_transforms: These transformations are applied specifically to the images used for training the model. They include several data augmentation techniques to help the model generalize better and prevent overfitting.

eval_transforms: These transformations are for the validation set, which is used during training to evaluate the model's performance on data it hasn't seen before. Data augmentation is typically not applied here, only transformations needed to get the images into the correct format and size for the model.

test_transforms: These transformations are for the test set, which is used for a final evaluation of the trained model. Like the validation set, data augmentation is not applied here.

In [None]:
# ImageNet channel statistics. They are defined for pixel values in [0, 1],
# so every pipeline below must rescale to that range before normalising.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training pipeline: geometric augmentation + random erasing, then tensor
# conversion and normalisation.
train_transforms = v2.Compose([
    v2.Resize(256),
    v2.RandomResizedCrop(size=(224, 224), antialias=True),
    v2.RandomHorizontalFlip(p=0.5),
    v2.RandomVerticalFlip(p=0.5),
    # v2.RandomRotation(degrees=(-20, 20)),
    v2.RandomAffine(degrees=(-10, 10), translate=(0.1, 0.1), scale=(0.9, 1.1)),
    v2.RandomErasing(p=0.5, scale=(0.1,0.15)),
    v2.PILToTensor(),
    # Bug fix: scale=True maps uint8 [0, 255] to float [0, 1]. Without it the
    # values stay in [0, 255] and the ImageNet mean/std below are meaningless.
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Validation pipeline: deterministic resize only, no augmentation.
eval_transforms = v2.Compose([
    v2.Resize((224,224)), # Resize to 224x224 (no cropping for evaluation)
    v2.PILToTensor(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])

# Test pipeline: same deterministic preprocessing as validation.
test_transforms = v2.Compose([
    v2.Resize((224,224)), # Resize to 224x224 (no cropping for testing)
    v2.PILToTensor(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])

**Define Custom Dataset**

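This code defines a custom dataset class called `MyDataset`, which inherits from PyTorch's `Dataset` class. Each item is an image loaded from the `path` column, converted to RGB and passed through the given transforms, paired with its `class_id`.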


In [None]:
class MyDataset(torch.utils.data.Dataset):
    """Image dataset backed by a dataframe with 'path' and 'class_id' columns.

    Args:
        dataframe: table with one row per image.
        transforms_: callable applied to the RGB PIL image; it performs the
            augmentation and the PIL -> tensor conversion.
    """

    def __init__(self, dataframe, transforms_):
        self.transforms_ = transforms_
        self.df = dataframe

    def __len__(self):
        """Number of rows (images) in the backing dataframe."""
        return len(self.df)

    def __getitem__(self, index):
        """Return (transformed image, class_id) for the row at position `index`."""
        row = self.df.iloc[index]
        # Force 3 channels so every sample has the same channel count.
        rgb_image = Image.open(row['path']).convert("RGB")
        return self.transforms_(rgb_image), row['class_id']

**Create Dataset and Dataloader**

We use train_test_split twice. First, to split into training and a combined validation/test set, and then again to split the combined validation/test set into separate validation and test sets.

In [None]:
## Create dataset and dataloader

# Assumes the 'df' dataframe already exists and has a 'label' column for stratification.

# First split: 80% for training, 20% for combined validation and testing.
train_df, val_test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=9898,
    stratify=df['label'] # Keep the class proportions in both parts
)

# Second split: halve the 20% hold-out into 10% validation and 10% test
# (fractions relative to the original data).
val_df, test_df = train_test_split(
    val_test_df,
    test_size=0.5,
    random_state=9898,
    stratify=val_test_df['label'] # Stratify this split as well
)

# Bug fix: is_available must be *called*. The bare function object is always
# truthy, which selected "cuda" even on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Number of DataLoader worker processes; adjust to your system. The previous
# `2 if device=='cuda' else 2` compared a torch.device with a string and had
# identical branches, so it is reduced to the constant it always produced.
num_workers = 2

# Create datasets for each split. Only the training set is augmented.
train_dataset = MyDataset(train_df, train_transforms)
val_dataset = MyDataset(val_df, eval_transforms)
test_dataset = MyDataset(test_df, test_transforms)


BATCH_SIZE = 32

# Create DataLoaders for each dataset. Only training data is shuffled.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=num_workers)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=num_workers)

In [None]:
# Report the number of rows in each split, one per line.
print(
    f'train data:{len(train_df)}',
    f'val data:{len(val_df)}',
    f'test data:{len(test_df)}',
    sep='\n',
)

## Create model

Define the custom CNN

In [None]:
class SimpleCNN(nn.Module):
    """Two conv/ReLU/max-pool stages followed by one linear classifier.

    Args:
        num_classes: number of output logits.
        input_size: (height, width) of the input images. It is used to size the
            linear layer; defaults to the 224x224 produced by the transforms.

    The classifier used to be left as ``None`` and patched in from outside,
    which ignored ``num_classes`` and made ``forward`` crash on a freshly
    constructed model. It is now built in the constructor. Re-assigning
    ``model.fc`` afterwards still works.
    """

    def __init__(self, num_classes=5, input_size=(224, 224)):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)

        # The 3x3 / padding-1 convolutions keep the spatial size; each of the
        # two 2x2 / stride-2 pools halves it (rounding down).
        height, width = input_size
        flattened_size = self.conv2.out_channels * (height // 2 // 2) * (width // 2 // 2)
        self.fc = nn.Linear(flattened_size, num_classes)

    def forward(self, x):
        """Map a batch of images (N, 3, H, W) to class logits (N, num_classes)."""
        x = self.maxpool(self.relu(self.conv1(x)))
        x = self.maxpool(self.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # Flatten everything except the batch dimension
        return self.fc(x)

## Initialize the Model

In [None]:
class_size = 5
model = SimpleCNN(num_classes=class_size)

# Probe the feature extractor with random data of the post-transform input
# shape to find out how many features reach the fully connected layer.
dummy_input = torch.randn((16, 3, 224, 224))

# No gradients are needed for a shape probe.
with torch.no_grad():
    x = dummy_input
    for layer in (
        model.conv1, model.relu, model.maxpool,
        model.conv2, model.relu, model.maxpool,
    ):
        x = layer(x)
    # Feature count per sample once everything but the batch axis is flattened.
    flattened_size = x.flatten(start_dim=1).shape[1]



In [None]:
# Shape of the dummy batch after the second pooling stage (before flattening).
x.shape

In [None]:
# Number of features per sample that the fully connected layer receives.
print(f'Flattened size: {flattened_size}')

In [None]:
# Attach the classifier head using the probed feature count: a single linear
# layer mapping flattened_size features to class_size logits.
model.fc = nn.Linear(flattened_size, class_size)
# Display the final architecture.
model

## Training and Validation

In [None]:
def train(dataloader, model, loss_fn, optimizer, lr_scheduler):
    """Train `model` for one epoch and step the LR scheduler once.

    Args:
        dataloader: yields (inputs, integer class targets) batches and exposes
            `.dataset` for the sample count.
        model: network to optimise; batches are moved to the device of its
            parameters.
        loss_fn: callable `(logits, targets) -> scalar loss tensor`.
        optimizer: optimizer over `model`'s parameters.
        lr_scheduler: scheduler stepped once at the end of the epoch.

    Returns:
        (accuracy, mean_loss): fraction of correctly classified samples and the
        loss averaged over batches.
    """
    size = len(dataloader.dataset) # number of samples
    num_batches = len(dataloader) # batches per epoch
    # Use the model's own device instead of a notebook-level global.
    device = next(model.parameters()).device
    model.train()
    epoch_loss = 0.0
    epoch_correct = 0
    for data_, target_ in dataloader:
        data_ = data_.to(device)
        target_ = target_.long().to(device)

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        outputs = model(data_)

        # Bug fix: use the loss passed in as `loss_fn`; the previous code
        # silently used the global `criterion` and ignored the argument.
        loss = loss_fn(outputs, target_)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_correct += (outputs.argmax(dim=1) == target_).sum().item()
    lr_scheduler.step()
    return epoch_correct/size, epoch_loss/num_batches


def eval(dataloader, model, loss_fn):
    """Evaluate `model` on `dataloader` without updating any weights.

    Note: the name shadows Python's built-in `eval`; it is kept because other
    cells call it.

    Args:
        dataloader: yields (inputs, integer class targets) batches and exposes
            `.dataset` for the sample count.
        model: network to evaluate; batches are moved to the device of its
            parameters.
        loss_fn: callable `(logits, targets) -> scalar loss tensor`.

    Returns:
        (accuracy, mean_loss): fraction of correctly classified samples and the
        loss averaged over batches.
    """
    size = len(dataloader.dataset) # number of samples
    num_batches = len(dataloader) # batches per epoch
    # Use the model's own device instead of a notebook-level global.
    device = next(model.parameters()).device
    epoch_loss = 0.0
    epoch_correct = 0
    # Switch dropout / batch-norm style layers to inference behaviour.
    model.eval()
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for data_, target_ in dataloader:
            data_ = data_.to(device)
            target_ = target_.long().to(device)

            outputs = model(data_)

            # Bug fix: use the loss passed in as `loss_fn`; the previous code
            # silently used the global `criterion` and ignored the argument.
            loss = loss_fn(outputs, target_)

            epoch_loss += loss.item()
            epoch_correct += (outputs.argmax(dim=1) == target_).sum().item()
    return epoch_correct/size, epoch_loss/num_batches

def test(dataloader, model, loss_fn):
    """Evaluate `model` on `dataloader` and compute classification metrics.

    Batches are moved to the notebook-level `device`.

    Returns:
        (accuracy, mean_loss, weighted_f1, weighted_recall, confusion_matrix)
    """
    # Imported here so the function carries its own metric dependencies.
    from sklearn.metrics import f1_score, recall_score, confusion_matrix

    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval() # Set the model to evaluation mode

    running_loss = 0.0
    n_correct = 0
    all_labels, all_predictions = [], []

    with torch.no_grad():
        for batch_inputs, batch_targets in dataloader:
            batch_targets = batch_targets.type(torch.LongTensor)
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            logits = model(batch_inputs)
            running_loss += loss_fn(logits, batch_targets).item()

            # Index of the largest logit is the predicted class.
            predicted = torch.max(logits.data, 1)[1]
            n_correct += (predicted == batch_targets).sum().item()

            # Keep labels and predictions for the sklearn metrics below.
            all_labels.extend(batch_targets.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    mean_loss = running_loss / n_batches
    accuracy = n_correct / n_samples

    # 'weighted' averages per-class scores by class support (imbalanced data).
    f1 = f1_score(all_labels, all_predictions, average='weighted')
    recall = recall_score(all_labels, all_predictions, average='weighted')
    cm = confusion_matrix(all_labels, all_predictions)

    return accuracy, mean_loss, f1, recall, cm

In [None]:
model = model.to(device)

# --- Hyperparameters ---
EPOCHS = 50
learning_rate = 1e-4
# NOTE: momentum and weight_decay are defined but not passed to the active
# optimizer below; AdamW therefore runs with its library defaults for them.
momentum = 0.9
weight_decay = 0.1
lr_milestones = [7, 14, 21, 28, 35]

# Per-epoch history used for the learning-curve plots.
logs = {key: [] for key in ('train_loss', 'train_acc', 'val_loss', 'val_acc')}

# Weighted cross-entropy: the class weights counteract the class imbalance.
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)

# Optimizer, plus a scheduler that multiplies the LR by 0.1 at each milestone epoch.
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
multi_step_lr_scheduler = lr_scheduler.MultiStepLR(
    optimizer,
    milestones=lr_milestones,
    gamma=0.1,
)

# Early-stopping state: stop after `patience` epochs without a new best val loss.
patience = 5
counter = 0
best_loss = np.inf

# Main MLflow run: covers training, per-epoch logging, checkpoints and the
# final test-set evaluation.

# Needed only in this cell: capture torchsummary's printed table, draw the heatmap.
import contextlib
import io

import seaborn as sns

# Upload a checkpoint artifact every N epochs (1 == every epoch, the previous
# behaviour; raise it, e.g. to 10, to reduce artifact volume).
CHECKPOINT_EVERY = 1

with mlflow.start_run():
    # Log parameters at the beginning of the run
    mlflow.log_params({
        "epochs": EPOCHS,
        "learning_rate": learning_rate,
        "optimizer": type(optimizer).__name__,
        "loss_function": type(criterion).__name__,
        "batch_size": BATCH_SIZE,
        "device": str(device),
        "lr_milestones": lr_milestones,
        "lr_gamma": 0.1,
        "early_stopping_patience": patience
    })

    # Log the model architecture as an artifact.
    # Fix: the previous code wrote str(<return value of summary()>) to the file.
    # torchsummary prints its table, so the printed text is captured and that
    # is what gets written.
    summary_buffer = io.StringIO()
    with contextlib.redirect_stdout(summary_buffer):
        torchsummary.summary(model, input_size=(3, 224, 224), device=str(device))
    model_summary_text = summary_buffer.getvalue()
    print(model_summary_text) # still show the table in the notebook

    model_summary_file = "model_summary.txt"
    with open(model_summary_file, "w") as f:
        f.write(model_summary_text)
    mlflow.log_artifact(model_summary_file)
    os.remove(model_summary_file) # Clean up the temporary file

    for epoch in tqdm(range(EPOCHS)):
        train_acc, train_loss = train(train_loader, model, criterion, optimizer, multi_step_lr_scheduler)
        val_acc, val_loss = eval(val_loader, model, criterion)
        print(f'EPOCH: {epoch} \
        train_loss: {train_loss:.4f}, train_acc: {train_acc:.3f} \
        val_loss: {val_loss:.4f}, val_acc: {val_acc:.3f} \
        Learning Rate: {optimizer.param_groups[0]["lr"]}')

        logs['train_loss'].append(train_loss)
        logs['train_acc'].append(train_acc)
        logs['val_loss'].append(val_loss)
        logs['val_acc'].append(val_acc)

        # Log metrics for each epoch within the same run
        mlflow.log_metrics({
                "train_loss": train_loss,
                "train_accuracy": train_acc,
                "val_loss": val_loss,
                "val_accuracy": val_acc,
                "learning_rate": optimizer.param_groups[0]["lr"]
            }, step=epoch)

        # Always keep the most recent weights locally.
        torch.save(model.state_dict(), "last.pth")

        # Periodically upload a checkpoint to MLflow.
        if (epoch + 1) % CHECKPOINT_EVERY == 0:
            checkpoint_path = f"checkpoint_epoch_{epoch+1}.pth"
            torch.save(model.state_dict(), checkpoint_path)
            mlflow.log_artifact(checkpoint_path, artifact_path="checkpoints")
            os.remove(checkpoint_path) # Clean up the temporary file

        # Early stopping: track the best validation loss and keep its weights.
        if val_loss < best_loss:
            counter = 0
            best_loss = val_loss
            torch.save(model.state_dict(), "best.pth")
        else:
            counter += 1
        if counter >= patience:
            print("Earlystop!")
            break

    # --- Test Evaluation and Logging (inside the main MLflow run) ---
    print("\n--- Evaluating on Test Set ---")

    # Restore the weights with the lowest validation loss. map_location keeps
    # the load working when the checkpoint device differs from the current one.
    model.load_state_dict(torch.load("best.pth", map_location=device))
    model.to(device) # Ensure the model is on the correct device

    test_accuracy, test_loss, test_f1, test_recall, test_confusion_matrix = test(test_loader, model, criterion)

    print("\n--- Test Set Results ---")
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test F1 Score: {test_f1:.4f}")
    print(f"Test Recall: {test_recall:.4f}")
    print("\nConfusion Matrix:")
    print(test_confusion_matrix)

    # Log test metrics to the same MLflow run
    mlflow.log_metric("test_loss", test_loss)
    mlflow.log_metric("test_accuracy", test_accuracy)
    mlflow.log_metric("test_f1_score", test_f1)
    mlflow.log_metric("test_recall", test_recall)

    # Log the confusion matrix as an image artifact.
    cm_fig, cm_ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(test_confusion_matrix, annot=True, fmt='d', cmap='Blues', ax=cm_ax)
    cm_ax.set_xlabel('Predicted Label')
    cm_ax.set_ylabel('True Label')
    cm_ax.set_title('Confusion Matrix')
    confusion_matrix_path = "confusion_matrix.png"
    cm_fig.savefig(confusion_matrix_path)
    plt.close(cm_fig) # Release the figure; it is only needed as a file
    mlflow.log_artifact(confusion_matrix_path)
    os.remove(confusion_matrix_path) # Clean up the temporary file

    # The MLflow run ends automatically when this 'with' block exits.

In [None]:
# Learning curves: loss on the left panel, accuracy on the right panel.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 5))

loss_ax.plot(logs['train_loss'], label='Train_Loss')
loss_ax.plot(logs['val_loss'], label='Validation_Loss')
loss_ax.set_title('Train_Loss & Validation_Loss', fontsize=20)
loss_ax.legend()

acc_ax.plot(logs['train_acc'], label='Train_Accuracy')
acc_ax.plot(logs['val_acc'], label='Validation_Accuracy')
acc_ax.set_title('Train_Accuracy & Validation_Accuracy', fontsize=20)
acc_ax.legend()

In [None]:
## Evaluate on the Test Set and Output Metrics

# Restore the checkpoint saved as "best.pth" and make sure it sits on `device`.
model.load_state_dict(torch.load("best.pth"))
model.to(device)

(
    test_accuracy,
    test_loss,
    test_f1,
    test_recall,
    test_confusion_matrix,
) = test(test_loader, model, criterion)

print("\n--- Test Set Results ---")
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")
print(f"Test Recall: {test_recall:.4f}")
print("\nConfusion Matrix:")
print(test_confusion_matrix)

# Optionally log the test metrics to MLflow. This opens a new run, separate
# from the training run. The confusion matrix is only printed here; logging it
# would require saving it to a file or image first.
with mlflow.start_run():
    for metric_name, metric_value in (
        ("test_loss", test_loss),
        ("test_accuracy", test_accuracy),
        ("test_f1_score", test_f1),
        ("test_recall", test_recall),
    ):
        mlflow.log_metric(metric_name, metric_value)