<a href="https://www.kaggle.com/code/themeeemul/fungi-classification-using-resnet50-with-pytorch?scriptVersionId=143884441" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
import numpy as np
import cv2
from PIL import Image
from sklearn.metrics import classification_report,confusion_matrix,accuracy_score
from collections import defaultdict
from matplotlib import pyplot as plt
from sklearn.model_selection import KFold

In [None]:
# # Load the extension and start TensorBoard

# %load_ext tensorboard
# %tensorboard --logdir logs

In [None]:
import torch
from torchvision import transforms, models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader,Dataset,ConcatDataset, SubsetRandomSampler
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [None]:

# Reproducibility setup.
# Force cuDNN onto deterministic kernels and turn off its auto-tuner, which
# may otherwise select different algorithms from run to run.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# Seed PyTorch's random number generator with a fixed value.
seed = 42
torch.manual_seed(seed)


In [None]:
# Root folders of the three dataset splits on Kaggle.
train_path, valid_path, test_path = (
    "/kaggle/input/microscopic-fungi-images/train",
    "/kaggle/input/microscopic-fungi-images/valid",
    "/kaggle/input/microscopic-fungi-images/test",
)

# Check the distribution of each class in the dataset

In [None]:
# Load the three splits. No transform is passed here; preprocessing is
# attached later by wrapping these datasets.
train_datasets = ImageFolder(root=train_path)
valid_datasets = ImageFolder(root=valid_path)
test_datasets = ImageFolder(root=test_path)

# Count training samples per class index. ImageFolder exposes the label of
# every sample in `.targets`, so the images do not have to be decoded just
# to build the histogram.
label_dict = defaultdict(int)
for label in train_datasets.targets:
    label_dict[label] += 1

# Parallel lists of class indices and their sample counts.
class_list = []
class_count = []
for class_idx, count in label_dict.items():
    class_list.append(class_idx)
    # Store the count itself (the list used to be appended to itself here).
    class_count.append(count)
    print("Class idx: ",class_idx, "|| count: ",count)
    


In [None]:
# Estimate per-channel mean and std of the training images for normalization
# later on. Alternatively, use the ImageNet statistics
# mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225].
#
# The result is the average of the per-image statistics (an approximation of
# the dataset-wide values), and the image-size histogram is collected in the
# same pass.
mean = 0.0
std = 0.0
num_samples = 0
image_sizes = defaultdict(int)
for data, _ in train_datasets:
    # PIL reports size as (width, height), not (height, width).
    width, height = data.size
    image_sizes[(width, height)] += 1

    # Scale to [0, 1] so the statistics are on the same scale as the tensors
    # produced by transforms.ToTensor() (and as the ImageNet values above).
    # Assumes 8-bit images - TODO confirm for this dataset.
    image_array = np.array(data) / 255.0

    # Accumulate per-image, per-channel statistics (reduce over the two
    # spatial axes).
    mean += np.mean(image_array, axis=(0, 1))
    std += np.std(image_array, axis=(0, 1))
    num_samples += 1

mean /= num_samples
std /= num_samples

In [None]:
# Show the mean and std computed in the previous cell.
print(mean,std)

In [None]:
# Report how many training images were found for each distinct image size.
for dims in image_sizes:
    print(f"Image size {dims}: {image_sizes[dims]} images")

# Preparing Data

In [None]:
# Gaussian blur transform
class Gaussian_Blur(object):
    """Callable transform that blurs an image with ``transforms.GaussianBlur``.

    ``radius`` is forwarded to ``GaussianBlur`` as its first argument; an even
    value is bumped to the next odd number before it is stored.
    """

    def __init__(self, radius=2):
        # Keep odd values untouched, round even ones up by one.
        self.radius = radius if radius % 2 else radius + 1

    def __call__(self, img):
        blur = transforms.GaussianBlur(self.radius)
        return blur(img)
class HistogramEqualization(object):
    """Callable transform that applies ``transforms.functional.equalize``."""

    def __call__(self, img):
        equalized = transforms.functional.equalize(img)
        return equalized

In [None]:
# Preprocessing pipelines, keyed by the split they are applied to.
data_transforms = {}

