In [None]:
%pip install torch torchvision matplotlib seaborn scikit-learn tqdm

In [None]:
import os
import random
import numpy as np
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, random_split, Subset
from PIL import Image, ImageEnhance
import torch
from ultralytics import YOLO
from sklearn.metrics import (
    confusion_matrix,
    classification_report,
    precision_score,
    recall_score,
    f1_score,
    cohen_kappa_score,
)
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

In [None]:
# Root folder of the PlantVillage images, relative to the working directory.
dataset_directory = "BIGDATIOTPROJ/PlantVillage"
# Output location nested directly under the dataset root.
output_directory = f"{dataset_directory}/Output"

### Custom file filter so that upper-case extensions such as .JPG are accepted

In [None]:
def is_valid_file(filename: str):
    """Report whether ``filename`` ends with a supported image extension.

    The name is lower-cased before the comparison, so upper-case suffixes
    such as ``.JPG`` are accepted as well.
    """
    lowered = filename.lower()
    image_suffixes = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif', '.tiff', '.webp')
    return any(lowered.endswith(suffix) for suffix in image_suffixes)


### Function to apply image augmentation and enhancement

In [None]:
def get_transforms(augment=False, image_size=(256, 256)):
    """Build the preprocessing pipeline, optionally preceded by a random augmentation.

    NOTE: a later cell in this notebook defines ``get_transforms`` again with a
    224x224 resize; once that cell has run, it replaces this definition.

    Args:
        augment: When True, one randomly chosen augmentation runs before the
            base preprocessing: a horizontal flip, a vertical flip, a rotation
            of up to 15 degrees applied with probability 0.5, or both flips.
        image_size: Size handed to ``transforms.Resize``. Defaults to the
            ``(256, 256)`` that was previously hard-coded.

    Returns:
        A ``transforms.Compose`` that resizes, converts to a tensor and
        normalizes with the ImageNet-like channel statistics.
    """
    base_steps = [
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet-like normalization
    ]

    if not augment:
        return transforms.Compose(base_steps)

    # Exactly one of these four options is drawn per image.
    random_augmentation = transforms.RandomChoice([
        transforms.RandomHorizontalFlip(p=1),
        transforms.RandomVerticalFlip(p=1),
        transforms.RandomApply([transforms.RandomRotation(15)], p=0.5),
        transforms.Compose([transforms.RandomHorizontalFlip(p=1), transforms.RandomVerticalFlip(p=1)]),
    ])

    # Augment the PIL image first, then resize / tensorize / normalize.
    return transforms.Compose([random_augmentation, *base_steps])

### Function to balance the training set

In [None]:
def augment_training_data(subset, target_size):
    """Balance a training subset to ``target_size`` samples per class.

    Classes with fewer than ``target_size`` samples keep all their indices and
    receive extra ones drawn with replacement; classes with at least
    ``target_size`` samples are truncated to their first ``target_size``
    indices (in the order they appear in ``subset.indices``).

    Args:
        subset: A ``Subset`` whose ``dataset`` exposes ``classes`` and either a
            ``targets`` list or ``__getitem__`` returning ``(sample, label)``.
        target_size: Desired number of samples per class.

    Returns:
        A new ``Subset`` over the same base dataset. Oversampled classes
        contain repeated indices.
    """
    import warnings

    base_dataset = subset.dataset

    # Prefer the dataset's label list: indexing the dataset would decode and
    # transform every image only to throw the pixels away. ``targets`` holds
    # raw labels, so it is only equivalent when no target_transform is set.
    targets = getattr(base_dataset, 'targets', None)
    if getattr(base_dataset, 'target_transform', None) is not None:
        targets = None

    class_indices = {label: [] for label in range(len(base_dataset.classes))}
    for subset_idx in subset.indices:
        if targets is not None:
            label = int(targets[subset_idx])
        else:
            _, label = base_dataset[subset_idx]
        class_indices[label].append(subset_idx)

    augmented_indices = []
    for label, indices in class_indices.items():
        shortfall = target_size - len(indices)
        if shortfall > 0:
            if not indices:
                # random.choices() raises IndexError on an empty population;
                # a class absent from this subset cannot be oversampled.
                warnings.warn(
                    f"Class {base_dataset.classes[label]!r} has no samples in the subset; skipping it."
                )
                continue
            augmented_indices.extend(indices)
            augmented_indices.extend(random.choices(indices, k=shortfall))
        else:
            augmented_indices.extend(indices[:target_size])

    return Subset(base_dataset, augmented_indices)

