<a href="https://colab.research.google.com/github/CrMessiSuriJr/Plant_disease_detection/blob/main/AgriResNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @markdown Installing required files/libraries and updating previously intsalled
!pip install captum lime jupyter jupyter_http_over_ws kaggle tensorboardX
!pip install --upgrade torch torchvision Pillow matplotlib scikit-image scikit-learn captum lime jupyter jupyter_http_over_ws kaggle tensorboardX

Collecting Pillow
  Using cached pillow-10.2.0-cp310-cp310-manylinux_2_28_x86_64.whl (4.5 MB)


In [None]:
# @markdown Importing libraries: torch, os, random, shutil, google.colab, PIL, matplotlib, Install&Import captum etc...
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision import datasets
from torch.utils.data import DataLoader
from torchvision import datasets, models
import os
import random
import shutil
try:
    from google.colab import drive
except:
    pass
from PIL import Image
import matplotlib.pyplot as plt

import captum.attr as captum_attr
from captum.attr import GuidedGradCam

import numpy as np
from lime import lime_image
from lime.wrappers.scikit_image import SegmentationAlgorithm
from skimage.segmentation import mark_boundaries
from lime.lime_image import LimeImageExplainer

from sklearn.manifold import TSNE

from PIL import UnidentifiedImageError
import time

from tensorboardX import SummaryWriter

