<a href="https://colab.research.google.com/github/SriGanesh737/PlantDiseaseClassification/blob/main/PlantDiseaseClassification%5BTransfer_Learning%5D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import csv
import random
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, models
from PIL import Image
import numpy as np
from tqdm import tqdm
from multiprocessing import Pool


In [None]:
# Mount Google Drive into the Colab filesystem; every path used below
# lives under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Function to get labels
def get_labels(root=None):
    """Map each class sub-folder name to a stable integer index.

    Args:
        root: Directory whose immediate sub-directories are the classes.
            Defaults to the notebook-level ``images_folder``.

    Returns:
        dict: ``{folder_name: index}`` with indices assigned in sorted
        folder-name order. Non-directory entries are ignored.
    """
    if root is None:
        root = images_folder

    # os.scandir yields entries in arbitrary, filesystem-dependent order, so
    # sort the names to keep label indices identical across runs and machines.
    with os.scandir(root) as entries:
        subfolders = sorted(entry.name for entry in entries if entry.is_dir())

    return {label_name: label_index for label_index, label_name in enumerate(subfolders)}

# Function to create CSV file for data paths
def create_data_paths_csv(labels: dict, csv_path: str = "/content/drive/My Drive/data.csv", root=None):
    """Write a CSV index with one ``image_path,label`` row per image file.

    Args:
        labels: Mapping of class folder name to integer label index.
        csv_path: Destination of the CSV file. Defaults to the Drive
            location the rest of the notebook reads from.
        root: Directory that contains the class folders. Defaults to the
            notebook-level ``images_folder``.

    Side effects:
        Creates or overwrites ``csv_path`` and prints a confirmation message.
    """
    if root is None:
        root = images_folder

    rows = []
    for label_name, label_index in labels.items():
        with os.scandir(os.path.join(root, label_name)) as entries:
            # Keep regular files only (a nested directory cannot be opened as
            # an image) and sort them so the row order is reproducible.
            image_paths = sorted(entry.path for entry in entries if entry.is_file())
        rows.extend([path, label_index] for path in image_paths)

    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(["image_path", "label"])
        csv_writer.writerows(rows)

    print("Csv file created successfully!!!")

In [None]:
# Root directory holding one sub-folder of images per class.
images_folder = "/content/drive/My Drive/PlantVillage"

# Class-name -> integer-index mapping derived from the sub-folder names.
labels = get_labels()

# Build the path/label index only once; later runs reuse the CSV on Drive.
csv_already_built = os.path.exists("/content/drive/My Drive/data.csv")
if not csv_already_built:
    create_data_paths_csv(labels)

data = pd.read_csv("/content/drive/My Drive/data.csv")
data.head()

Unnamed: 0,image_path,label
0,/content/drive/My Drive/PlantVillage/Pepper__b...,0
1,/content/drive/My Drive/PlantVillage/Pepper__b...,0
2,/content/drive/My Drive/PlantVillage/Pepper__b...,0
3,/content/drive/My Drive/PlantVillage/Pepper__b...,0
4,/content/drive/My Drive/PlantVillage/Pepper__b...,0


In [None]:
def read_image(image_path):
    """Load one image file and return its pixels as a numpy array.

    Any failure while opening or decoding is reported on stdout and turned
    into a ``None`` result instead of being raised.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The image as a numpy array, or ``None`` if it could not be read.
    """
    try:
        with Image.open(image_path) as handle:
            pixels = np.array(handle)
    except Exception as e:
        print(f"Error reading image {image_path}: {str(e)}")
        return None
    return pixels

def read_images_parallel(df):
    """Read every image listed in ``df['image_path']`` using a process pool.

    The decoded arrays are stored in a new ``'image'`` column, in the same
    order as the paths. The input frame is modified in place and returned.

    Args:
        df: DataFrame with an ``'image_path'`` column.

    Returns:
        The same DataFrame, now carrying the ``'image'`` column.
    """
    paths = df['image_path'].tolist()
    with Pool() as workers:
        # imap preserves input order, so results line up with the rows.
        progress = tqdm(workers.imap(read_image, paths), total=len(paths), desc="Reading Images")
        loaded = list(progress)
    df['image'] = loaded
    return df




# Use ONE path for the existence check, the save and the load, so the cache
# written on the first run is actually found on later runs.
pickle_path = "/content/drive/My Drive/PlantLeafDiseaseDetection/data.pkl"

if os.path.exists(pickle_path):
    # NOTE: unpickling can execute arbitrary code; only load a cache file that
    # this notebook itself produced.
    data = pd.read_pickle(pickle_path)
else:
    data = read_images_parallel(data)
    # to_pickle does not create missing parent folders.
    os.makedirs(os.path.dirname(pickle_path), exist_ok=True)
    # Save DataFrame to a pickle
    data.to_pickle(pickle_path)

