In [None]:
import torch
from torch import nn
import os
import random
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from PIL import Image
import plotly.express as px
from sklearn.model_selection import train_test_split
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, WeightedRandomSampler, random_split
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import shutil

In [None]:
# Mount Google Drive at /content/drive so that the dataset folder and the
# checkpoint paths used by later cells (both under /content/drive/MyDrive) exist.
# The google.colab module is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Source folder on the mounted Drive and its mirror on the VM's local disk.
# (Both are directories, despite the "zip" in the variable names.)
zip_path_in_drive = '/content/drive/MyDrive/plantvillage dataset'
local_zip_path = '/content/PlantVillage_local'

# Mirror the dataset once so later cells read from local disk instead of the
# Drive mount. shutil.copytree raises if the destination already exists, so the
# copy is skipped when a previous run has already created it.
if os.path.exists(local_zip_path):
    print(f"Local dataset directory '{local_zip_path}' already exists. Skipping copy.")
else:
    print(f"Copying data from Drive ({zip_path_in_drive}) to local VM ({local_zip_path})...")
    shutil.copytree(zip_path_in_drive, local_zip_path)
    print("Copy complete.")

# Every later cell reads images from the "color" sub-folder of the local copy.
data_dir = f"{local_zip_path}/color"

In [None]:
# Map each class sub-directory of data_dir to the number of entries it holds;
# anything in data_dir that is not a directory is ignored.
class_names = os.listdir(data_dir)
class_counts = {
    name: len(os.listdir(os.path.join(data_dir, name)))
    for name in class_names
    if os.path.isdir(os.path.join(data_dir, name))
}

# One row per class, in directory-listing order.
count_records = list(class_counts.items())
df = pd.DataFrame(count_records, columns=['Class', 'Count'])

# Bar chart of images per class; labels are rotated because class names are long.
plt.figure(figsize=(20, 8))
sns.barplot(data=df, x="Class", y="Count")
plt.xticks(rotation=90)
plt.title('Class Distribution')
plt.show()

# Headline numbers showing how imbalanced the classes are.
print(f'Total number of images:{df.Count.sum()}')
print(f'Min number of images in a Class:{df.Count.min()}')
print(f'Max number of images in a Class:{df.Count.max()}')