drive.mount('/content/drive')
!jupyter serverextension enable --py jupyter_http_over_ws
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Enabling: jupyter_http_over_ws
- Writing config: /root/.jupyter
    - Validating...
      jupyter_http_over_ws 0.0.7 [32mOK[0m


In [None]:
# Define data preprocessing: resize, center crop, tensor conversion and normalization
# (no random augmentation is applied in this pipeline).
dim = 224 # @param {type:"integer"}
crop_dim = 224 # @param {type:"integer"}
# The per-channel statistics are filled in element by element so that each value
# can carry its own `# @param` form annotation.
# NOTE(review): the values below look like the usual ImageNet channel mean/std — confirm.
norm_mean = [0.0]*3
norm_std = [0.0]*3
norm_mean[0] = 0.485 # @param
norm_mean[1] = 0.456 # @param
norm_mean[2] = 0.406 # @param
norm_std[0] = 0.229 # @param
norm_std[1] = 0.224 # @param
norm_std[2] = 0.225 # @param
transform = transforms.Compose([
    transforms.Resize((dim, dim)),  # Resize images to a fixed dim x dim size
    transforms.CenterCrop(crop_dim),     # Center crop to crop_dim x crop_dim pixels
    transforms.ToTensor(),          # Convert to PyTorch tensor
    transforms.Normalize(mean=norm_mean, std=norm_std)  # Normalize each channel with norm_mean / norm_std
])

In [None]:
# @markdown Define the dataset path
# `local` switches between the Colab root ('/content/') and a Windows-style relative folder.
local = False # @param {type:"boolean"}
if not local: dataset_path = '/content/' # @param {type:"string"}
else: dataset_path = ".\\08112023\\"

# @markdown Define the ratio for the train-test split
split_ratio = 0.7  # 70% for training, 30% for testing

# @markdown Specify the dataset folder name (it is resolved relative to dataset_path)
# Despite its name, `drive_dataset_path` ends up as dataset_path + folder name + a
# trailing separator; it only points into Google Drive if dataset_path does.
drive_dataset_path = "PlantVillage" # @param {type:"string"}
drive_dataset_path = dataset_path + drive_dataset_path + ("\\" if local else "/")


# @markdown Create train and test directories if they don't exist
base_dir = dataset_path # @param {type:"string"}

# Plain string concatenation: base_dir is expected to already end with a path separator.
train_dir = base_dir + "train" # @markdown train_dir = base_dir + "\train"

test_dir = base_dir + "test" # @markdown test_dir = base_dir + "\test"

# exist_ok=True makes this cell safe to re-run.
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)


# @markdown Function to check if an image can be opened without errors
def is_valid_image(file_path):
    """Report whether PIL can open and verify the file at ``file_path``.

    Args:
        file_path: Path of the candidate image file.

    Returns:
        True when ``Image.open`` and ``verify()`` both succeed, False when
        either of them raises any ``Exception``.
    """
    try:
        handle = Image.open(file_path)
        with handle:
            handle.verify()
    except Exception:
        # Any failure (missing file, unreadable or corrupt data, ...) counts as invalid.
        return False
    else:
        return True


In [None]:
# Toggle read by the next cell: when True it downloads the Kaggle dataset and builds
# the train/test split; when False it only lists the existing class folders.
to_download_data = True # @param {type:"boolean"}

In [None]:
if to_download_data:
    # @markdown #Please set the path to your Kaggle API key from Google Drive
    kaggle_api_key_path = "/content/kaggle/kaggle.json" # @param {type:"string"}
    # @markdown ^^^The folder containing kaggle.json api file, in Google-drive for file persistence

    # @markdown Copy the Kaggle API key to the required directory
    !mkdir -p ~/.kaggle
    !cp "$kaggle_api_key_path" ~/.kaggle/
    !chmod 600 ~/.kaggle/kaggle.json

    # @markdown Importing kaggle for api access for dataset download
    try:
        import kaggle
    except Exception as e:
        print(e)

    # @markdown Replace 'your_dataset_folder' with the Kaggle dataset name
    dataset_name = 'emmarex/plantdisease' # @param {type:"string"}

    # @markdown Download the dataset using Kaggle API
    try:
        if (to_download_data): kaggle.api.dataset_download_files(dataset_name, unzip=True, path = dataset_path)  # Download and unzip the dataset to the './data' directory
    except Exception as e:
        print(e)

    class_folders = os.listdir(drive_dataset_path)
    for i in class_folders:
        if "desktop.ini" in i: class_folders.remove(i)

    # @markdown Iterate through each class folder
    for class_folder in class_folders:
        class_path = os.path.join(drive_dataset_path, class_folder)

        # @markdown List all the image files in the current class folder
        all_files = os.listdir(class_path)

        # @markdown Randomly shuffle the list of files
        random.shuffle(all_files)

        # @markdown Calculate the number of files for training and testing within the current class
        num_total_files = len(all_files)
        num_train_files = int(split_ratio * num_total_files)
        num_test_files = num_total_files - num_train_files

        # @markdown Create train and test subdirectories within the class folder
        class_train_dir = os.path.join(train_dir, class_folder)
        class_test_dir = os.path.join(test_dir, class_folder)
        os.makedirs(class_train_dir, exist_ok=True)
        os.makedirs(class_test_dir, exist_ok=True)

        # @markdown Move the first num_train_files files to the train subdirectory and the rest to the test subdirectory
        for i, file_name in enumerate(all_files):
            source_path = os.path.join(class_path, file_name)
            file_name = f"img_{i}.jpg"
            if i < num_train_files and is_valid_image(source_path):
                destination_path = os.path.join(class_train_dir, file_name)
            else:
                if is_valid_image(source_path):
                    destination_path = os.path.join(class_test_dir, file_name)
            shutil.copy(source_path, destination_path)
else:
    # @markdown List all the class folders in your dataset directory
    class_folders = os.listdir(drive_dataset_path)
    for i in class_folders:
        if "desktop.ini" in i: class_folders.remove(i)

'content-length'


FileNotFoundError: [Errno 2] No such file or directory: '/content/PlantVillage/'

In [None]:
# @markdown Iterate through your dataset and check each image
for class_folder in class_folders:
    class_path = os.path.join(drive_dataset_path, class_folder)
    for image_file in os.listdir(class_path):
        image_path = os.path.join(class_path, image_file)

        if not is_valid_image(image_path):
            print(f"Invalid image: {image_path}")
            print(f"\n\n\nDeleting file {image_path}...\n\n\n")
            # Delete from Python instead of `!rm "$image_path"` so paths containing
            # shell metacharacters are handled; a failed delete is reported, not fatal.
            try:
                os.remove(image_path)
            except OSError as err:
                print(err)

# @markdown Create ImageFolder datasets for training and testing (excluding problematic images)
torch.cuda.empty_cache()
train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)