In [None]:
# Define dataset class
class PlantDiseaseDataset(Dataset):
    """Dataset over a DataFrame of decoded images and their labels.

    Each row must provide an ``"image"`` entry (an array accepted by
    ``Image.fromarray``) and a ``"label"`` entry.
    """

    def __init__(self, data, transform=None):
        """
        Args:
            data: DataFrame with ``"image"`` and ``"label"`` columns.
            transform: Optional callable applied to the PIL image before it
                is returned.
        """
        self.data = data
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples)."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the row at position ``index``.

        Raises:
            ValueError: If the row has no image data.
        """
        item = self.data.iloc[index]
        pixels = item["image"]
        if pixels is None:
            # read_image() stores None for files it could not decode; fail
            # with a message that points at the offending row.
            raise ValueError(f"Row {index} has no image data (the image could not be read)")
        image = Image.fromarray(pixels)

        # Force three channels for every non-RGB mode (RGBA, grayscale,
        # palette, ...); the 3-channel Normalize and ResNet need RGB input.
        if image.mode != 'RGB':
            image = image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        label = item["label"]
        return image, label

In [None]:
# Preprocessing / augmentation pipeline, applied in order to each PIL image.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

augmentation_steps = [
    transforms.Resize((224, 224)),       # fixed input size for the network
    transforms.RandomHorizontalFlip(),   # random left/right mirror
    transforms.RandomVerticalFlip(),     # random up/down mirror
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.ToTensor(),               # PIL image -> float tensor
    transforms.Normalize(mean=channel_mean, std=channel_std),
]
transform = transforms.Compose(augmentation_steps)

In [None]:
# Wrap the image/label DataFrame; the transform runs each time a sample is fetched.
dataset = PlantDiseaseDataset(data=data, transform=transform)

In [None]:
# Training configuration
batch_size = 64             # samples per mini-batch
num_epochs = 20             # passes over the training split
learning_rate = 1e-3        # step size handed to the optimizer
num_classes = len(labels)   # one output unit per class folder

In [None]:
# Split dataset (80% train / 20% test). The generator is seeded so the same
# samples land in each split on every run.
split_seed = 42
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Evaluate on un-augmented images: the test subset keeps the indices chosen
# above but reads through a dataset whose transform has no random flips or
# colour jitter, so test metrics are deterministic.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
test_dataset = torch.utils.data.Subset(
    PlantDiseaseDataset(dataset.data, transform=eval_transform),
    test_dataset.indices,
)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Define CNN model class with pretrained ResNet
class PlantDiseaseClassifier(nn.Module):
    """Pretrained ResNet-18 with a frozen backbone and a new linear head.

    Only the replacement ``fc`` layer has ``requires_grad=True``; it is
    created after the freeze loop, so it is the only part that trains.
    """

    def __init__(self, num_classes):
        """
        Args:
            num_classes: Number of output classes for the new final layer.
        """
        super().__init__()

        # Load pretrained ResNet model. torchvision >= 0.13 deprecates
        # ``pretrained=True`` in favour of ``weights=``; fall back to the old
        # keyword on versions that do not have the weights enum.
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            self.resnet = models.resnet18(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet18(pretrained=True)

        # Freeze pretrained layers
        for param in self.resnet.parameters():
            param.requires_grad = False

        # Replace the last fully connected layer to match the number of classes
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_features, num_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for ``x``."""
        return self.resnet(x)

In [None]:
# Pick the device first so the model lives there before the optimizer is built
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize model
model = PlantDiseaseClassifier(num_classes)
model.to(device)

# Loss function and optimizer. Parameters with requires_grad=False never
# receive gradients, so only the trainable ones are handed to Adam.
criterion = nn.CrossEntropyLoss()
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.Adam(trainable_params, lr=learning_rate)

# Train the model (the pretrained backbone is frozen; only the new head learns)
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    # Use tqdm to add a progress bar
    progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False)

    # The batch labels are called `targets` so the module-level `labels` dict
    # (class name -> index) is not overwritten by a tensor.
    for batch_index, (images, targets) in enumerate(progress_bar, start=1):
        images, targets = images.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Show the mean loss over the batches processed so far in this epoch
        progress_bar.set_postfix({'loss': running_loss / batch_index})

    epoch_loss = running_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')





Epoch [1/20], Loss: 1.1007




Epoch [2/20], Loss: 0.3982




Epoch [3/20], Loss: 0.2945




Epoch [4/20], Loss: 0.2410




Epoch [5/20], Loss: 0.2188




Epoch [6/20], Loss: 0.1957




Epoch [7/20], Loss: 0.1759




Epoch [8/20], Loss: 0.1591




Epoch [9/20], Loss: 0.1566




Epoch [10/20], Loss: 0.1465




Epoch [11/20], Loss: 0.1426




Epoch [12/20], Loss: 0.1349




Epoch [13/20], Loss: 0.1323




Epoch [14/20], Loss: 0.1274




Epoch [15/20], Loss: 0.1201




Epoch [16/20], Loss: 0.1189




Epoch [17/20], Loss: 0.1206




Epoch [18/20], Loss: 0.1195




Epoch [19/20], Loss: 0.1175


                                                                        

Epoch [20/20], Loss: 0.1142




In [None]:
# Evaluate the trained model on the held-out split
model.eval()
correct = 0
loss_sum = 0.0

with torch.no_grad():
    # `targets` rather than `labels`, so the module-level class-name dict
    # `labels` is not overwritten by the last batch tensor.
    for images, targets in test_loader:
        images, targets = images.to(device), targets.to(device)

        outputs = model(images)
        # criterion returns the mean over the batch; weight it by the batch
        # size so a smaller final batch does not skew the average.
        loss_sum += criterion(outputs, targets).item() * targets.size(0)

        predicted = outputs.argmax(dim=1)
        correct += (predicted == targets).sum().item()

# Both metrics are per-sample averages over the whole test split
test_acc = correct / len(test_dataset)
test_loss = loss_sum / len(test_dataset)
print(f'Test Accuracy: {test_acc:.4f}, Test Loss: {test_loss:.4f}')


Test Accuracy: 0.9626, Test Loss: 0.1166
