In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, utils, datasets
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from sklearn.metrics import classification_report, confusion_matrix
torch.cuda.empty_cache()
from PIL import Image
import warnings
import cv2

In [None]:
import os
from PIL import Image, ImageFile
# Ask Pillow to tolerate image files whose data ends early instead of raising
# while decoding them.
ImageFile.LOAD_TRUNCATED_IMAGES = True
# NOTE(review): Pillow documents this flag on ImageFile only; the same
# assignment on the Image module looks like a no-op -- confirm before relying on it.
Image.LOAD_TRUNCATED_IMAGES = True

def list_images_train(base_dir):
    """Collect every training image under ``base_dir`` together with its label.

    Expected layout: ``<base_dir>/<class>/train_set/<subclass>/<image file>``.

    Args:
        base_dir: Root directory that holds one sub-directory per class.

    Returns:
        A list of ``(image_path, label)`` tuples where ``label`` is
        ``"<class>_<subclass>"``. Entries are ordered by class, subclass and
        file name, so the result is identical on every run.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    images_with_labels = []
    # os.listdir returns entries in arbitrary order; sorting keeps this list
    # (and any seeded split derived from it) reproducible.
    for class_dir in sorted(os.listdir(base_dir)):
        split_dir = os.path.join(base_dir, class_dir, 'train_set')
        # Stray files in the root, or classes without a training split, are
        # skipped instead of making os.listdir raise.
        if not os.path.isdir(split_dir):
            continue
        for subclass_dir in sorted(os.listdir(split_dir)):
            images_dir = os.path.join(split_dir, subclass_dir)
            if not os.path.isdir(images_dir):
                continue
            label = f"{class_dir}_{subclass_dir}"
            for image in sorted(os.listdir(images_dir)):
                if image.lower().endswith(image_extensions):
                    images_with_labels.append((os.path.join(images_dir, image), label))
    return images_with_labels

import os

def list_images_test(base_dir):
    """Collect every test image under ``base_dir`` together with its label.

    Expected layout: ``<base_dir>/<class>/test_set/<subclass>/<image file>``.

    Args:
        base_dir: Root directory that holds one sub-directory per class.

    Returns:
        A list of ``(image_path, label)`` tuples where ``label`` is
        ``"<class>_<subclass>"``. Entries are ordered by class, subclass and
        file name, so the result is identical on every run.
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    images_with_labels = []
    # os.listdir returns entries in arbitrary order; sorting makes the
    # evaluation order deterministic.
    for class_dir in sorted(os.listdir(base_dir)):
        split_dir = os.path.join(base_dir, class_dir, 'test_set')
        # Stray files in the root, or classes without a test split, are
        # skipped instead of making os.listdir raise.
        if not os.path.isdir(split_dir):
            continue
        for subclass_dir in sorted(os.listdir(split_dir)):
            images_dir = os.path.join(split_dir, subclass_dir)
            if not os.path.isdir(images_dir):
                continue
            label = f"{class_dir}_{subclass_dir}"
            for image in sorted(os.listdir(images_dir)):
                if image.lower().endswith(image_extensions):
                    images_with_labels.append((os.path.join(images_dir, image), label))
    return images_with_labels

# Sanity check: print how many (image_path, label) pairs each listing function
# finds under the dataset root (train count first, then test count).
print(len(list_images_train("/kaggle/input/teamken-data/Dataset for Crop Pest and Disease Detection/CCMT Dataset-Augmented")))
print(len(list_images_test("/kaggle/input/teamken-data/Dataset for Crop Pest and Disease Detection/CCMT Dataset-Augmented")))

In [None]:
from PIL import UnidentifiedImageError, Image
from torch.utils.data import random_split
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image, UnidentifiedImageError
import os
import re