### Function to create directories and save images to the corresponding folder

In [None]:
def _find_normalize(transform):
    """Return the first ``transforms.Normalize`` nested inside ``transform``, or None."""
    if isinstance(transform, transforms.Normalize):
        return transform
    # Compose / RandomChoice / RandomApply keep their children in ``.transforms``.
    for child in getattr(transform, 'transforms', None) or []:
        found = _find_normalize(child)
        if found is not None:
            return found
    return None


def create_and_move_images(subset, base_dir, subset_name):
    """Write the transformed images of ``subset`` to ``base_dir/subset_name/<class>/``.

    Each sample is fetched through the subset (so the base dataset's transform
    is applied), converted back to a PIL image and saved under its class
    folder. The source files are not moved or deleted.

    Args:
        subset: A ``Subset`` of an ``ImageFolder``-style dataset exposing
            ``classes`` and ``imgs``.
        base_dir: Directory under which the split folder is created.
        subset_name: Name of the split folder (e.g. ``'train'``).
    """
    # Get the underlying dataset to access the classes
    base_dataset = subset.dataset

    # Create the split directory with one sub-folder per class
    split_dir = os.path.join(base_dir, subset_name)
    os.makedirs(split_dir, exist_ok=True)
    for class_name in base_dataset.classes:
        os.makedirs(os.path.join(split_dir, class_name), exist_ok=True)

    to_pil_image = transforms.ToPILImage()  # Convert tensor to PIL Image

    # ToPILImage assumes float tensors lie in [0, 1]. A Normalize step in the
    # dataset's transform moves values outside that range, which would corrupt
    # the saved pixels, so the normalization is undone before saving.
    normalize = _find_normalize(getattr(base_dataset, 'transform', None))
    if normalize is not None:
        mean = torch.as_tensor(normalize.mean, dtype=torch.float32).view(-1, 1, 1)
        std = torch.as_tensor(normalize.std, dtype=torch.float32).view(-1, 1, 1)

    # A balanced subset may contain the same index several times. Count how
    # often each (class, filename) pair was written so repeats get their own
    # file instead of overwriting the first copy.
    times_saved = {}

    for idx in range(len(subset)):
        img, label = subset[idx]
        img_name = os.path.basename(base_dataset.imgs[subset.indices[idx]][0])  # Get the image filename
        class_folder = base_dataset.classes[label]

        key = (class_folder, img_name)
        repeat = times_saved.get(key, 0)
        times_saved[key] = repeat + 1
        if repeat:
            stem, ext = os.path.splitext(img_name)
            img_name = f"{stem}_aug{repeat}{ext}"

        if normalize is not None and torch.is_tensor(img):
            img = (img * std + mean).clamp(0.0, 1.0)

        # Convert the tensor to PIL Image and save it
        pil_img = to_pil_image(img)
        pil_img.save(os.path.join(split_dir, class_folder, img_name))

### Function to split dataset and create folders