# Training: resize, equalize, blur, random horizontal flip, then convert to a
# tensor and normalize with the ImageNet statistics.
data_transforms["train"] = transforms.Compose([
    transforms.Resize([224, 224]),
    HistogramEqualization(),
    Gaussian_Blur(radius=2),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation / test: same preprocessing without the blur and the random flip.
data_transforms["test-valid"] = transforms.Compose([
    transforms.Resize([224, 224]),
    HistogramEqualization(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
class Fungi(Dataset):
    """Dataset wrapper that yields ``{"image": ..., "label": ...}`` samples.

    ``dataset`` must be indexable and return ``(image, label)`` pairs.
    ``transforms`` is an optional callable applied to the image only.
    """

    def __init__(self, dataset, transforms=None):
        self.transforms = transforms
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        picture, target = self.dataset[idx]
        # Without a transform the image is passed through untouched.
        if not self.transforms:
            return {"image": picture, "label": target}
        return {"image": self.transforms(picture), "label": target}

In [None]:
# Wrap each raw split with its preprocessing pipeline.
train_dataset = Fungi(train_datasets, data_transforms['train'])
valid_dataset = Fungi(valid_datasets, data_transforms['test-valid'])
# The test wrapper must read the test split (it previously reused the
# validation images).
test_dataset = Fungi(test_datasets, data_transforms['test-valid'])

In [None]:
# def show_image(image_tensor):
#     # Convert the image tensor to a NumPy array
#     image_array = image_tensor.numpy()

#     # If the image has 3 channels (e.g., RGB), transpose it to (H, W, C) for display
#     if image_array.shape[0] == 3:
#         image_array = image_array.transpose(1, 2, 0)

#     # Display the image using Matplotlib
#     plt.imshow(image_array)
#     plt.axis('off')
#     plt.show()

# # Load a batch of data (change 'next(iter(data_loader))' to access different batches)
# batch = next(iter(train_loader))

# # Extract an image from the batch (change '0' to access different images within the batch)
# image = batch['image'][0]

# # Display the image
# show_image(image)

# Define model

In [None]:
def initialize_network(num_classes=5):
    """Build a ResNet-50 with a custom classification head.

    The backbone is loaded with the "IMAGENET1K_V2" weights and its final
    ``fc`` layer is replaced by a small MLP head.

    Args:
        num_classes: Number of output logits of the head (default 5, the
            value previously hard-coded).

    Returns:
        The model, moved to the module-level ``device``.
    """
    model = models.resnet50(weights="IMAGENET1K_V2")
    # The backbone is left trainable. To train only the new head, freeze it:
    #     for param in model.parameters():
    #         param.requires_grad = False

    # Size of the feature vector that feeds the original fc layer.
    num_ftrs = model.fc.in_features

    model.fc = nn.Sequential(
        nn.BatchNorm1d(num_ftrs),
        nn.Dropout(0.5),
        # Use num_ftrs rather than a literal 2048 so the head always matches
        # the backbone's feature width.
        nn.Linear(num_ftrs, 1024),
        nn.ReLU(inplace=True),
        nn.Linear(1024, 512),
        nn.ReLU(inplace=True),
        nn.BatchNorm1d(512),
        nn.Dropout(0.5),
        nn.Linear(512, num_classes),
    )
    # Move backbone and head together, once.
    return model.to(device)

In [None]:
def feedNN(model,loader,type):
    """Run one pass of ``model`` over ``loader`` and collect loss and predictions.

    Relies on the module-level ``device``, ``criterion`` and ``optimizer``.

    Args:
        model: Network to run. The caller sets train/eval mode beforehand.
        loader: Iterable of batches shaped like ``{"image": tensor, "label": tensor}``.
        type: ``"train"`` to back-propagate and step the optimizer; any other
            value only runs the forward pass. (The name shadows the builtin
            but is kept so existing callers keep working.)

    Returns:
        Tuple ``(average_loss, predictions, labels)`` where ``average_loss`` is
        the mean loss per sample and the other two are flat NumPy arrays.
    """
    is_training = type == "train"
    total_loss = 0.0
    num_samples = 0
    prediction_list = []
    label_list = []
    for data in loader:
        images = data['image'].to(device)
        labels = data['label'].to(device)

        if is_training:
            # Clear gradients before the forward/backward pass so stale
            # gradients can never leak into this step.
            optimizer.zero_grad()

        # Feed it to the model and compute the loss
        outputs = model(images)
        loss = criterion(outputs, labels)
        if is_training:
            loss.backward()
            optimizer.step()

        batch_size = images.size(0)
        # The loss is weighted by the batch size so that dividing by the
        # number of samples below yields a true per-sample average.
        # Assumes `criterion` returns the batch mean (the default reduction
        # of nn.CrossEntropyLoss) - TODO confirm if the criterion changes.
        total_loss += loss.item() * batch_size
        num_samples += batch_size

        # Predicted class = index of the largest logit
        prediction_list.append(torch.argmax(outputs, dim=1).detach().cpu().numpy())
        label_list.append(labels.detach().cpu().numpy())

    average_loss = total_loss / num_samples
    prediction_list = np.concatenate(prediction_list).ravel()
    label_list = np.concatenate(label_list).ravel()
    return average_loss, prediction_list, label_list
        

    

In [None]:
def plotting(train_acc,valid_acc,train_loss,valid_loss):
    """Plot accuracy and loss curves side by side in a single figure.

    Args:
        train_acc: Per-epoch training accuracies.
        valid_acc: Per-epoch validation accuracies.
        train_loss: Per-epoch training losses.
        valid_loss: Per-epoch validation losses.

    The epoch axis is derived from the length of each series, so any number
    of epochs can be plotted.
    """
    def _draw(ax, title, ylabel, train_values, valid_values, train_label, valid_label):
        # Draw one train/validation pair of curves on the given axes.
        ax.set_title(title)
        ax.plot(range(1, len(train_values) + 1), train_values, 'g', label=train_label)
        ax.plot(range(1, len(valid_values) + 1), valid_values, 'r', label=valid_label)
        ax.set_xlabel('Epochs')
        ax.set_ylabel(ylabel)
        ax.legend()

    # Both panels live in one figure; calling plt.show() between them would
    # render them as two separate, half-empty figures.
    _, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 4))
    _draw(acc_ax, "Accuracy", 'Accuracy', train_acc, valid_acc,
          'Training Accuracy', 'Validation Accuracy')
    _draw(loss_ax, "Loss", 'Loss', train_loss, valid_loss,
          'Training Loss', 'Validation Loss')
    plt.tight_layout()
    plt.show()

In [None]:
# Training hyperparameters.
# NOTE(review): the DataLoaders visible in this notebook hard-code
# batch_size = 16 and never read this value - confirm which one is intended.
hyperparameters = {
    "batch_size":2
}

In [None]:
# Combined train + validation pool (used by the K-fold experiment that is
# commented out further down).
concat_dataset = ConcatDataset([train_dataset, valid_dataset])
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Each loader reads its own split. Previously all three iterated
# concat_dataset, so validation and test metrics were measured on the
# training data and the test split was never used.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Initialize the network
model = initialize_network()
# Initialize the loss function and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)



In [None]:
# from torch.utils.tensorboard import SummaryWriter
# log_dir = "./logs"  # Specify a directory for TensorBoard logs
# writer = SummaryWriter(log_dir)


In [None]:
# Number of passes over the training data.
epoch = 10

# Per-epoch history, consumed by the plotting cell afterwards.
train_list = {"acc": [], "loss": []}
valid_list = {"acc": [], "loss": []}
for epoch_idx in range(epoch):
    print("==================")
    print("Epoch : ", epoch_idx + 1)

    # Training pass (weights are updated inside feedNN)
    model.train()
    train_loss, train_preds, train_labels = feedNN(model, train_loader, "train")

    # Validation pass, without gradient tracking
    model.eval()
    with torch.no_grad():
        valid_loss, valid_preds, valid_labels = feedNN(model, valid_loader, "valid")

    # Metrics: compute each accuracy once, reuse it for printing and history
    train_acc = accuracy_score(train_labels, train_preds)
    valid_acc = accuracy_score(valid_labels, valid_preds)
    print("Train_Accuracy: ", train_acc, "| Valid_Accuracy: ", valid_acc)
    print("Train_loss: ", train_loss, "| Valid_loss : ", valid_loss)

#     writer.add_scalar("Training Loss", train_loss, global_step=i)
#     writer.add_scalar("Training Accuracy", train_preds, global_step=i)

    # Store accuracy and loss for plotting later on
    for history, acc_value, loss_value in (
        (train_list, train_acc, train_loss),
        (valid_list, valid_acc, valid_loss),
    ):
        history['acc'].append(acc_value)
        history['loss'].append(loss_value)

In [None]:
# Draw the learning curves recorded during training.
plotting(
    train_acc=train_list['acc'],
    valid_acc=valid_list['acc'],
    train_loss=train_list['loss'],
    valid_loss=valid_list['loss'],
)
# Persist the trained weights (state dict only) next to the notebook.
torch.save(model.state_dict(), "resnet50_1.pt")

# Testing

In [None]:
# Evaluate with the model in eval mode and gradient tracking disabled.
model.eval()
with torch.no_grad():
    test_results = feedNN(model, test_loader, "test")
test_loss, test_preds, test_labels = test_results

# Per-class report followed by the confusion matrix.
print("Testing result")
test_report = classification_report(test_labels, test_preds)
print(test_report)
print("=======================================")
test_confusion = confusion_matrix(test_labels, test_preds)
print(test_confusion)

In [None]:
# from IPython.display import FileLink

# # Create a download link for the model file
# FileLink('resnet50_1.pt')

In [None]:
# model.eval()  # Set the model to evaluation mode

# predicted_list = []
# label_list = []
# with torch.no_grad():
#     for inputs in test_loader:  # Iterate through the test dataset
#         images = inputs['image']
#         labels = inputs['label']
#         outputs = model(images)  # Forward pass
#         predicted = torch.argmax(outputs, 1)  # Get predicted labels
#         predicted_list.append(predicted.cpu().detach().numpy())
#         label_list.append(labels.cpu().detach().numpy())
        
# predicted_list = np.concatenate(predicted_list).ravel()
# label_list  = np.concatenate(label_list).ravel()


In [None]:
# # classification_report,confusion_matrix
# print("Testing result")
# print(classification_report(label_list,predicted_list))
# print("=======================================")
# print(confusion_matrix(label_list,predicted_list))

In [None]:
# k = 2
# # Define device
# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# # Define the K-fold Cross Validator
# kfold = KFold(n_splits=k, shuffle=True)
    
# # Start print
# print('--------------------------------')

# # K-fold Cross Validation model evaluation
# for fold, (train_ids, valid_ids) in enumerate(kfold.split(concat_dataset)):
#     print("Fold: ",fold+1)
#     train_subsampler = SubsetRandomSampler(train_ids)
#     valid_subsampler = SubsetRandomSampler(valid_ids)
    
#     #Data Loader
#     train_loader = DataLoader(concat_dataset,batch_size = 16,sampler = train_subsampler)
#     valid_loader = DataLoader(concat_dataset,batch_size = 16,sampler = valid_subsampler)
    
#     # Initialize of the network 
#     model = initialize_network()
#     # Initialize of the Optimizer and Loss Function
#     criterion = nn.CrossEntropyLoss()
#     optimizer = optim.Adam(model.fc.parameters(),lr=0.001)
#     # Train
#     epoch = 10
    
#     train_list = {"acc":[],"loss":[]}
#     valid_list = {"acc":[],"loss":[]}
#     for i in range(0,epoch):
#         print("==================")
#         print("Epoch : ",i+1)
        
#         #Train our model
#         model.train()
#         train_loss, train_preds, train_labels = feedNN(model,train_loader)
#         model.eval()
#         valid_loss, valid_preds, valid_labels = feedNN(model,valid_loader)
#         #Metrics
#         print("Train_Accuracy: ",accuracy_score(train_labels,train_preds), "| Valid_Accuracy: ",accuracy_score(valid_labels,valid_preds))
#         print("Train_loss: ",train_loss,"| Valid_loss : ",valid_loss)
#         #store acc and loss for plotting later on
#         train_list['acc'].append(accuracy_score(train_labels,train_preds))
#         valid_list['acc'].append(accuracy_score(valid_labels,valid_preds))
#         train_list['loss'].append(train_loss)
#         valid_list['loss'].append(valid_loss)
        
#     print("Plot Fold: ",fold)
#     plotting(train_list['acc'],valid_list['acc'],train_list['loss'],valid_list['loss'])
# torch.save(model.state_dict(),"resnet50_1.pt")