In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import pandas as pd


In [None]:
# Sanity check: list the GPU devices TensorFlow reports; an empty list means none is visible to it.
print(tf.config.list_physical_devices('GPU'))

In [None]:
# IPython shell escape: runs the `nvidia-smi` command and shows its output in the cell.
!nvidia-smi


In [None]:
# TO REMOVE CORRUPTED IMAGES

import os
from PIL import Image

# Root of the untouched dataset. The helpers in this notebook walk it as
# <root>/<class folder>/<image file>.
RAW_DATASET_DIR = 'raw_dataset'  # Your original dataset folder

def remove_corrupted_images(dataset_dir):
    """Delete files that Pillow cannot verify from every class folder.

    The dataset is walked as ``dataset_dir/<class folder>/<file>``. Entries of
    ``dataset_dir`` that are not directories are ignored, and so are entries of
    a class folder that are not regular files (e.g. nested sub-folders).

    Args:
        dataset_dir: Path of the dataset root containing one folder per class.

    Returns:
        List of the paths that were actually deleted.

    Note:
        This is destructive: any regular file for which ``Image.open`` or
        ``Image.verify`` raises ``IOError``/``SyntaxError`` is removed, whether
        or not it was meant to be an image.
    """
    removed_files = []
    for class_name in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue
        for img_file in os.listdir(class_path):
            img_path = os.path.join(class_path, img_file)
            # A nested directory would make Image.open fail with an OSError and
            # the following os.remove() raise, aborting the whole scan.
            if not os.path.isfile(img_path):
                continue
            try:
                with Image.open(img_path) as img:
                    img.verify()  # Raises if the file is not a readable image
            except (IOError, SyntaxError):
                print(f"Removing corrupted image: {img_path}")
                try:
                    os.remove(img_path)
                except OSError as err:
                    # Keep scanning; only report files that were really deleted.
                    print(f"Could not remove {img_path}: {err}")
                    continue
                removed_files.append(img_path)
    return removed_files

# Destructive step: unreadable files are deleted from RAW_DATASET_DIR in place.
removed = remove_corrupted_images(RAW_DATASET_DIR)
print(f"Total corrupted images removed: {len(removed)}")


In [None]:
# TO CLEAN LABELS
import re

def check_class_names(dataset_dir):
    """Return the class-folder names that contain disallowed characters.

    A name is accepted when it consists solely of ``\\w`` (word characters and
    underscore) and ``\\s`` (whitespace) characters. Only directories directly
    under ``dataset_dir`` are examined; plain files are ignored.

    Args:
        dataset_dir: Path of the dataset root containing one folder per class.

    Returns:
        List of offending folder names, in ``os.listdir`` order.
    """
    allowed_name = re.compile(r'^[\w\s]+$')
    return [
        entry
        for entry in os.listdir(dataset_dir)
        if os.path.isdir(os.path.join(dataset_dir, entry))
        and not allowed_name.match(entry)
    ]

# Report-only check: lists class folders whose names fail the pattern in
# check_class_names; nothing is renamed here.
invalid_classes = check_class_names(RAW_DATASET_DIR)
if invalid_classes:
    print("Invalid class folder names detected:")
    for name in invalid_classes:
        print(f" - {name}")
else:
    print("All class folder names are clean!")


In [None]:
# Resizing Images and Converting to JPEG

from PIL import Image
import os