In [None]:
def show_random_images(data_dir, num_images=12):
  """Display a grid of randomly chosen images, each titled with its class.

  A class is drawn uniformly at random for every grid cell, then one file is
  drawn uniformly from that class directory. The whole grid is rendered as a
  single figure with four images per row.

  Args:
    data_dir: Directory containing one sub-directory per class.
    num_images: Number of images to show. Nothing is drawn when this is <= 0.

  Raises:
    ValueError: If data_dir contains no class sub-directories.
  """
  # Only directories can be classes; stray files in data_dir are skipped.
  class_dirs = [
      name for name in os.listdir(data_dir)
      if os.path.isdir(os.path.join(data_dir, name))
  ]
  if not class_dirs:
    raise ValueError(f"No class sub-directories found in {data_dir!r}")
  if num_images <= 0:
    return

  n_cols = 4
  n_rows = -(-num_images // n_cols)  # ceiling division without importing math
  fig, axes = plt.subplots(
      n_rows, n_cols, figsize=(4 * n_cols, 4 * n_rows), squeeze=False
  )
  flat_axes = axes.ravel()

  # Hide the frame on every cell, including unused trailing cells.
  for ax in flat_axes:
    ax.axis('off')

  for ax in flat_axes[:num_images]:
    random_class = random.choice(class_dirs)
    random_class_path = os.path.join(data_dir, random_class)
    random_image = random.choice(os.listdir(random_class_path))
    random_image_path = os.path.join(random_class_path, random_image)
    # Draw onto the target axes explicitly; the context manager closes the file.
    with Image.open(random_image_path) as image:
      ax.imshow(image)
    # Directory names are split on '___'; only the leading part is shown.
    label = random_class.split('___')[0]
    ax.set_title(f'Class: {label}')

  fig.tight_layout()
  # Show once, after every cell is filled, so the grid is a single figure.
  plt.show()

# Preview a random sample of the dataset using the default num_images.
show_random_images(data_dir)



In [None]:
# Walk the dataset once, recording the size of every readable image and
# collecting the paths of files that PIL cannot open or verify.
widths = []
heights = []
corrupt = []

for dirpath, dirnames, filenames in os.walk(data_dir):
  for filename in filenames:
    # Case-insensitive match so ".JPG", ".Jpeg", ".PNG", ... are all included.
    if not filename.lower().endswith(('.jpg', '.jpeg', '.png')):
      continue
    image_path = os.path.join(dirpath, filename)
    try:
      with Image.open(image_path) as img:
        width, height = img.size
        # verify() checks file integrity without decoding the full image.
        img.verify()
    except Exception as e:
      corrupt.append(image_path)
      print(f'Error processing {image_path}: {e}')
    else:
      # Only files that passed verification contribute to the statistics.
      widths.append(width)
      heights.append(height)

if widths:
  print(f'Average width: {sum(widths)/len(widths)}')
  print(f'Average Height: {sum(heights)/len(heights)}')
else:
  # Avoid a ZeroDivisionError when the directory holds no readable images.
  print(f'No readable images found under {data_dir}')
print(f'Total number of corrupt images: {len(corrupt)}')

plt.figure(figsize=(8,6))
plt.scatter(widths, heights, alpha=0.5)
plt.title("Image Dimensions Scatter Plot")
plt.xlabel("Width")
plt.ylabel("Height")
plt.show()

In [None]:
def get_mean_std(data_dir, sample_size = 1000):
  """Estimate per-channel mean and standard deviation from a random sample.

  Images are converted to RGB, resized to 224x224 and scaled to [0, 1] before
  their pixels are accumulated. Statistics are computed from running sums, so
  only one image is held in memory at a time.

  Args:
    data_dir: Root directory searched recursively for .jpg/.jpeg/.png files
      (matched case-insensitively).
    sample_size: Maximum number of images to sample. If fewer files exist,
      all of them are used.

  Returns:
    Tuple ``(mean, std)`` of NumPy arrays of shape (3,), one value per RGB
    channel. ``std`` is the population standard deviation (ddof=0).

  Raises:
    ValueError: If no image files are found or sample_size is not positive.
  """
  all_files = []
  for dirpath, dirnames, filenames in os.walk(data_dir):
    for filename in filenames:
      if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
        all_files.append(os.path.join(dirpath, filename))

  if not all_files:
    raise ValueError(f"No image files found under {data_dir!r}")
  if sample_size < 1:
    raise ValueError(f"sample_size must be positive, got {sample_size}")

  # random.sample raises when asked for more items than the population holds.
  random_files = random.sample(all_files, min(sample_size, len(all_files)))

  channel_sum = np.zeros(3, dtype=np.float64)
  channel_sq_sum = np.zeros(3, dtype=np.float64)
  pixel_count = 0
  for f in random_files:
    # The context manager closes the file handle after the pixels are copied.
    with Image.open(f) as img:
      resized = img.convert('RGB').resize((224, 224))  # Resize to what model sees
      img_array = np.asarray(resized, dtype=np.float64) / 255.0  # Normalize to 0-1
    channel_sum += img_array.sum(axis=(0, 1))
    channel_sq_sum += np.square(img_array).sum(axis=(0, 1))
    pixel_count += img_array.shape[0] * img_array.shape[1]

  mean = channel_sum / pixel_count
  # Var[x] = E[x^2] - E[x]^2; clamp tiny negative values caused by rounding.
  variance = np.maximum(channel_sq_sum / pixel_count - np.square(mean), 0.0)
  std = np.sqrt(variance)

  return mean, std

# Estimate the dataset's per-channel statistics using the default sample size.
# NOTE(review): the transform cells below normalise with fixed mean/std
# constants rather than custom_mean/custom_std — confirm that is intentional.
custom_mean, custom_std = get_mean_std(data_dir)
print(f"Custom Mean: {custom_mean}")
print(f"Custom Std: {custom_std}")


In [None]:
# Shared normalisation step. The constants are the commonly used ImageNet
# channel statistics, not the custom_mean/custom_std computed earlier.
_normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)