class CustomDataset(Dataset):
    """Image-classification dataset backed by a list of image files.

    On construction every file is opened and fully decoded once; files that
    Pillow cannot read are reported and dropped. Labels are normalised by
    stripping a trailing run of digits and any trailing underscores, then
    mapped to integer class indices.

    Args:
        image_paths: Iterable of image file paths.
        labels: Iterable of raw string labels, parallel to ``image_paths``.
        transform: Optional callable applied to the RGB ``PIL.Image`` before
            it is returned.
        label_to_index: Optional ``{normalised label: class index}`` mapping.
            Pass the training dataset's mapping when building validation/test
            datasets so that class indices agree across splits. When omitted,
            the mapping is built from this dataset's own labels in sorted order.

    Raises:
        ValueError: If ``label_to_index`` is given and does not cover every
            label present in this dataset.
    """

    def __init__(self, image_paths, labels, transform=None, label_to_index=None):
        self.transform = transform
        self.image_paths, self.labels = self._validate_images(image_paths, labels)
        if label_to_index is None:
            unique_labels = sorted(set(self.labels))
            label_to_index = {label: idx for idx, label in enumerate(unique_labels)}
        else:
            label_to_index = dict(label_to_index)
            unknown = sorted(set(self.labels) - set(label_to_index))
            if unknown:
                raise ValueError(f"Labels missing from label_to_index: {unknown}")
        self.label_to_index = label_to_index

    @staticmethod
    def _normalize_label(label):
        """Strip one trailing run of digits, then any trailing underscores."""
        return re.sub(r'\d+$', '', label).rstrip("_")

    def _validate_images(self, image_paths, labels):
        """Return the readable image paths and their normalised labels."""
        valid_image_paths = []
        valid_labels = []
        for path, label in zip(image_paths, labels):
            try:
                # load() forces a full decode so corrupt files fail here rather
                # than in the middle of training.
                with Image.open(path) as img:
                    img.load()
            except (UnidentifiedImageError, OSError) as e:
                print(f"Skipping problematic image: {path} due to error: {e}")
                continue
            valid_image_paths.append(path)
            valid_labels.append(self._normalize_label(label))
        return valid_image_paths, valid_labels

    def __len__(self):
        """Number of readable images kept by the validation pass."""
        return len(self.image_paths)

    def __getitem__(self, index):
        """Return ``(image, class_index)`` for the sample at ``index``."""
        # convert() returns an independent image, so the underlying file can be
        # closed immediately instead of lingering until garbage collection.
        with Image.open(self.image_paths[index]) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        # Stored labels are already normalised. Normalising a second time is
        # not idempotent ("x_1_2" -> "x_1" -> "x") and could miss the mapping.
        return image, self.label_to_index[self.labels[index]]