def resize_and_convert(dataset_dir, output_dir, target_size):
    """Write an RGB JPEG copy of every image, resized to ``target_size``.

    For each class folder in ``dataset_dir`` a folder of the same name (spaces
    replaced by underscores) is created under ``output_dir`` and every readable
    image is saved there as ``<stem>.jpg`` with quality 95.

    Sources that would map to the same output file (for example ``leaf.png``
    and ``leaf.jpg``, or names differing only in case) no longer overwrite each
    other: later ones get a numeric suffix (``leaf_1.jpg``). Files are visited
    in sorted order so the suffix assignment is the same on every run.

    Args:
        dataset_dir: Root of the source dataset (one folder per class).
        output_dir: Root under which the processed class folders are created.
        target_size: ``(width, height)`` passed to ``Image.resize``.

    Returns:
        None. Per-class and overall counts are printed.
    """
    total_images = 0
    total_skipped = 0
    # Output names already written in this run, per output folder. Keyed by
    # folder because two class names can map to one folder ('a b' and 'a_b').
    used_names_by_dir = {}
    for class_name in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue
        processed_class_path = os.path.join(output_dir, class_name.replace(' ', '_'))
        os.makedirs(processed_class_path, exist_ok=True)
        used_names = used_names_by_dir.setdefault(processed_class_path, set())

        count = 0
        for img_name in sorted(os.listdir(class_path)):
            src_path = os.path.join(class_path, img_name)
            try:
                with Image.open(src_path) as img:
                    img = img.convert('RGB')
                    img = img.resize(target_size, Image.Resampling.LANCZOS)
                    base_name = os.path.splitext(img_name)[0]
                    save_name = base_name + '.jpg'
                    suffix = 1
                    # Compare case-insensitively so the result is also safe on
                    # case-insensitive filesystems.
                    while save_name.lower() in used_names:
                        save_name = f"{base_name}_{suffix}.jpg"
                        suffix += 1
                    save_path = os.path.join(processed_class_path, save_name)
                    img.save(save_path, 'JPEG', quality=95)
                    used_names.add(save_name.lower())
                count += 1
                total_images += 1
            except Exception as e:
                # Best effort: anything Pillow cannot open/convert is reported and skipped.
                print(f"Skipping invalid image {src_path}: {e}")
                total_skipped += 1
        print(f"Processed {count} images in class '{class_name}'")

    print(f"Total images processed: {total_images}")
    print(f"Total images skipped: {total_skipped}")

# Build 'processed_dataset': a 224x224 RGB JPEG copy of 'raw_dataset'.
# Later cells in this notebook read from and write into 'processed_dataset'.
resize_and_convert('raw_dataset', 'processed_dataset', (224, 224))


In [None]:
import os