# Random geometric augmentation, applied to the PIL image before tensor conversion.
_random_affine = transforms.RandomAffine(
    degrees=40,            # rotation_range=40
    translate=(0.2, 0.2),  # width/height_shift_range=0.2
    shear=0.2,             # shear_range=0.2
    scale=(0.8, 1.2),      # zoom_range=0.2 (0.8 to 1.2)
)

# Training pipeline: resize -> augment -> tensor -> normalise.
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    _random_affine,
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    _normalize,
])

# Validation pipeline: deterministic, no augmentation.
val_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    _normalize,
])


In [None]:
BATCH_SIZE = 32

# A transform-free view of the folder, used only to read labels and class names.
dataset = datasets.ImageFolder(root=data_dir)
targets = dataset.targets

# Stratified 80/20 split over dataset indices. The fixed seed makes the split
# reproducible across kernel restarts.
train_idx, val_idx = train_test_split(
    np.arange(len(targets)),
    test_size=0.2,
    shuffle=True,
    stratify=targets,
    random_state=42
)

# Inverse-frequency weight per sample so rare classes are drawn more often.
class_counts = np.bincount(targets)
class_weights = 1. / class_counts
sample_weights = class_weights[targets]
sample_weights = torch.from_numpy(sample_weights)
# Entry i is the weight of dataset item train_idx[i].
train_weights = sample_weights[train_idx]

# Two views of the same files that differ only in their transform.
train_dataset = datasets.ImageFolder(root=data_dir, transform=train_transforms)
test_dataset = datasets.ImageFolder(root=data_dir, transform=val_transforms)

# WeightedRandomSampler yields positions in range(len(weights)), NOT dataset
# indices. The loader must therefore iterate a Subset whose position i maps to
# train_idx[i]; feeding the sampler to the full ImageFolder would train on the
# first len(train_idx) files (including validation images) with misaligned weights.
train_subset = torch.utils.data.Subset(train_dataset, train_idx.tolist())

train_sampler = WeightedRandomSampler(
    weights=train_weights,
    replacement=True,
    num_samples=len(train_idx)
)

# SubsetRandomSampler yields the actual dataset indices in val_idx, so it is
# applied to the full (validation-transform) dataset.
val_sampler = SubsetRandomSampler(val_idx)

train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, sampler=train_sampler,num_workers=2, pin_memory=True )
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, sampler=val_sampler, num_workers=2, pin_memory=True)

print(f"Loaders ready.")
print(f"Training on {len(train_idx)} images.")
print(f"Validating on {len(val_idx)} images.")
print(f"Classes: {len(dataset.classes)}")
print(f"Classes: {dataset.classes}")

## Training Model on Custom CNN