# @markdown Define batch size and number of workers for data loading (adjust as needed)
batch_size = 128 # @param {type:"integer"}
num_workers = 2 # @param {type:"integer"}

# @markdown Create DataLoaders for the test set (fixed order) and the training set (shuffled)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

# @markdown One full pass over each loader: any sample that cannot be loaded or transformed fails here\
# @markdown &emsp;&emsp;rather than in the middle of training
# NOTE: after these loops `inputs` / `labels` hold the final training batch; the
# TensorBoard graph cell later reuses `inputs` as its example input.
for inputs,labels in test_loader: continue
for inputs,labels in train_loader: continue
print(inputs.shape,labels.shape)

# @markdown Print the class names (based on folder names)
class_names = train_dataset.classes
print("Class names:", class_names)
print("Num of classes:",len(class_names))

# @markdown Print the number of samples in the training and testing sets
print("Number of training samples:", len(train_dataset))
print("Number of testing samples:", len(test_dataset))

In [None]:
# Show every class folder with its index, one "index:name" pair per line.
indexed_folders = (f"{position}:{folder}" for position, folder in enumerate(class_folders))
print("\n".join(indexed_folders))

In [None]:
# @markdown (DEPRECATED) error causing file removal
# Manually deletes a single split image, addressed by split (train/test), class
# folder index and the numeric part of its "img_<n>.jpg" name.
# NOTE(review): `to_delete` defaults to True, so "Run all" removes the selected file
# every time — consider defaulting it to False or deleting this cell.
img_num = 221  # @param {type:"integer"}
train_folder = True # @param {type:"boolean"}
folder_index = 1 # @param {type:"slider", min:0, max:14, step:1}
file_name = base_dir + ("train/" if train_folder else "test/") + class_folders[folder_index] +"/img_"+str(img_num)+".jpg"
to_delete = True # @param {type:"boolean"}
if (to_delete):
    !rm "$file_name"