# Preprocessing pipeline: random crop covering 80-100% of the image resized to
# 256 px, centre crop to 224 px, tensor conversion, per-channel normalisation.
# NOTE(review): RandomResizedCrop is stochastic, so any dataset built with this
# transform yields a different crop each time an item is read.
transform = transforms.Compose([
    transforms.RandomResizedCrop(256, scale=(0.8, 1.0)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

base_dir = '/kaggle/input/teamken-data/Dataset for Crop Pest and Disease Detection/CCMT Dataset-Augmented'

# Gather (path, label) pairs for each split, then unzip them into parallel lists.
train_data = list_images_train(base_dir)
test_data = list_images_test(base_dir)

train_image_paths, train_labels = map(list, zip(*train_data))
test_image_paths, test_labels = map(list, zip(*test_data))

# Hold out 20% of the training images for validation (fixed seed).
train_image_paths, val_image_paths, train_labels, val_labels = train_test_split(
    train_image_paths,
    train_labels,
    test_size=0.2,
    random_state=42,
)



In [None]:
# Training split. Building the dataset opens and fully decodes every image once
# so unreadable files are dropped up front.
train_dataset = CustomDataset(image_paths=train_image_paths, labels=train_labels, transform=transform)

In [None]:
# Validation drives early stopping, so its preprocessing must be deterministic:
# resize + centre crop instead of the random crop used for training.
eval_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
val_dataset = CustomDataset(image_paths=val_image_paths, labels=val_labels, transform=eval_transform)

# Reuse the training label -> index mapping so a class index means the same
# thing in every split, even if a class happens to be absent from this one.
unknown_val_labels = sorted(set(val_dataset.labels) - set(train_dataset.label_to_index))
if unknown_val_labels:
    raise ValueError(f"Validation labels unseen during training: {unknown_val_labels}")
val_dataset.label_to_index = train_dataset.label_to_index

In [None]:
# Test metrics must be reproducible, so use deterministic preprocessing:
# resize + centre crop instead of the random crop used for training.
eval_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
test_dataset = CustomDataset(image_paths=test_image_paths, labels=test_labels, transform=eval_transform)

# Reuse the training label -> index mapping so the targets line up with the
# model's output units, even if the test set lacks (or reorders) some class.
unknown_test_labels = sorted(set(test_dataset.labels) - set(train_dataset.label_to_index))
if unknown_test_labels:
    raise ValueError(f"Test labels unseen during training: {unknown_test_labels}")
test_dataset.label_to_index = train_dataset.label_to_index

In [None]:
# Batches of 32 for every split, one worker process each; only the training
# loader reshuffles its samples every epoch.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=1,
)
valid_loader = DataLoader(
    dataset=val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=1,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=1,
)

In [None]:
# Count the distinct (normalised) labels found in the train and test datasets;
# the two numbers are expected to match.
n_classes, n_classes1 = (
    len(split.label_to_index) for split in (train_dataset, test_dataset)
)

print(f"Total number of unique train classes: {n_classes}")
print(f"Total number of unique test classes: {n_classes1}")

# Show the label -> class-index mapping used for the training targets.
print(train_dataset.label_to_index)

In [None]:
# True when PyTorch reports a usable CUDA device; later cells read this flag to
# decide whether the model and batches are moved to the GPU.
train_on_gpu = torch.cuda.is_available()
print(f'Train on gpu: {train_on_gpu}')

In [None]:
import torchvision.models as models

# ResNet-18 backbone with pretrained weights; every layer stays trainable
# because all parameters are handed to the optimiser below.
model = models.resnet18(pretrained=True)

from torch import optim
# Size the classifier head from the training label map instead of a hard-coded
# 22, so the number of outputs always matches the targets the dataset produces.
n_classes = len(train_dataset.label_to_index)
n_inputs = model.fc.in_features
# Replace the final fully connected layer with a small classification head.
# LogSoftmax output pairs with the NLLLoss criterion defined next.
model.fc = nn.Sequential(
                      nn.Linear(n_inputs, 128),
                      nn.ReLU(),
                      nn.Dropout(0.6),
                      nn.Linear(128, n_classes),
                      nn.LogSoftmax(dim=1))
criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=5e-5)

train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
    model = model.to('cuda')

In [None]:
import time

def train(model,
          criterion,
          optimizer,
          train_loader,
          valid_loader,
          save_file_name,
          max_epochs_stop=3,
          n_epochs=20,
          print_every=1):
    """Train a PyTorch model with early stopping on the validation loss.

    Params
    --------
        model (PyTorch model): cnn to train
        criterion (PyTorch loss): objective to minimize
        optimizer (PyTorch optimizer): optimizer that updates the model parameters
        train_loader (PyTorch dataloader): training dataloader to iterate through
        valid_loader (PyTorch dataloader): validation dataloader used for early stopping
        save_file_name (str ending in '.pt'): file path to save the model state dict
        max_epochs_stop (int): maximum number of epochs with no improvement in validation loss for early stopping
        n_epochs (int): maximum number of training epochs
        print_every (int): frequency of epochs to print training stats

    Returns
    --------
        model (PyTorch model): trained cnn with the weights of the best epoch
            (lowest validation loss), whether or not early stopping triggered
        history (DataFrame): per-epoch train/validation loss and accuracy

    Side effects: ``model.epochs`` counts the epochs trained so far,
    ``model.optimizer`` is set to ``optimizer`` and the best state dict is
    written to ``save_file_name``.
    """
    history_columns = ['train_loss', 'valid_loss', 'train_acc', 'valid_acc']
    history = []

    epochs_no_improve = 0
    valid_loss_min = float('inf')  # np.Inf no longer exists in NumPy 2.x
    valid_best_acc = 0.0
    best_epoch = None

    # Batches follow the model's own device, so the function does not depend
    # on a notebook-level GPU flag.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    if hasattr(model, 'epochs'):
        print(f'Model has been trained for: {model.epochs} epochs.\n')
    else:
        model.epochs = 0
        print(f'Starting Training from Scratch.\n')

    overall_start = time.time()

    for epoch in range(n_epochs):
        train_loss = 0.0
        valid_loss = 0.0
        train_correct = 0
        valid_correct = 0

        # ---- training pass ----
        model.train()
        start = time.time()
        for ii, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # The criterion returns a batch mean; weight it by the batch size
            # so the epoch average is exact even with a short final batch.
            train_loss += loss.item() * data.size(0)
            pred = output.argmax(dim=1)
            train_correct += pred.eq(target.view_as(pred)).sum().item()
            print(
                f'Epoch: {epoch}\t{100 * (ii + 1) / len(train_loader):.2f}% complete. {time.time() - start:.2f} seconds elapsed in epoch.',
                end='\r')

        model.epochs += 1

        # ---- validation pass ----
        model.eval()
        with torch.no_grad():
            for data, target in valid_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                loss = criterion(output, target)
                valid_loss += loss.item() * data.size(0)
                pred = output.argmax(dim=1)
                valid_correct += pred.eq(target.view_as(pred)).sum().item()

        train_loss = train_loss / len(train_loader.dataset)
        valid_loss = valid_loss / len(valid_loader.dataset)
        train_acc = train_correct / len(train_loader.dataset)
        valid_acc = valid_correct / len(valid_loader.dataset)
        history.append([train_loss, valid_loss, train_acc, valid_acc])

        if (epoch + 1) % print_every == 0:
            print(
                f'\nEpoch: {epoch} \tTraining Loss: {train_loss:.4f} \tValidation Loss: {valid_loss:.4f}'
            )
            print(
                f'\t\tTraining Accuracy: {100 * train_acc:.2f}%\t Validation Accuracy: {100 * valid_acc:.2f}%'
            )

        if valid_loss < valid_loss_min:
            # New best epoch: checkpoint it and reset the patience counter.
            torch.save(model.state_dict(), save_file_name)
            epochs_no_improve = 0
            valid_loss_min = valid_loss
            valid_best_acc = valid_acc
            best_epoch = epoch
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= max_epochs_stop:
                # Report the accuracy of the best epoch, not of the current one.
                print(
                    f'\nEarly Stopping! Total epochs: {epoch}. Best epoch: {best_epoch} with loss: {valid_loss_min:.2f} and acc: {100 * valid_best_acc:.2f}%'
                )
                break

    total_time = time.time() - overall_start
    if history:
        print(
            f'{total_time:.2f} total seconds elapsed. {total_time / len(history):.2f} seconds per epoch.'
        )

    # Always hand back the best checkpoint, not just after early stopping.
    # best_epoch stays None when no epoch ever improved (e.g. NaN losses).
    if best_epoch is not None:
        model.load_state_dict(torch.load(save_file_name, map_location=device))
    model.optimizer = optimizer

    history = pd.DataFrame(history, columns=history_columns)
    return model, history

# Fine-tune for at most 100 epochs, stopping early once the validation loss has
# failed to improve for 7 consecutive epochs. The state dict with the lowest
# validation loss is written to the checkpoint path below.
model, history = train(
    model,
    criterion,
    optimizer,
    train_loader,
    valid_loader,
    save_file_name='/kaggle/working/model_checkpoint.pth',
    max_epochs_stop=7,
    n_epochs=100,
    print_every=1)

In [None]:
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score

import torch

def test(model, criterion, test_loader, train_on_gpu=False):
    """Evaluate ``model`` on ``test_loader``; print and return the metrics.

    Args:
        model: Trained PyTorch model producing one score per class.
        criterion: Loss function called as ``criterion(output, target)``.
        test_loader: DataLoader yielding ``(data, target)`` batches.
        train_on_gpu: Kept for backward compatibility. Batches are always moved
            to the device the model's parameters live on, so a model on the GPU
            is evaluated correctly even when this flag is left at its default.

    Returns:
        dict with keys ``'loss'``, ``'accuracy'``, ``'f1'``, ``'precision'``
        and ``'recall'``. The last three are support-weighted averages.

    Raises:
        ValueError: If the loader's dataset is empty.
    """
    n_samples = len(test_loader.dataset)
    if n_samples == 0:
        raise ValueError("test_loader has an empty dataset; nothing to evaluate.")

    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    test_loss = 0.0
    n_correct = 0
    y_true = []
    y_pred = []

    model.eval()

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            # Weight the batch-mean loss by the batch size so the final average
            # is exact even when the last batch is smaller.
            test_loss += loss.item() * data.size(0)

            pred = output.argmax(dim=1)
            n_correct += pred.eq(target.view_as(pred)).sum().item()

            y_true.extend(target.cpu().numpy())
            y_pred.extend(pred.cpu().numpy())

    test_loss = test_loss / n_samples
    test_acc = n_correct / n_samples
    # zero_division=0 on all three metrics: classes that are never predicted
    # contribute 0 instead of emitting a warning for some metrics only.
    f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
    acc = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
    print(f'\nTest Loss: {test_loss:.4f} | Test Accuracy: {test_acc * 100:.2f}%')
    print(f'F1 Score: {f1:.4f} | Accuracy: {acc * 100:.2f}%')
    print(f'Precision: {precision:.4f} | Recall: {recall:.4f}')

    return {
        'loss': test_loss,
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall,
    }

# Report loss, accuracy and weighted F1 / precision / recall on the test split.
test(model, criterion, test_loader, train_on_gpu)