In [None]:
class PlantDiseaseCNN(nn.Module):
    """Three-stage convolutional classifier followed by a fully connected head.

    Each stage is Conv-ReLU-BatchNorm twice, then a 2x2 max-pool that halves the
    spatial size. The head expects 128 * 28 * 28 flattened features, which is
    what three halvings of a 224x224 input produce.

    Args:
        num_classes: Size of the output logit vector.
    """

    def __init__(self, num_classes=38):
        super().__init__()

        # Stage 1: 3 -> 32 -> 64 channels, 224x224 -> 112x112
        self.block1 = self._conv_stage(3, 32, 64)
        # Stage 2: 64 -> 64 -> 64 channels, 112x112 -> 56x56
        self.block2 = self._conv_stage(64, 64, 64)
        # Stage 3: 64 -> 128 -> 128 channels, 56x56 -> 28x28, then dropout
        self.block3 = self._conv_stage(64, 128, 128, dropout_p=0.2)

        # Classifier head
        self.flatten = nn.Flatten()

        flat_features = 128 * 28 * 28  # channels * height * width = 100,352
        self.fc_layers = nn.Sequential(
            nn.Linear(flat_features, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes),
        )

    @staticmethod
    def _conv_stage(in_ch, mid_ch, out_ch, dropout_p=None):
        """Build one Conv-ReLU-BN x2 + MaxPool stage, with optional trailing dropout."""
        layers = [
            nn.Conv2d(in_channels=in_ch, out_channels=mid_ch, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(mid_ch),
            nn.Conv2d(mid_ch, out_ch, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_ch),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]
        if dropout_p is not None:
            layers.append(nn.Dropout(dropout_p))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for a batch of images."""
        features = self.block3(self.block2(self.block1(x)))
        return self.fc_layers(self.flatten(features))

In [None]:
# 1. Select the device and build the model
# (the name must be `device`: it is what the model and the training loop use)
device = "cuda" if torch.cuda.is_available() else "cpu"
model = PlantDiseaseCNN(num_classes=38).to(device)

# 2. Setup Optimizer & Scheduler
optimizer = optim.Adam(model.parameters(), lr=0.001)
# mode='min': the learning rate is multiplied by 0.3 once the monitored value
# has failed to decrease for more than `patience` epochs.
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.3, patience=2)
criterion = nn.CrossEntropyLoss()

# 3. Number of passes over the training loader
NUM_EPOCHS = 20

In [None]:
for epoch in range(NUM_EPOCHS):
    # ---------------- training pass ----------------
    model.train()
    train_loss, train_correct, total_train = 0, 0, 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Running totals: batch loss and number of correct top-1 predictions.
        train_loss += loss.item()
        preds = torch.max(outputs, 1)[1]
        train_correct += (preds == labels).sum().item()
        total_train += labels.size(0)

    # Loss is averaged over batches, accuracy over samples.
    avg_train_loss = train_loss / len(train_loader)
    train_acc = train_correct / total_train

    # ---------------- validation pass ----------------
    model.eval()
    val_loss, val_correct, total_val = 0.0, 0, 0

    with torch.inference_mode():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            preds = torch.max(outputs, 1)[1]
            val_correct += (preds == labels).sum().item()
            total_val += labels.size(0)

    avg_val_loss = val_loss / len(test_loader)
    val_acc = val_correct / total_val

    # The plateau scheduler is driven by the validation loss.
    scheduler.step(avg_val_loss)

    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | "
          f"Train Loss: {avg_train_loss:.4f} Acc: {train_acc:.4f} | "
          f"Val Loss: {avg_val_loss:.4f} Acc: {val_acc:.4f}")


In [None]:
# Persist the custom CNN's weights to Drive, creating the target folder if needed.
save_path = "/content/drive/MyDrive/modelss/plant_disease_custom_cnn.pth"
checkpoint_dir = os.path.dirname(save_path)
os.makedirs(checkpoint_dir, exist_ok=True)
# Only the parameters and buffers (state_dict) are stored, not the module object.
model_state = model.state_dict()
torch.save(model_state, save_path)

## Training Model on ResNet

In [None]:
BATCH_SIZE = 32

# Channel statistics used for normalisation.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Steps shared by both pipelines: resize -> tensor -> normalise.
_base_steps = [
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=norm_mean, std=norm_std),
]

# Training adds a random flip and a rotation of up to 10 degrees. Both are
# applied to the already-normalised tensor, i.e. after the shared steps.
train_transforms = transforms.Compose(
    _base_steps + [
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
    ]
)

# Validation uses the shared steps only (no augmentation).
val_transforms = transforms.Compose(list(_base_steps))


In [None]:
# A transform-free view of the folder, used only to read labels and class names.
dataset = datasets.ImageFolder(root=data_dir)
targets = dataset.targets

# Stratified 80/20 split over dataset indices. The fixed seed makes the split
# reproducible across kernel restarts.
train_idx, val_idx = train_test_split(
    np.arange(len(targets)),
    test_size=0.2,
    shuffle=True,
    stratify=targets,
    random_state=42
)

# Inverse-frequency weight per sample so rare classes are drawn more often.
class_counts = np.bincount(targets)
class_weights = 1. / class_counts
sample_weights = class_weights[targets]
sample_weights = torch.from_numpy(sample_weights)
# Entry i is the weight of dataset item train_idx[i].
train_weights = sample_weights[train_idx]

# Two views of the same files that differ only in their transform.
train_dataset = datasets.ImageFolder(root=data_dir, transform=train_transforms)
test_dataset = datasets.ImageFolder(root=data_dir, transform=val_transforms)

# WeightedRandomSampler yields positions in range(len(weights)), NOT dataset
# indices. The loader must therefore iterate a Subset whose position i maps to
# train_idx[i]; feeding the sampler to the full ImageFolder would train on the
# first len(train_idx) files (including validation images) with misaligned weights.
train_subset = torch.utils.data.Subset(train_dataset, train_idx.tolist())

train_sampler = WeightedRandomSampler(
    weights=train_weights,
    replacement=True,
    num_samples=len(train_idx)
)

# SubsetRandomSampler yields the actual dataset indices in val_idx, so it is
# applied to the full (validation-transform) dataset.
val_sampler = SubsetRandomSampler(val_idx)

# Same loader options as the custom-CNN cell.
train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, sampler=train_sampler, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, sampler=val_sampler, num_workers=2, pin_memory=True)

print(f"Loaders ready.")
print(f"Training on {len(train_idx)} images.")
print(f"Validating on {len(val_idx)} images.")
print(f"Classes: {len(dataset.classes)}")

In [None]:
# Pretrained-weight selection for ResNet-18, passed to resnet18(weights=...) in
# the next cell. DEFAULT is torchvision's alias for its recommended checkpoint.
weights = torchvision.models.ResNet18_Weights.DEFAULT

In [None]:
# Load the pretrained backbone and freeze every existing parameter.
model = torchvision.models.resnet18(weights=weights)
model.requires_grad_(False)

# Swap in a fresh 38-way classification head. Its parameters are created with
# requires_grad=True, so the head is the only trainable part of the network.
head_in_features = model.fc.in_features
model.fc = nn.Linear(in_features=head_in_features, out_features=38, bias=True)

device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

# Only the head's parameters are handed to the optimizer.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.01)

In [None]:
NUM_EPOCHS = 5

for epoch in range(NUM_EPOCHS):
  # ---------------- training pass ----------------
  # NOTE(review): model.train() also puts the frozen backbone's BatchNorm layers
  # in training mode, so their running statistics keep updating even though
  # their parameters have requires_grad=False — confirm this is intended.
  model.train()
  train_loss = 0.0
  train_correct = 0  # To track accuracy
  total_train = 0
  for images, labels in train_loader:
    images, labels = images.to(device), labels.to(device)

    optimizer.zero_grad()
    y_pred = model(images)
    loss = loss_fn(y_pred, labels)
    loss.backward()
    optimizer.step()

    train_loss += loss.item()
    _, preds = torch.max(y_pred, dim=1)
    train_correct += (preds == labels).sum().item()
    total_train += labels.size(0)

  # Epoch averages are computed once, after the last batch:
  # loss is averaged over batches, accuracy over samples.
  avg_train_loss = train_loss / len(train_loader)
  avg_train_acc = train_correct / total_train

  # ---------------- validation pass ----------------
  model.eval()
  test_loss = 0.0
  test_correct = 0
  total_val = 0
  with torch.inference_mode():
    for images, labels in test_loader:
      images, labels = images.to(device), labels.to(device)
      test_pred = model(images)
      loss = loss_fn(test_pred, labels)

      test_loss += loss.item()
      _, preds = torch.max(test_pred, dim=1)
      test_correct += (preds == labels).sum().item()
      total_val += labels.size(0)

  avg_test_loss = test_loss / len(test_loader)
  avg_test_acc = test_correct / total_val
  print(f"Epoch [{epoch+1}/{NUM_EPOCHS}] "
          f"Train Loss: {avg_train_loss:.4f} | Train Acc: {avg_train_acc*100:.2f}% | "
          f"Val Loss: {avg_test_loss:.4f} | Val Acc: {avg_test_acc*100:.2f}%")

# Keep a copy of the weights in the current working directory.
torch.save(model.state_dict(), "plant_disease_model.pth")
print("Model saved successfully.")

In [None]:
# Persist the fine-tuned ResNet weights to Drive, creating the folder if needed.
save_path = "/content/drive/MyDrive/modelss/plant_disease_resnet.pth"
checkpoint_dir = os.path.dirname(save_path)
os.makedirs(checkpoint_dir, exist_ok=True)
# Only the parameters and buffers (state_dict) are stored, not the module object.
model_state = model.state_dict()
torch.save(model_state, save_path)