In [None]:
def split_dataset_and_create_folders(dataset_dir, train_size=0.7, val_size=0.15, test_size=0.15, balance_target_size=1500, seed=42):
    """Split the image folder into train/val/test and write each split to disk.

    The training split is class-balanced with ``augment_training_data`` and
    saved with the augmenting transform; validation and test images are saved
    with the plain preprocessing transform. The split folders are created
    inside ``dataset_dir``.

    Args:
        dataset_dir: Root folder containing one sub-folder per class.
        train_size: Fraction of samples assigned to the training split.
        val_size: Fraction of samples assigned to the validation split.
        test_size: Kept for signature compatibility; the test split receives
            whatever remains after the train and validation splits.
        balance_target_size: Per-class sample count for the balanced train split.
        seed: Seed for the random split. Pass ``None`` for an unseeded split.
            Only the split is seeded; oversampling and augmentation still use
            the global random state.
    """
    # ImageFolder treats every sub-directory of the root as a class. Running
    # this again after the split folders were written would pick up
    # 'train'/'val'/'test' as classes, so stop early instead.
    existing = [name for name in ('train', 'val', 'test') if os.path.isdir(os.path.join(dataset_dir, name))]
    if existing:
        print(f"Split folder(s) {existing} already exist in {dataset_dir}; skipping the split. Delete them to regenerate.")
        return

    dataset = datasets.ImageFolder(root=dataset_dir, transform=get_transforms(augment=True), is_valid_file=is_valid_file)
    total_size = len(dataset)

    train_len = int(train_size * total_size)
    val_len = int(val_size * total_size)
    test_len = total_size - train_len - val_len

    split_kwargs = {}
    if seed is not None:
        split_kwargs['generator'] = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset, test_dataset = random_split(dataset, [train_len, val_len, test_len], **split_kwargs)

    # Augment and balance training data
    balanced_train_dataset = augment_training_data(train_dataset, balance_target_size)

    # All three subsets share the same base dataset, so its transform applies
    # to every one of them. Save the training images while the augmenting
    # transform is still attached, and only then switch to plain preprocessing.
    create_and_move_images(balanced_train_dataset, dataset_dir, 'train')

    # Apply preprocessing (no augmentation) to validation and test sets
    dataset.transform = get_transforms()
    create_and_move_images(val_dataset, dataset_dir, 'val')
    create_and_move_images(test_dataset, dataset_dir, 'test')

    print(f"Data has been split into {os.path.join(dataset_dir, 'train')}, {os.path.join(dataset_dir, 'val')}, {os.path.join(dataset_dir, 'test')}")

### Dataset Preprocessing and Augmentation

In [None]:
# The default is the original author's local checkout. Set the PLANTVILLAGE_DIR
# environment variable to use a different location without editing the notebook.
dataset_directory = os.environ.get(
    "PLANTVILLAGE_DIR",
    "C:/Users/Pratyush/Desktop/BIGDATIOTPROJ/PlantVillage",
)

split_dataset_and_create_folders(dataset_directory)

### YOLO11 Training

In [None]:
# Load the pretrained YOLO11 nano classification checkpoint.
model = YOLO('yolo11n-cls.pt')

# Train on the dataset root for 10 epochs at 256px, on the CPU.
results = model.train(
    data=dataset_directory,
    epochs=10,
    imgsz=256,
    device='cpu',
)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

### Function to split the dataset into train, val, and test if directories don't exist

In [None]:
def ensure_split_dataset(dataset_dir, split_ratios=(0.7, 0.15, 0.15)):
    """Split ``dataset_dir`` into train/val/test folders unless they all exist.

    When at least one of the split folders is missing, the class folders under
    ``dataset_dir`` are read as an ``ImageFolder``, split with a fixed seed
    (42), and every image file is moved (renamed) into
    ``<split>/<class>/<filename>``. The folder names are lowercase so that
    they match the ``train``/``val``/``test`` paths read by ``load_data``.

    Args:
        dataset_dir: Root folder containing one sub-folder per class.
        split_ratios: ``(train, val, test)`` fractions. Only the first two are
            used; the test split receives the remaining samples.
    """
    train_dir = os.path.join(dataset_dir, 'train')
    val_dir = os.path.join(dataset_dir, 'val')
    test_dir = os.path.join(dataset_dir, 'test')

    if all(os.path.exists(path) for path in (train_dir, val_dir, test_dir)):
        return

    print("Splitting dataset into Train, Val, and Test...")

    # Only file paths and labels are needed to move files, so no transform is attached.
    dataset = datasets.ImageFolder(root=dataset_dir)
    total_len = len(dataset)

    # Calculate sizes for train, val, and test splits
    train_size = int(split_ratios[0] * total_len)
    val_size = int(split_ratios[1] * total_len)
    test_size = total_len - train_size - val_size

    # Split the dataset
    train_dataset, val_dataset, test_dataset = random_split(
        dataset, [train_size, val_size, test_size], generator=torch.Generator().manual_seed(42)
    )

    # Create the split folders only after the ImageFolder was built, so they
    # are not picked up as classes, then move each image into its split.
    for subset, subset_dir in [(train_dataset, train_dir), (val_dataset, val_dir), (test_dataset, test_dir)]:
        os.makedirs(subset_dir, exist_ok=True)
        for idx in subset.indices:
            # `dataset` is the ImageFolder itself; its samples are (path, label) pairs.
            img_path, label = dataset.samples[idx]
            target_folder = os.path.join(subset_dir, dataset.classes[label])
            os.makedirs(target_folder, exist_ok=True)
            os.rename(img_path, os.path.join(target_folder, os.path.basename(img_path)))

    print("Dataset split completed.")

