<a href="https://colab.research.google.com/github/ayra-13/LeafCure/blob/main/FinalResNet3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [30]:
import os
import shutil
import random
from PIL import Image
import torch
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import torch.nn as nn
from torch.cuda.amp import GradScaler, autocast
from PIL import Image, ImageFile
import warnings

In [23]:
# Ensure PIL doesn't crash on truncated files
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Custom loader to handle corrupted images
def safe_pil_loader(path):
    try:
        with open(path, 'rb') as f:
            img = Image.open(f)
            return img.convert('RGB')
    except OSError as e:
        warnings.warn(f"Skipping corrupted image: {path} ({e})")
        return None

In [3]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [7]:
# Paths
# Replace with your main folder path
source_dir = "drive/MyDrive/Model/Rice_Leaf_AUG1"
destination_dir = "drive/MyDrive/Model/Dest_Rice_Leaf_AUG1"

In [6]:
os.listdir(source_dir)

['Bacterial Leaf Blight',
 'Healthy Rice Leaf',
 'Neck_Blast',
 'Narrow Brown Leaf Spot',
 'Leaf Blast',
 'Sheath Blight',
 'Brown Spot',
 'Leaf scald',
 'Rice Hispa']

In [None]:
# Ensure the destination directory exists
os.makedirs(destination_dir, exist_ok=True)

# Walk through the source directory
for root, dirs, files in os.walk(source_dir):
    # Get only the image files (modify extensions if needed)
    image_files = [f for f in files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]

    if image_files:  # If there are images in the folder
        # Randomly select half of the images
        num_to_copy = len(image_files) // 2
        images_to_copy = random.sample(image_files, num_to_copy)

        # Create the corresponding subdirectory in the destination folder
        relative_path = os.path.relpath(root, source_dir)
        dest_subdir = os.path.join(destination_dir, relative_path)
        os.makedirs(dest_subdir, exist_ok=True)

        # Copy the selected images
        for image in images_to_copy:
            src_path = os.path.join(root, image)
            dest_path = os.path.join(dest_subdir, image)
            shutil.copy2(src_path, dest_path)  # Use copy2 to preserve metadata

print("Images have been copied successfully.")

Images have been copied successfully.


In [8]:
os.listdir(destination_dir)

['Bacterial Leaf Blight',
 'Healthy Rice Leaf',
 'Neck_Blast',
 'Narrow Brown Leaf Spot',
 'Leaf Blast',
 'Sheath Blight',
 'Brown Spot',
 'Leaf scald',
 'Rice Hispa']

In [None]:
# Paths
output_dir = "drive/MyDrive/Model/RiceDatatest"  # Replace with your output path

# Ratios for splitting
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1

# Ensure output directories exist
for split in ['train', 'val', 'test']:
    os.makedirs(os.path.join(output_dir, split), exist_ok=True)

# Split the dataset
for root, dirs, files in os.walk(source_dir):
    # Assuming your source directory has subfolders representing classes
    for class_name in dirs: # Iterate through each class subfolder
        class_dir = os.path.join(root, class_name) # Get path to class subfolder
        image_files = [f for f in os.listdir(class_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]

        if image_files:  # If there are images in the class folder
            random.shuffle(image_files)
            num_images = len(image_files)
            train_end = int(train_ratio * num_images)
            val_end = train_end + int(val_ratio * num_images)

            splits = {
                'train': image_files[:train_end],
                'val': image_files[train_end:val_end],
                'test': image_files[val_end:]
            }

            for split, split_files in splits.items():
                split_dir = os.path.join(output_dir, split, class_name) # Create class subfolder in split directory
                os.makedirs(split_dir, exist_ok=True)
                for image in split_files:
                    shutil.copy2(os.path.join(class_dir, image), os.path.join(split_dir, image))

print("Dataset split into train, val, and test sets.")

Dataset split into train, val, and test sets.


In [9]:
data_dir = "drive/MyDrive/Model/RiceDatatest"

In [10]:
os.listdir(data_dir)

['train', 'val', 'test']

In [24]:
# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load dataset
train_data = datasets.ImageFolder(os.path.join(data_dir, 'train'), transform=transform,loader=safe_pil_loader)
val_data = datasets.ImageFolder(os.path.join(data_dir, 'val'), transform=transform,loader=safe_pil_loader)
test_data = datasets.ImageFolder(os.path.join(data_dir, 'test'), transform=transform,loader=safe_pil_loader)

# Dataloaders
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, num_workers=4, pin_memory=True)

# Load ResNet50
model = models.resnet50(pretrained=True)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, len(train_data.classes))  # Adjust output for the number of classes
model = model.to(device)  # Move the model to the correct device