In [None]:
# Define the  Custom ResNet model
class CustomResNetlayer(nn.Module):
    """Shape-preserving residual bottleneck applied to a convolutional feature map.

    The block runs three stages (``in_channels -> inner_dim``,
    ``inner_dim -> inner_dim``, ``inner_dim -> in_channels``). In every stage the
    stage input is concatenated channel-wise with the output of a
    transposed-conv/conv "feature head" and fused back with a 1x1 convolution.
    The block input is finally added to the result, so the output has the same
    shape as the input.

    Args:
        num_classes: Not used by this layer; kept so existing callers that pass
            it positionally continue to work.
        inner_dim: Channel width of the bottleneck.
        in_channels: Channel count of the incoming (and outgoing) feature map.
            Defaults to 2048, the value that was previously hard-coded.
    """

    def __init__(self, num_classes, inner_dim = 32, in_channels = 2048):
        super().__init__()

        # Sub-modules are created in a fixed order with fixed attribute names so
        # that parameter initialisation order and state_dict keys stay the same.
        self.feature_head_l1 = self._feature_head(in_channels, in_channels, inner_dim)
        self.feature_head_l2 = self._feature_head(inner_dim, inner_dim, inner_dim)
        self.feature_head_l3 = self._feature_head(inner_dim, in_channels, in_channels)

        # Each fusion layer receives cat(stage input, head output) along the channel axis.
        self.residual_fc_l12 = self._fusion(in_channels + inner_dim, inner_dim)
        self.residual_fc_l2 = self._fusion(inner_dim * 2, inner_dim)
        self.residual_fc_l23 = self._fusion(inner_dim + in_channels, in_channels)

    @staticmethod
    def _feature_head(c_in, c_mid, c_out):
        """Build ConvTranspose2d(3x3) -> BN -> Conv2d(3x3) -> BN -> Sigmoid.

        With kernel size 3, stride 1 and no padding the transposed convolution
        grows each spatial dimension by 2 and the convolution shrinks it by 2,
        so the spatial size is unchanged.
        """
        return nn.Sequential(
            nn.ConvTranspose2d(c_in, c_mid, kernel_size=3),
            nn.BatchNorm2d(c_mid),
            nn.Conv2d(c_mid, c_out, kernel_size=3),
            nn.BatchNorm2d(c_out),
            nn.Sigmoid(),
        )

    @staticmethod
    def _fusion(c_in, c_out):
        """Build the 1x1 Conv2d -> BN -> Sigmoid layer that fuses a concatenation."""
        return nn.Sequential(
            nn.Conv2d(c_in, c_out, kernel_size=1),
            nn.BatchNorm2d(c_out),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Apply the three fused stages and add the block input back.

        Args:
            x: 4-D feature map whose channel dimension equals ``in_channels``.

        Returns:
            Tensor with the same shape as ``x``.
        """
        identity = x  # kept for the outer skip connection

        y = self.feature_head_l1(x)
        x = self.residual_fc_l12(torch.cat((x, y), dim=1))

        y = self.feature_head_l2(x)
        x = self.residual_fc_l2(torch.cat((x, y), dim=1))

        y = self.feature_head_l3(x)
        x = self.residual_fc_l23(torch.cat((x, y), dim=1))

        return x + identity

In [None]:
# @markdown empty cuda cache through torch
cuda_present = torch.cuda.is_available()
if cuda_present:
    # Release cached GPU memory held by torch's allocator.
    torch.cuda.empty_cache()

In [None]:
num_classes = len(train_dataset.classes)
inner_dim = 16 # @param {type:"integer"}

# Drop objects left over from a previous run of this cell so their memory can be
# reclaimed. Only a missing name is expected here, so only NameError is ignored.
try: del model, optimizer, scheduler
except NameError: pass

# Load ResNet-50 with ImageNet weights. `weights=` replaces the deprecated
# `pretrained=True`; IMAGENET1K_V1 is the weight set that flag used to select.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

# Insert the custom residual block in front of the original global average pooling
model.avgpool = nn.Sequential(
                CustomResNetlayer(num_classes,inner_dim),
                model.avgpool,
                )
# Keep the original fully connected layer and append Sigmoid + a new Linear that
# maps its outputs to num_classes logits
model.fc = nn.Sequential(
            model.fc,
            nn.Sigmoid(),
            nn.Linear( model.fc.out_features, num_classes),
        )

# Freeze everything, then re-enable gradients only for the modified avgpool and fc parts
for param in model.parameters():
    param.requires_grad = False
for param in model.avgpool.parameters():
    param.requires_grad = True
for param in model.fc.parameters():
    param.requires_grad = True

print(model)

# Move the model to the specified device (GPU or CPU)
model.to(device)

# Define the loss function (criterion); the optimizer is created in the next cell
criterion = nn.CrossEntropyLoss()


In [None]:
# Discard the optimizer/scheduler of a previous run; on a fresh kernel the names do
# not exist yet, which is the only error that should be ignored here.
try: del scheduler, optimizer
except NameError: pass
lr = 0.003 # @param {type:"number"}
weight_decay = 0.001 # @param {type:"number"}
# AdamW over all model parameters; parameters with requires_grad=False receive no
# gradient and are therefore skipped by the optimizer step.
optimizer = optim.AdamW(model.parameters(), lr = lr, betas = (0.9, 0.999), eps = 1e-08, weight_decay = weight_decay)


In [None]:
# Record the model graph for TensorBoard, using the batch left in `inputs` by the
# data-loading cell as the example input.
# NOTE(review): SummaryWriter() is created without a log directory; presumably it
# writes below ./runs — confirm against the tensorboardX version in use.
writer = SummaryWriter()
writer.add_graph(model, input_to_model = inputs.to(device))

%load_ext tensorboard
# export scalar data to JSON for external processing
# NOTE(review): no scalars are logged on this writer before the export — confirm the
# JSON file is actually needed.
writer.export_scalars_to_json("./runs/all_scalars.json")
writer.close()

In [None]:
# Launch the TensorBoard UI inside the notebook.
# NOTE(review): the log directory is the absolute path /content/runs, while the
# writer exports to the relative ./runs — these only match when the working
# directory is /content (presumably the Colab default; confirm for local runs).
%reload_ext tensorboard
%tensorboard --logdir /content/runs

In [None]:
# @markdown Training loop
num_epochs = 20 # @param {type:"integer"}
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Cosine annealing of the learning rate over num_epochs steps, down to eta_min.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, num_epochs, eta_min = 5e-6)

model.train()
# NOTE(review): the first loop enables gradients for EVERY parameter, which undoes
# the backbone freeze done when the model was built and makes the two loops after
# it redundant — confirm that fine-tuning the whole network is intended.
for param in model.parameters():
    param.requires_grad = True
for param in model.avgpool.parameters():
    param.requires_grad = True
for param in model.fc.parameters():
    param.requires_grad = True

t0 = time.time()  # start of the whole training run
for epoch in range(num_epochs):
    running_loss = 0.0  # sum of batch losses for this epoch
    t1 = time.time()  # start of this epoch
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # The scheduler is stepped once per epoch, after that epoch's optimizer steps.
    scheduler.step()
    # Reported loss is the mean of the per-batch losses.
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {(running_loss / len(train_loader)):.5f}, Time taken: {(time.time()-t1):.3f}s')
print(f"Total time taken: {(time.time()-t0):.3f}s")

In [None]:
# @markdown Evaluation
# Count top-1 hits over the whole test loader with gradients disabled.
model.eval()
correct, total = 0, 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        # Index of the largest logit along the class dimension is the prediction.
        predicted = outputs.max(dim=1).indices
        total += labels.shape[0]
        correct += int((predicted == labels).sum())

print(f'Accuracy on test set: {100 * correct / total}%')

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# @markdown Get true labels and predicted labels
true_labels = []
predicted_labels = []

model.eval()
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        true_labels.extend(labels.cpu().numpy())
        predicted_labels.extend(predicted.cpu().numpy())

# @markdown Compute the confusion matrix
# Passing every class index keeps the matrix num_classes x num_classes even when a
# class does not occur in the test predictions.
cm = confusion_matrix(true_labels, predicted_labels, labels=range(num_classes))

# @markdown Plot the confusion matrix
# Draw on an explicitly created axes: without `ax=` the display opens its own
# figure, which left an empty extra figure and ignored the requested figsize.
fig, ax = plt.subplots(figsize=(10, 8))
ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=train_dataset.classes).plot(
    ax=ax,
    cmap='viridis',
    values_format='d',
    xticks_rotation='vertical',  # class names are long; vertical labels avoid overlap
)
ax.set_title('Confusion Matrix')
plt.show()

In [None]:
# @markdown Load the test dataset
# Optional: rebuild test_dataset with a stand-alone transform.
# NOTE(review): this transform differs from the main `transform` (Resize(224) with a
# single int instead of (dim, dim), no CenterCrop, hard-coded statistics) — confirm
# the difference is intended before enabling it.
reload_test_data = False # @param {type:"boolean"}
if (reload_test_data):
    test_dataset = datasets.ImageFolder(test_dir, transform=transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]))

# @markdown Define a function to compute GradCAM
def compute_gradcam(input_image, target_class):
    """Compute Guided Grad-CAM attributions of the global ``model`` for one target.

    Args:
        input_image: Input tensor handed to the model; it is moved to ``device``.
        target_class: Class index to attribute; it is wrapped in a tensor and
            moved to ``device``.

    Returns:
        The attribution tensor, detached from the computation graph.
    """
    model.eval()

    # Attribute with respect to the first layer of the custom block's last fusion stage.
    explainer = captum_attr.GuidedGradCam(model, model.avgpool[0].residual_fc_l23[0])

    scores = explainer.attribute(
        input_image.to(device),
        target=torch.tensor(target_class).to(device),
    )
    return scores.detach()

# @markdown Choose an image and target class index for GradCAM visualization
# The sample's own label is used as the attribution target.
# NOTE(review): index 420 is assumed to exist in test_dataset — confirm for other datasets.
image_no_for_input = 420 # @param {type:"integer"}
image, label = test_dataset[image_no_for_input]

# @markdown Transpose the image to [channels, height, width]
# (disabled: the dataset item is passed on without permuting it)
#image = image.permute(1, 2, 0)

# @markdown Compute GradCAM
# unsqueeze(0) adds the leading batch dimension before calling the model.
attribution = compute_gradcam(image.unsqueeze(0), label)
# Min-max rescale; yields NaN when every attribution value is identical (max == min).
attribution = (attribution - attribution.min())/(attribution.max() - attribution.min())

def norm_(img):
    """Min-max rescale ``img`` to the range [0, 1].

    Works on any array-like that supports ``.min()``, ``.max()`` and arithmetic
    (the notebook uses it on both torch tensors and numpy arrays).

    Args:
        img: Non-empty tensor or array.

    Returns:
        ``(img - min) / (max - min)``. When every element is equal (zero value
        range) an all-zero floating-point result is returned instead of the
        NaNs a division by zero would produce.
    """
    lowest = img.min()
    value_range = img.max() - lowest
    if value_range == 0:
        # Constant input: avoid 0/0; multiplying by 1.0 keeps the result floating point.
        return (img - lowest) * 1.0
    out = (img - lowest) / value_range
    return out

# @markdown Visualize the GradCAM heatmap
# The batch dimension is dropped and the channel axis is moved last for imshow.
# NOTE(review): the result is presumably a 3-channel (H, W, 3) image, in which case
# matplotlib ignores `cmap` — confirm if a single-channel heatmap was intended.
plt.figure(figsize=(8, 8))
plt.imshow(norm_(attribution).squeeze(0).permute(1, 2, 0).cpu(), cmap='viridis')
plt.title('GradCAM Heatmap')
plt.show()

In [None]:
# @markdown Define a function to create a Lime explainer and explain predictions
"""
# @markdown &emsp;Now we are ready to define the classification function that Lime needs.\
# @markdown &emsp;The input to this function is a numpy array of images where each image is an ndarray \
# @markdown &emsp;of shape (height, width, channel). The output is a numpy array of shape \
# @markdown &emsp;(image index, classes) where each value in the array should be the probability for that \
# @markdown &emsp;image, class combination.
"""
# Tensor-only counterpart of the main preprocessing pipeline: there is no ToTensor
# step because batch_predict converts the numpy images to tensors before applying it.
preprocess_transform = transforms.Compose([
    transforms.Resize((dim, dim)),  # Resize images to a fixed dim x dim size
    transforms.CenterCrop(crop_dim),     # Center crop to crop_dim * crop_dim pixels
    transforms.Normalize(mean=norm_mean, std=norm_std)  # Normalize each channel with norm_mean / norm_std
])
def batch_predict(images):
    """Classification function for LIME: map images to class probabilities.

    Args:
        images: numpy array holding either a batch of images with layout
            (N, H, W, C) or a single image with layout (H, W, C).

    Returns:
        numpy array of shape (N, num_classes) with softmax probabilities
        (N is 1 for a single image).
    """
    model.eval()
    if len(images.shape) > 3:
        # Batch: move channels first per image, preprocess, then stack.
        batch = torch.stack(
            [preprocess_transform(torch.from_numpy(img).permute(2, 0, 1)) for img in images],
            dim=0,
        )
    else:
        # Single image: the original code referenced an undefined loop variable
        # here and permuted after the transform; convert `images` itself and move
        # channels first before preprocessing, exactly like the batch branch.
        batch = preprocess_transform(torch.from_numpy(images).permute(2, 0, 1)).unsqueeze(0)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    batch = batch.to(device)

    # Inference only: LIME calls this many times, so skip building autograd graphs.
    with torch.no_grad():
        logits = model(batch)
        probs = nn.Softmax(dim=1)(logits)
    return probs.detach().cpu().numpy()

def explain_lime(image, model, num_classes):
    """Run LIME's image explainer on a single image.

    Args:
        image: Image handed to ``explain_instance``.
        model: Classifier function handed to ``explain_instance`` (the notebook
            passes ``batch_predict``).
        num_classes: Used as ``top_labels``.

    Returns:
        The explanation object produced by ``explain_instance`` with 4096 samples.
    """
    return lime_image.LimeImageExplainer().explain_instance(
        image,
        model,
        top_labels=num_classes,
        num_samples=4096,
    )

# @markdown Choose an image to explain
image_no_for_input = 420 # @param {type:"integer"}
image, label = test_dataset[image_no_for_input]
# LIME receives the image channels-last as a numpy array.
# NOTE(review): test_dataset items have presumably already been normalized by
# `transform`, and batch_predict normalizes again through preprocess_transform —
# confirm the model is not being fed doubly-normalized pixels here.
explanation = explain_lime(image.permute(1, 2, 0).numpy(), batch_predict, num_classes)

# @markdown Visualize the Lime explanation
# Show the 5 strongest superpixels (positive and negative) for the true label.
temp, mask = explanation.get_image_and_mask(
                                            label,
                                            positive_only=False,
                                            num_features=5,
                                            hide_rest=False
                                            )

# NOTE(review): norm_ already maps to [0, 1]; the extra "/ 2 + 0.5" compresses the
# image into [0.5, 1] — confirm the brightening is intended.
plt.imshow(mark_boundaries(norm_(temp) / 2 + 0.5, mask), cmap='viridis')
plt.title('LIME Explanation')
plt.show()


In [None]:

# Preview test sample 420 (the default index used by the GradCAM and LIME cells),
# rescaled to [0, 1] and moved to channels-last for imshow.
plt.imshow(norm_(test_dataset[420][0]).permute(1, 2, 0))
plt.show()

In [None]:
# @markdown Get feature representations for a subset of your data
num_samples = 8192  # @param {type:"integer"}
features = []
labels = []

# The "features" are the model outputs (one row of class scores per sample).
model.eval()
with torch.no_grad():
    for inputs, batch_labels in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        features.extend(outputs.cpu().numpy())
        labels.extend(batch_labels.cpu().numpy())
        # Stop as soon as enough rows are collected instead of running the model
        # over the whole test set and discarding the surplus. test_loader is not
        # shuffled, so the kept rows are the same first num_samples as before.
        if 0 <= num_samples <= len(features):
            break

features = features[:num_samples]
labels = labels[:num_samples]

# @markdown Perform t-SNE
feat = np.stack(features)
# Fixed random_state keeps the embedding reproducible between runs.
tsne = TSNE(n_components=2, random_state=42)
tsne_result = tsne.fit_transform(feat)

# @markdown Visualize t-SNE plot
# Points are coloured by their true class index.
plt.figure(figsize=(8, 6))
plt.scatter(tsne_result[:, 0], tsne_result[:, 1], c=labels, cmap='viridis', alpha=0.7)
plt.colorbar()
plt.title('t-SNE Plot')
plt.show()