#### Define transformations for training and testing

In [None]:
def get_transforms(augment=False, image_size=(224, 224)):
    """Build the preprocessing pipeline, optionally preceded by a random augmentation.

    NOTE: this redefines the ``get_transforms`` declared earlier in the
    notebook (which resized to 256x256); every cell executed after this one
    uses this version.

    Args:
        augment: When True, one randomly chosen augmentation runs before the
            base preprocessing: a horizontal flip, a vertical flip, a rotation
            of up to 15 degrees applied with probability 0.5, or both flips.
        image_size: Size handed to ``transforms.Resize``. Defaults to the
            ``(224, 224)`` that was previously hard-coded (the original
            author's note: AlexNet requires 224x224 images).

    Returns:
        A ``transforms.Compose`` that resizes, converts to a tensor and
        normalizes with the ImageNet-like channel statistics.
    """
    base_steps = [
        transforms.Resize(image_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet-like normalization
    ]

    if not augment:
        return transforms.Compose(base_steps)

    # Exactly one of these four options is drawn per image.
    random_augmentation = transforms.RandomChoice([
        transforms.RandomHorizontalFlip(p=1),
        transforms.RandomVerticalFlip(p=1),
        transforms.RandomApply([transforms.RandomRotation(15)], p=0.5),
        transforms.Compose([transforms.RandomHorizontalFlip(p=1), transforms.RandomVerticalFlip(p=1)]),
    ])

    # Augment the PIL image first, then resize / tensorize / normalize.
    return transforms.Compose([random_augmentation, *base_steps])

#### Load the dataset

In [None]:
def load_data(dataset_dir, batch_size=32):
    """Create data loaders for the ``train``/``val``/``test`` folders of ``dataset_dir``.

    The training folder is read with the augmenting transform and shuffled;
    validation and test folders use the plain transform and keep their order.

    Args:
        dataset_dir: Directory containing ``train``, ``val`` and ``test`` sub-folders.
        batch_size: Batch size shared by all three loaders.

    Returns:
        ``(train_loader, val_loader, test_loader, class_names)`` where
        ``class_names`` is taken from the training folder.
    """
    train_folder = datasets.ImageFolder(root=f'{dataset_dir}/train', transform=get_transforms(augment=True))
    eval_folders = [
        datasets.ImageFolder(root=f'{dataset_dir}/{split_name}', transform=get_transforms())
        for split_name in ('val', 'test')
    ]

    train_loader = DataLoader(train_folder, batch_size=batch_size, shuffle=True)
    val_loader, test_loader = [
        DataLoader(folder, batch_size=batch_size, shuffle=False)
        for folder in eval_folders
    ]

    return train_loader, val_loader, test_loader, train_folder.classes

In [None]:
# NOTE(review): presumably set so CUDA errors surface at the failing call while
# debugging; confirm it is still needed before long training runs.
os.environ.update({"CUDA_LAUNCH_BLOCKING": "1"})