# Define optimizer, loss, and scheduler
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
scheduler = StepLR(optimizer, step_size=7, gamma=0.1)
scaler = GradScaler()  # For mixed precision training

  scaler = GradScaler()  # For mixed precision training


In [21]:
# Training function
def train_one_epoch(model, train_loader, criterion, optimizer, scaler, device, epoch):
    model.train()
    train_loss = 0

    print(f"\nEpoch {epoch+1} - Training")
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        # Move inputs and labels to the device
        inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

        # Reset gradients
        optimizer.zero_grad()

        # Forward pass with mixed precision
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Backward pass and optimizer step
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Accumulate training loss
        train_loss += loss.item()

        # Log progress every 50 batches or at the end
        if (batch_idx + 1) % 50 == 0 or (batch_idx + 1) == len(train_loader):
            print(f"  Batch {batch_idx+1}/{len(train_loader)}: Loss {loss.item():.4f}")

    avg_train_loss = train_loss / len(train_loader)
    print(f"Training Complete: Average Loss: {avg_train_loss:.4f}")
    return avg_train_loss


# Validation function
def validate_one_epoch(model, val_loader, criterion, device, epoch):
    model.eval()
    val_loss = 0
    correct = 0
    total = 0

    print(f"\nEpoch {epoch+1} - Validation")
    with torch.no_grad():
        for inputs, labels in val_loader:
            # Move inputs and labels to the device
            inputs, labels = inputs.to(device, non_blocking=True), labels.to(device, non_blocking=True)

            # Forward pass with mixed precision
            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            # Accumulate validation loss
            val_loss += loss.item()

            # Calculate accuracy
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = correct / total
    print(f"Validation Complete: Average Loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.4f}")
    return avg_val_loss, val_accuracy


In [25]:
# Main training loop
epochs = 20
best_val_accuracy = 0

print("Starting training...")
for epoch in range(epochs):
    train_loss = train_one_epoch(model, train_loader, criterion, optimizer, scaler, device, epoch)
    val_loss, val_accuracy = validate_one_epoch(model, val_loader, criterion, device, epoch)

    # Save the best model
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), "best_model_resnet50.pth")
        print(f"  New best model saved with Accuracy: {best_val_accuracy:.4f}")

    # Adjust learning rate
    scheduler.step()
    print(f"Learning rate adjusted to: {scheduler.get_last_lr()[0]:.6f}")

print("Training complete!")

Starting training...

Epoch 1 - Training


  with autocast():


  Batch 50/92: Loss 1.1654
  Batch 92/92: Loss 0.8409
Training Complete: Average Loss: 1.3229

Epoch 1 - Validation


  with autocast():


Validation Complete: Average Loss: 1.0422, Accuracy: 0.6151
  New best model saved with Accuracy: 0.6151
Learning rate adjusted to: 0.001000

Epoch 2 - Training
  Batch 50/92: Loss 0.8434
  Batch 92/92: Loss 1.0691
Training Complete: Average Loss: 0.9227

Epoch 2 - Validation
Validation Complete: Average Loss: 1.0244, Accuracy: 0.6295
  New best model saved with Accuracy: 0.6295
Learning rate adjusted to: 0.001000

Epoch 3 - Training
  Batch 50/92: Loss 0.6632
  Batch 92/92: Loss 1.0860
Training Complete: Average Loss: 0.7419

Epoch 3 - Validation
Validation Complete: Average Loss: 0.6992, Accuracy: 0.7518
  New best model saved with Accuracy: 0.7518
Learning rate adjusted to: 0.001000

Epoch 4 - Training
  Batch 50/92: Loss 0.4934
  Batch 92/92: Loss 0.4275
Training Complete: Average Loss: 0.5993

Epoch 4 - Validation
Validation Complete: Average Loss: 0.8259, Accuracy: 0.7110
Learning rate adjusted to: 0.001000

Epoch 5 - Training
  Batch 50/92: Loss 0.4721
  Batch 92/92: Loss 0.1908

In [29]:
# Load the best model
model.load_state_dict(torch.load("best_model_resnet50.pth"))
model.eval()  # Set the model to evaluation mode

correct = 0
total = 0
test_loss = 0

print("\nTesting the model on the test dataset...")
with torch.no_grad():  # No gradient calculations for inference
    for inputs, labels in test_loader:
        # Move inputs and labels to the same device as the model
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)  # Calculate loss for testing
        test_loss += loss.item()

        # Predictions
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

# Calculate final test accuracy and average loss
test_accuracy = correct / total
test_loss /= len(test_loader)

print(f"Test Complete: Average Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}")



Testing the model on the test dataset...


  model.load_state_dict(torch.load("best_model_resnet50.pth"))


Test Complete: Average Loss: 0.4126, Accuracy: 0.8915