def count_images(dataset_dir):
    """Print and return how many image files each class folder holds.

    An entry counts as an image when its lower-cased name ends with ``.jpg``,
    ``.jpeg`` or ``.png``. Only directories directly under ``dataset_dir`` are
    treated as classes.

    Args:
        dataset_dir: Root of the dataset (one folder per class).

    Returns:
        Total number of matching entries across all class folders.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    total_images = 0
    for class_name in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, class_name)
        if os.path.isdir(class_path):
            num_images = sum(
                1
                for entry in os.listdir(class_path)
                if entry.lower().endswith(image_suffixes)
            )
            print(f"Class '{class_name}': {num_images} images")
            total_images += num_images
    print(f"Total images in dataset: {total_images}")
    return total_images

# Per-class image counts for the processed dataset.
count_images('processed_dataset')


In [None]:
import os

def count_images(dataset_dir):
    """Count ``.jpg``/``.jpeg``/``.png`` entries per class folder.

    This cell redefines ``count_images`` with the same behaviour as the
    definition in the previous cell, so it also works when run on its own.

    Args:
        dataset_dir: Root of the dataset (one folder per class).

    Returns:
        Sum of the per-class counts; each count and the total are also printed.
    """
    counts_by_class = {}
    for entry in os.listdir(dataset_dir):
        folder = os.path.join(dataset_dir, entry)
        if not os.path.isdir(folder):
            continue
        # Extension match is case-insensitive.
        matching = [
            name
            for name in os.listdir(folder)
            if name.lower().endswith(('.jpg', '.jpeg', '.png'))
        ]
        counts_by_class[entry] = len(matching)
        print(f"Class '{entry}': {counts_by_class[entry]} images")
    total_images = sum(counts_by_class.values())
    print(f"Total images in dataset: {total_images}")
    return total_images

# Same count for the raw dataset, for comparison with the processed copy.
count_images('raw_dataset')

In [None]:
import os
from PIL import Image

# Dataset root scanned for duplicates below; files are deleted from it in place.
PROCESSED_DATASET_DIR = 'processed_dataset'  # update path if different

def dhash(image, hash_size=8):
    """Compute a difference hash of ``image`` as a lowercase hex string.

    The image is converted to greyscale and resized to
    ``(hash_size + 1, hash_size)``; each pixel is then compared with its
    right-hand neighbour, giving ``hash_size * hash_size`` bits. Bits are packed
    eight at a time, least-significant bit first, into two-digit hex bytes.
    Trailing bits that do not fill a whole byte are dropped.

    Args:
        image: Object offering ``convert``, ``resize`` and ``getdata`` the way
            a PIL image does.
        hash_size: Number of comparisons per row and number of rows.

    Returns:
        Hex string with two characters per complete byte of comparison bits.
    """
    row_width = hash_size + 1
    # LANCZOS is used because the older ANTIALIAS alias is deprecated.
    grey = image.convert('L').resize((row_width, hash_size), Image.Resampling.LANCZOS)
    pixels = list(grey.getdata())

    # One bit per (row, col): is the pixel brighter than its right neighbour?
    bits = [
        pixels[row * row_width + col] > pixels[row * row_width + col + 1]
        for row in range(hash_size)
        for col in range(hash_size)
    ]

    hex_bytes = []
    # Step through complete groups of eight bits only.
    for start in range(0, len(bits) - 7, 8):
        byte_value = sum(
            1 << offset
            for offset, is_set in enumerate(bits[start:start + 8])
            if is_set
        )
        hex_bytes.append(format(byte_value, '02x'))
    return ''.join(hex_bytes)

def remove_duplicates(dataset_dir):
    """Delete images whose ``dhash`` was already seen earlier in the scan.

    Hashes are remembered across all class folders, so a file is removed when
    any previously visited file (in the same or another class) produced the
    same hash. Which copy survives depends on the ``os.listdir`` order.

    Args:
        dataset_dir: Root of the dataset (one folder per class).

    Returns:
        Number of files deleted. Files that raise while being opened, hashed or
        removed are reported and left in place.
    """
    seen_hashes = set()
    removed_count = 0
    for entry in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, entry)
        if not os.path.isdir(class_path):
            continue
        for img_name in os.listdir(class_path):
            img_path = os.path.join(class_path, img_name)
            try:
                with Image.open(img_path) as img:
                    fingerprint = dhash(img)
                if fingerprint not in seen_hashes:
                    # First time this hash shows up: keep the file.
                    seen_hashes.add(fingerprint)
                    continue
                print(f"Duplicate found, removing: {img_path}")
                os.remove(img_path)
                removed_count += 1
            except Exception as e:
                print(f"Error processing image {img_path}: {e}")
    return removed_count

# Destructive step: files with an already-seen hash are deleted from PROCESSED_DATASET_DIR.
duplicates_removed = remove_duplicates(PROCESSED_DATASET_DIR)
print(f"Duplicates removed: {duplicates_removed}")


In [None]:
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array, load_img

# Dataset root that is augmented in place: generated files are written next to
# the originals inside each class folder.
DATASET_DIR = 'processed_dataset'

# Random geometric augmentations (rotation, shifts, shear, zoom, horizontal
# flip); pixels exposed by a transform are filled with the nearest edge value.
augmentor = ImageDataGenerator(
    rotation_range=25,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode="nearest"
)

# Running totals across all classes
total_original_images = 0
total_augmented_images = 0

# Process each class folder
for class_name in os.listdir(DATASET_DIR):
    class_path = os.path.join(DATASET_DIR, class_name)
    if not os.path.isdir(class_path):
        continue

    print(f"\n📁 Processing class: {class_name}")
    image_count = 0
    augmented_count = 0

    # os.listdir is evaluated once here, so files written during this loop are
    # not revisited in the same run.
    for img_name in os.listdir(class_path):
        if img_name.startswith('aug_'):
            continue  # Skip outputs of a previous run (see save_prefix below)

        try:
            img_path = os.path.join(class_path, img_name)
            img = load_img(img_path)
            x = img_to_array(img)
            x = x.reshape((1,) + x.shape)  # add a leading batch axis of size 1

            # Generate 3 augmented images; flow() is iterated until we break.
            # NOTE(review): the counters below count yielded batches, not files
            # on disk. Keras presumably names saved files from save_prefix plus
            # a random number, so a name collision could overwrite an earlier
            # output — confirm against the Keras docs if exact counts matter.
            i = 0
            for batch in augmentor.flow(
                x,
                batch_size=1,
                save_to_dir=class_path,
                save_prefix='aug',
                save_format='jpeg'
            ):
                i += 1
                augmented_count += 1
                total_augmented_images += 1
                if i >= 3:
                    break

            image_count += 1
            total_original_images += 1
            print(f"✅ Augmented 3 images for: {img_name}")

        except Exception as e:
            # Best effort: report the failure and continue with the next file.
            print(f"❌ Error augmenting image {img_name}: {e}")

    print(f"📊 Class summary — Original: {image_count}, Augmented: {augmented_count}")

print("\n✅ Augmentation complete")
print(f"🔢 Total original images processed: {total_original_images}")
print(f"📈 Total augmented images created: {total_augmented_images}")


In [None]:
import os
from PIL import Image
import random
from torchvision import transforms

# Define your augmentation pipeline
# Random augmentation pipeline used by augment_minority_classes below:
# rotation up to 20 degrees, horizontal and vertical flips (each with
# probability 0.5), and colour jitter.
augmentation = transforms.Compose([
    transforms.RandomRotation(degrees=20),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1)
])

def find_minority_classes(dataset_dir, threshold=3500):
    """Return the class folders that hold at most ``threshold`` files.

    Every regular file in a class folder is counted, whatever its extension.

    Args:
        dataset_dir: Root of the dataset (one folder per class).
        threshold: Largest file count for which a class is still a minority.

    Returns:
        List of minority class names, in ``os.listdir`` order.
    """
    print("🔍 Checking classes for minority status based on threshold ≤", threshold)
    minority_classes = []
    for class_name in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, class_name)
        if os.path.isdir(class_path):
            num_images = sum(
                1
                for name in os.listdir(class_path)
                if os.path.isfile(os.path.join(class_path, name))
            )
            print(f"Class '{class_name}' has {num_images} images")
            if num_images <= threshold:
                minority_classes.append(class_name)
                print(f"✅ Class '{class_name}' marked as minority class")
    return minority_classes

def augment_minority_classes(dataset_dir, minority_classes, augmentations_per_image=3):
    """Write augmented JPEG copies next to each source image of minority classes.

    For every class folder listed in ``minority_classes`` each file whose name
    neither starts with ``aug_`` nor contains ``_aug`` is opened, passed
    ``augmentations_per_image`` times through the module-level ``augmentation``
    pipeline and saved as ``<stem>_aug<k>.jpg`` (k starting at 1) in the same
    folder. Existing files with those names are overwritten.

    Args:
        dataset_dir: Root of the dataset (one folder per class).
        minority_classes: Collection of class-folder names to augment.
        augmentations_per_image: Number of augmented copies per source image.

    Returns:
        None. Progress and totals are printed.
    """
    total_original = 0
    total_augmented = 0

    for class_name in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue
        if class_name not in minority_classes:
            print(f"⏭ Skipping majority class: {class_name}")
            continue

        print(f"\n🚀 Augmenting minority class: {class_name}")

        # Leave out files produced by an earlier augmentation pass.
        source_names = [
            name
            for name in os.listdir(class_path)
            if not (name.startswith('aug_') or '_aug' in name)
        ]
        for img_name in source_names:
            img_path = os.path.join(class_path, img_name)
            try:
                with Image.open(img_path) as img:
                    total_original += 1
                    base_name = os.path.splitext(img_name)[0]
                    for copy_no in range(1, augmentations_per_image + 1):
                        augmented_path = os.path.join(class_path, f"{base_name}_aug{copy_no}.jpg")
                        augmentation(img).save(augmented_path, 'JPEG', quality=95)
                        total_augmented += 1
                    print(f"✅ Augmented {augmentations_per_image} images for: {img_name}")
            except Exception as e:
                print(f"❌ Error augmenting {img_path}: {e}")

    print("\n✅ Augmentation complete")
    print(f"🔢 Total original images processed for minority classes: {total_original}")
    print(f"📈 Total augmented images created for minority classes: {total_augmented}")

# Set your processed dataset directory path here
# Dataset root that is augmented in place by the calls below.
PROCESSED_DATASET_DIR = 'processed_dataset'

# Step 1: classes holding at most 3500 files are treated as minority classes
minority_classes = find_minority_classes(PROCESSED_DATASET_DIR, threshold=3500)
print("\n🔎 Minority classes detected:", minority_classes)

# Step 2: write 3 augmented copies (the default) per source image of those classes
augment_minority_classes(PROCESSED_DATASET_DIR, minority_classes)


In [None]:
import os
from PIL import Image
import random
from torchvision import transforms

# Define your augmentation pipeline
# Rebinds `augmentation` to a pipeline with the same settings as the one in the
# previous cell, so this cell can also be run on its own.
augmentation = transforms.Compose([
    transforms.RandomRotation(degrees=20),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1)
])

def find_minority_classes(dataset_dir, threshold=2000):
    """List class folders whose file count does not exceed ``threshold``.

    This cell redefines ``find_minority_classes``; the only difference from the
    earlier definition is the default ``threshold`` (2000 here).

    Args:
        dataset_dir: Root of the dataset (one folder per class).
        threshold: A class with this many regular files or fewer is a minority.

    Returns:
        Names of the minority classes, in ``os.listdir`` order.
    """
    def _file_count(folder):
        # Regular files only; nested folders are not counted.
        return len([
            name
            for name in os.listdir(folder)
            if os.path.isfile(os.path.join(folder, name))
        ])

    minority_classes = []
    print("🔍 Checking classes for minority status based on threshold ≤", threshold)
    for class_name in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue
        num_images = _file_count(class_path)
        print(f"Class '{class_name}' has {num_images} images")
        if num_images > threshold:
            continue
        minority_classes.append(class_name)
        print(f"✅ Class '{class_name}' marked as minority class")
    return minority_classes

def augment_minority_classes(dataset_dir, minority_classes, augmentations_per_image=3):
    """Write augmented JPEG copies next to each source image of minority classes.

    For every class folder listed in ``minority_classes`` each source file is
    opened, passed ``augmentations_per_image`` times through the module-level
    ``augmentation`` pipeline and saved as ``<stem>_aug<k>.jpg`` (k starting at
    1) in the same folder. Existing files with those names are overwritten.

    Files whose name starts with ``aug_`` or contains ``_aug`` are treated as
    the output of an earlier augmentation pass and are skipped. Without this
    guard, running the cell after another augmentation cell (or twice) would
    augment the augmented files again (``x_aug1_aug1.jpg``) and grow the class
    with every run.

    Args:
        dataset_dir: Root of the dataset (one folder per class).
        minority_classes: Collection of class-folder names to augment.
        augmentations_per_image: Number of augmented copies per source image.

    Returns:
        None. Progress and totals are printed.
    """
    total_original = 0
    total_augmented = 0

    for class_name in os.listdir(dataset_dir):
        class_path = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue

        if class_name not in minority_classes:
            print(f"⏭ Skipping majority class: {class_name}")
            continue

        print(f"\n🚀 Augmenting minority class: {class_name}")

        for img_name in os.listdir(class_path):
            if img_name.startswith('aug_') or '_aug' in img_name:
                # Already an augmented file: augmenting it again would compound
                # the dataset on every run.
                continue

            img_path = os.path.join(class_path, img_name)
            try:
                with Image.open(img_path) as img:
                    total_original += 1
                    base_name, _ext = os.path.splitext(img_name)

                    for i in range(augmentations_per_image):
                        augmented_img = augmentation(img)
                        augmented_name = f"{base_name}_aug{i+1}.jpg"
                        augmented_path = os.path.join(class_path, augmented_name)
                        augmented_img.save(augmented_path, 'JPEG', quality=95)
                        total_augmented += 1

                    print(f"✅ Augmented {augmentations_per_image} images for: {img_name}")

            except Exception as e:
                # Best effort: report the failure and continue with the next file.
                print(f"❌ Error augmenting {img_path}: {e}")

    print("\n✅ Augmentation complete")
    print(f"🔢 Total images processed for minority classes: {total_original}")
    print(f"📈 Total augmented images created for minority classes: {total_augmented}")

# Set your processed dataset directory path here
# Dataset root that is augmented in place by the calls below.
PROCESSED_DATASET_DIR = 'processed_dataset'

# Step 1: second pass with a lower cut-off — classes holding at most 2000 files
minority_classes = find_minority_classes(PROCESSED_DATASET_DIR, threshold=2000)
print("\n🔎 Minority classes detected:", minority_classes)

# Step 2: write 3 augmented copies (the default) per image of those classes
augment_minority_classes(PROCESSED_DATASET_DIR, minority_classes)


In [None]:
import os
import json

# ======================
# 🔧 CONFIGURATION
# ======================
# Inputs/outputs of encode_labels below.
DATASET_DIR = 'processed_dataset'  # Folder where class subfolders are
LABEL_MAPPING_FILE = 'label_mapping.json'  # JSON file the label -> integer mapping is written to

def encode_labels(dataset_dir, save_mapping_file):
    """Map each class-folder name to an integer and save the mapping as JSON.

    Class labels are the names of the directories directly under
    ``dataset_dir``. They are sorted and numbered from 0 in that order.

    Args:
        dataset_dir: Root of the dataset (one folder per class).
        save_mapping_file: Path of the JSON file to write (overwritten).

    Returns:
        Dict mapping label name to its integer index.
    """
    print("🔍 Scanning dataset for class labels...")

    # Each sub-folder is one class label; plain files are ignored.
    class_labels = []
    for entry in os.listdir(dataset_dir):
        if os.path.isdir(os.path.join(dataset_dir, entry)):
            class_labels.append(entry)

    print(f"🗂 Found {len(class_labels)} classes:")
    for label in class_labels:
        print(f"  - {label}")

    # Indices follow the sorted label order.
    label_to_int = dict(zip(sorted(class_labels), range(len(class_labels))))
    print("\n🔢 Label to integer mapping:")
    for label in label_to_int:
        idx = label_to_int[label]
        print(f"  '{label}' -> {idx}")

    with open(save_mapping_file, 'w') as f:
        json.dump(label_to_int, f, indent=4)
    print(f"\n💾 Saved label mapping to '{save_mapping_file}'")

    print("\n✅ Label encoding done!")
    return label_to_int

# Run encoding
# Build the mapping and write it to LABEL_MAPPING_FILE.
label_mapping = encode_labels(DATASET_DIR, LABEL_MAPPING_FILE)


In [None]:
import os
import shutil
import random

# ======================
# 🔧 CONFIGURATION
# ======================
DATASET_DIR = 'processed_dataset'  # Original dataset folder with class subfolders
OUTPUT_DIR = 'dataset_split'       # Folder to create train/, val/, and test/ folders inside
# Fractions of each class assigned to the three splits; split_dataset asserts they sum to 1.
TRAIN_RATIO = 0.7
VAL_RATIO = 0.1
TEST_RATIO = 0.2
SEED = 42  # For reproducibility

# Seeds the module-level `random` generator that split_dataset's random.shuffle draws from.
random.seed(SEED)

def split_dataset(dataset_dir, output_dir, train_ratio=0.7, val_ratio=0.1, test_ratio=0.2):
    """Copy each class's files into ``train/``, ``val/`` and ``test/`` folders.

    For every class folder in ``dataset_dir`` the files are shuffled with the
    module-level ``random`` generator and copied (``shutil.copy2``) to
    ``output_dir/<split>/<class>/``. ``train`` receives
    ``int(n * train_ratio)`` files, ``val`` the next ``int(n * val_ratio)`` and
    ``test`` all remaining files.

    Classes and files are sorted before shuffling so that, for a given random
    seed, the split does not depend on the order in which the filesystem
    happens to list directory entries.

    Args:
        dataset_dir: Root of the source dataset (one folder per class).
        output_dir: Folder in which ``train``, ``val`` and ``test`` are created.
        train_ratio: Fraction of each class copied to ``train``.
        val_ratio: Fraction of each class copied to ``val``.
        test_ratio: Only used to check the ratios sum to 1; ``test`` gets the rest.

    Returns:
        None. A summary line is printed.

    Raises:
        AssertionError: If the three ratios do not sum to 1.

    Notes:
        * Files already present in ``output_dir`` are not removed, so re-running
          with different data or a different seed can leave one image in more
          than one split; start from an empty ``output_dir``.
        * NOTE(review): files are shuffled individually. If ``dataset_dir``
          already contains augmented copies (e.g. ``<stem>_augN.jpg``), variants
          of one source image can land in different splits — confirm this is
          acceptable for evaluation.
    """
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, "Ratios must sum to 1"

    split_dirs = {
        split_name: os.path.join(output_dir, split_name)
        for split_name in ('train', 'val', 'test')
    }
    for split_path in split_dirs.values():
        os.makedirs(split_path, exist_ok=True)

    # Sorted so the random generator is consumed in the same class order every run.
    for class_name in sorted(os.listdir(dataset_dir)):
        class_path = os.path.join(dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue

        # Sort first: shuffling a list whose initial order varies gives a
        # different split even with the same seed.
        images = sorted(
            f for f in os.listdir(class_path)
            if os.path.isfile(os.path.join(class_path, f))
        )
        random.shuffle(images)

        n_total = len(images)
        n_train = int(n_total * train_ratio)
        n_val = int(n_total * val_ratio)

        partitions = {
            'train': images[:n_train],
            'val': images[n_train:n_train + n_val],
            'test': images[n_train + n_val:],  # remainder, absorbs rounding
        }

        for split_name, split_images in partitions.items():
            target_dir = os.path.join(split_dirs[split_name], class_name)
            os.makedirs(target_dir, exist_ok=True)
            for img_name in split_images:
                shutil.copy2(os.path.join(class_path, img_name), os.path.join(target_dir, img_name))

    print(f"✅ Dataset split completed with {train_ratio*100}% train, {val_ratio*100}% val, and {test_ratio*100}% test.")

# Run the split
# Copy DATASET_DIR into OUTPUT_DIR/{train,val,test}/<class>/ using the ratios configured above.
split_dataset(DATASET_DIR, OUTPUT_DIR, TRAIN_RATIO, VAL_RATIO, TEST_RATIO)


In [1]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Preprocessing shared by all three splits: resize, convert to a tensor, then
# normalise each channel as (x - 0.5) / 0.5.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # if needed again
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # one value per RGB channel
])

# ImageFolder reads the <split>/<class>/<file> layout under 'dataset_split'.
train_dataset = datasets.ImageFolder('dataset_split/train', transform=transform)
val_dataset = datasets.ImageFolder('dataset_split/val', transform=transform)
test_dataset = datasets.ImageFolder('dataset_split/test', transform=transform)

# Only the training loader shuffles; validation and test keep the dataset order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
test_loader = DataLoader(test_dataset, batch_size=32)


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PlantDiseaseCNN(nn.Module):
    """Four-block convolutional classifier for 3x224x224 images.

    Each block is a 3x3 convolution (padding 1), ReLU and a 2x2 max-pool, so the
    spatial size is halved four times (224 -> 112 -> 56 -> 28 -> 14) while the
    channels grow 3 -> 32 -> 64 -> 128 -> 256. The flattened features go through
    a 512-unit hidden layer with dropout and a final linear layer that outputs
    one logit per class.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        # One pooling module is reused after every convolution.
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, padding=1)

        self.dropout = nn.Dropout(0.35)

        # After 4 poolings: 224 -> 112 -> 56 -> 28 -> 14
        self.fc1 = nn.Linear(256 * 14 * 14, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits of shape ``[B, num_classes]``.

        Args:
            x: Batch of images, expected as ``[B, 3, 224, 224]``.

        Raises:
            RuntimeError: If the spatial size is not 224x224, because the
                flattened feature length then no longer matches ``fc1``.
        """
        x = self.pool(F.relu(self.conv1(x)))  # [B, 32, 112, 112]
        x = self.pool(F.relu(self.conv2(x)))  # [B, 64, 56, 56]
        x = self.pool(F.relu(self.conv3(x)))  # [B, 128, 28, 28]
        x = self.pool(F.relu(self.conv4(x)))  # [B, 256, 14, 14]

        # Flatten everything except the batch axis. view(-1, 256*14*14) would
        # silently merge samples when the input size is wrong; this makes the
        # mismatch fail in fc1 instead.
        x = torch.flatten(x, 1)
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x
