### Import Dependencies

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
import torch
from torchvision import datasets, transforms, models  # datsets  , transforms
from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn as nn
import torch.nn.functional as F
from datetime import datetime

In [None]:
# Optional: Install tqdm for progress bars if not already installed
# !pip install tqdm
from tqdm import tqdm

In [None]:

%jupyter_black

In [None]:
# Choose the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"\nUsing device: {device}")

# Report the hardware the rest of the notebook will run on.
if device.type == "cuda":
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
else:
    print("GPU not available. Training will use CPU (slower).")

In [None]:
# Training configuration: every knob a reader may want to tune lives here.
TESTING_MODE = True  # True = quick smoke run on a data subset; False = full training
SUBSET_PERCENTAGE = 0.10  # Fraction of the dataset kept when TESTING_MODE is on
if TESTING_MODE:
    NUM_EPOCHS = 2
else:
    NUM_EPOCHS = 5
BATCH_SIZE = 64
NUM_WORKERS = 4  # DataLoader worker processes (use 0 if this causes issues on Windows)

# Echo the active settings so they are recorded next to the results.
config_lines = ["Configuration:", f"  Testing Mode: {TESTING_MODE}"]
if TESTING_MODE:
    config_lines.append(f"  Using {SUBSET_PERCENTAGE*100}% of dataset")
config_lines.append(f"  Epochs: {NUM_EPOCHS}")
config_lines.append(f"  Batch Size: {BATCH_SIZE}")
print("\n".join(config_lines))

### Configuration Settings

### Import Dataset

<b> Dataset Link (Plant Village Dataset):</b><br> <a href='https://data.mendeley.com/datasets/tywbtsjrjv/1'> https://data.mendeley.com/datasets/tywbtsjrjv/1 </a> 

In [None]:
# Preprocessing applied to every image: resize the shorter side to 255 px,
# crop the central 224x224 region, then convert to a float tensor in [0, 1].
preprocessing_steps = [
    transforms.Resize(255),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
]
transform = transforms.Compose(preprocessing_steps)

In [None]:
# Load images from the local "Dataset" directory (path relative to the notebook).
# ImageFolder takes each image's label from the name of its sub-folder and
# applies `transform` to every image it returns.
dataset = datasets.ImageFolder("Dataset", transform=transform)

In [None]:
# Apply subset for testing mode
if TESTING_MODE:
    # Unwrap a Subset left over from a previous run of this cell so that
    # re-running it does not shrink the data a second time.
    full_dataset = getattr(dataset, "dataset", dataset)
    subset_size = int(len(full_dataset) * SUBSET_PERCENTAGE)
    # ImageFolder lists samples class folder by class folder, so keeping the
    # first N images would keep only the first few classes. Draw a seeded
    # random sample instead so every class can be represented and the subset
    # is the same on every run.
    subset_rng = torch.Generator().manual_seed(42)
    subset_indices = torch.randperm(len(full_dataset), generator=subset_rng)[
        :subset_size
    ].tolist()
    dataset = torch.utils.data.Subset(full_dataset, subset_indices)
    print(f"Testing mode: Using {len(dataset)} images ({SUBSET_PERCENTAGE*100}% of full dataset)")
else:
    print(f"Full training mode: Using all {len(dataset)} images")

In [None]:
# Display the dataset object's repr as a quick sanity check.
dataset

In [None]:
# One position per sample in `dataset`; this list is later shuffled and sliced
# into the train / validation / test partitions.
indices = list(range(len(dataset)))

In [None]:
# Boundary between the train+validation region and the test region:
# the first 85% of `indices` is train+validation, the remaining 15% is test.
split = int(np.floor(0.85 * len(dataset)))  # end of train+validation region

In [None]:
# Boundary between train and validation inside the first `split` indices:
# 70% of that region is used for training. Despite its name, this value is the
# number of *training* samples (validation starts at this index).
validation = int(np.floor(0.70 * split))  # train/validation boundary

In [None]:
# Partition boundaries: train = [0, validation), validation = [validation, split),
# test = [split, len(dataset)).
print(0, validation, split, len(dataset))

In [None]:
# Report the size of each partition. The test partition is indices[split:],
# so its size is len(dataset) - split (not len(dataset) - validation, which
# would count the validation samples as test samples too).
print(f"length of train size :{validation}")
print(f"length of validation size :{split - validation}")
print(f"length of test size :{len(dataset) - split}")

In [None]:
# Shuffle in place with a fixed seed so the train / validation / test
# partitions are identical on every run. Without this, a checkpoint reloaded
# in a fresh kernel could be evaluated on images it was trained on.
np.random.default_rng(42).shuffle(indices)

### Split into Train, Validation and Test

In [None]:
# Slice the shuffled index list into three disjoint, contiguous partitions.
train_indices = indices[:validation]
validation_indices = indices[validation:split]
test_indices = indices[split:]

In [None]:
# One sampler per partition; each draws, in random order, only from the
# indices of its own partition.
train_sampler, validation_sampler, test_sampler = (
    SubsetRandomSampler(partition)
    for partition in (train_indices, validation_indices, test_indices)
)

In [None]:
# Number of classes the model must predict. In testing mode `dataset` is a
# torch.utils.data.Subset, which has no `class_to_idx`; the class mapping lives
# on the wrapped ImageFolder, exposed as `.dataset`. Fall back to `dataset`
# itself when it is not wrapped.
targets_size = len(getattr(dataset, "dataset", dataset).class_to_idx)

### Model

<b>Convolution Arithmetic Equation (output size): </b>(W - F + 2P) / S + 1 <br>
W = Input Size<br>
F = Filter Size<br>
P = Padding Size<br>
S = Stride <br>

### Transfer Learning

In [None]:
# model = models.vgg16(pretrained=True)

In [None]:
# for params in model.parameters():
#     params.requires_grad = False

In [None]:
# model

In [None]:
# n_features = model.classifier[0].in_features
# n_features

In [None]:
# model.classifier = nn.Sequential(
#     nn.Linear(n_features, 1024),
#     nn.ReLU(),
#     nn.Dropout(0.4),
#     nn.Linear(1024, targets_size),
# )

In [None]:
# model

### Original Modeling

In [None]:
class CNN(nn.Module):
    """Four-stage convolutional classifier for 3x224x224 images.

    Each stage is two (3x3 conv -> ReLU -> BatchNorm) groups followed by a 2x2
    max-pool, with 32, 64, 128 and 256 output channels respectively. The
    resulting feature map is flattened and passed through a two-layer
    fully connected head with dropout.

    The layers are kept in flat ``nn.Sequential`` containers, so parameter
    names (``conv_layers.0.weight`` ... ``dense_layers.4.bias``) match
    checkpoints saved by earlier versions of this class.

    Args:
        K: number of output classes (size of the final linear layer).
    """

    # 256 channels x 14 x 14: a 224x224 input halved by each of the four max-pools.
    FLAT_FEATURES = 256 * 14 * 14  # == 50176

    def __init__(self, K):
        super().__init__()

        layers = []
        in_channels = 3
        for out_channels in (32, 64, 128, 256):
            layers.extend(self._conv_stage(in_channels, out_channels))
            in_channels = out_channels
        self.conv_layers = nn.Sequential(*layers)

        self.dense_layers = nn.Sequential(
            nn.Dropout(0.4),
            nn.Linear(self.FLAT_FEATURES, 1024),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(1024, K),
        )

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Return the layers of one stage: 2 x (conv, ReLU, BatchNorm) + max-pool."""
        return [
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
            nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
            nn.MaxPool2d(2),
        ]

    def forward(self, X):
        """Return raw class scores (logits) of shape (batch, K) for input ``X``."""
        out = self.conv_layers(X)

        # Flatten everything except the batch dimension. Unlike view(-1, 50176),
        # this never folds a wrong-sized feature map into extra batch rows; a
        # mismatched input fails loudly in the first linear layer instead.
        out = torch.flatten(out, start_dim=1)

        # Fully connected
        out = self.dense_layers(out)

        return out

In [None]:
# Build the network with one output per class.
model = CNN(targets_size)

In [None]:
# Move the model's parameters to the selected device. As the last expression
# in the cell this also displays the model architecture.
model.to(device)

In [None]:
# torchsummary is a third-party helper (not part of PyTorch).
from torchsummary import summary

# Summarize the model for a single 3-channel 224x224 input.
summary(model, (3, 224, 224))

In [None]:
# Loss and optimizer used by the training loop below.
criterion = nn.CrossEntropyLoss()  # applies log-softmax internally, so the model outputs raw logits
optimizer = torch.optim.Adam(model.parameters())  # Adam with its default hyper-parameters

### Batch Gradient Descent

In [None]:
def batch_gd(model, criterion, train_loader, test_laoder, epochs):
    """Train ``model`` with mini-batch gradient descent and record per-epoch losses.

    Uses the notebook-level ``optimizer`` and ``device`` variables, which must
    be defined before this function is called.

    Args:
        model: network to train; it is updated in place.
        criterion: loss callable invoked as ``criterion(output, targets)``.
        train_loader: iterable of ``(inputs, targets)`` batches used for training.
        test_laoder: iterable of ``(inputs, targets)`` held-out batches evaluated
            after every epoch (the validation set). The misspelled parameter
            name is kept so existing callers keep working.
        epochs: number of passes over ``train_loader``.

    Returns:
        Tuple ``(train_losses, validation_losses)`` of NumPy arrays of length
        ``epochs`` holding the mean batch loss of each epoch.
    """
    train_losses = np.zeros(epochs)
    validation_losses = np.zeros(epochs)

    for e in range(epochs):
        t0 = datetime.now()

        # Training pass: dropout active, BatchNorm uses batch statistics.
        model.train()
        train_loss = []
        pbar = tqdm(train_loader, desc=f"Epoch {e+1}/{epochs} [Train]")
        for inputs, targets in pbar:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            output = model(inputs)
            loss = criterion(output, targets)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()  # Python float, detached from the graph
            train_loss.append(batch_loss)
            pbar.set_postfix({'loss': f'{batch_loss:.3f}'})

        train_loss = np.mean(train_loss)

        # Validation pass on the loader that was passed in (not a global one).
        # eval() disables dropout and freezes BatchNorm statistics; no_grad()
        # avoids building autograd graphs that are never used.
        model.eval()
        validation_loss = []
        pbar_val = tqdm(test_laoder, desc=f"Epoch {e+1}/{epochs} [Val]")
        with torch.no_grad():
            for inputs, targets in pbar_val:
                inputs, targets = inputs.to(device), targets.to(device)

                output = model(inputs)
                batch_loss = criterion(output, targets).item()

                validation_loss.append(batch_loss)
                pbar_val.set_postfix({'loss': f'{batch_loss:.3f}'})

        validation_loss = np.mean(validation_loss)

        train_losses[e] = train_loss
        validation_losses[e] = validation_loss

        dt = datetime.now() - t0

        print(
            f"Epoch : {e+1}/{epochs} Train_loss:{train_loss:.3f} Val_loss:{validation_loss:.3f} Duration:{dt}"
        )

    return train_losses, validation_losses

In [None]:
# Build one DataLoader per partition. All three read from the same `dataset`;
# the sampler decides which indices each loader is allowed to draw.
loader_options = {
    "batch_size": BATCH_SIZE,
    "num_workers": NUM_WORKERS,
    "pin_memory": True,
}
train_loader = torch.utils.data.DataLoader(
    dataset, sampler=train_sampler, **loader_options
)
test_loader = torch.utils.data.DataLoader(
    dataset, sampler=test_sampler, **loader_options
)
validation_loader = torch.utils.data.DataLoader(
    dataset, sampler=validation_sampler, **loader_options
)

# Number of batches each loader yields per pass.
print(f"Data loaders created:")
print(f"  Train batches: {len(train_loader)}")
print(f"  Validation batches: {len(validation_loader)}")
print(f"  Test batches: {len(test_loader)}")

In [None]:
# Run the training loop and keep the per-epoch loss curves for plotting.
print(f"\nStarting training for {NUM_EPOCHS} epochs...")
print(f"Using device: {device}\n")

# The fourth argument is the held-out loader evaluated after each epoch.
train_losses, validation_losses = batch_gd(
    model, criterion, train_loader, validation_loader, NUM_EPOCHS
)

print("\n✓ Training completed!")

### Save the Model

In [None]:
# Save the trained model
# Only the state_dict (parameter and buffer tensors) is written, not the class
# itself, so loading it later requires constructing a CNN first.
model_filename = 'plant_disease_model_1.pt'
torch.save(model.state_dict(), model_filename)
print(f"✓ Model saved to: {model_filename}")

### Load Model

In [None]:
# Reload the checkpoint written by the save cell into a fresh network.
# - Uses `model_filename` and the computed `targets_size`, so the file and the
#   output layer always match what was just trained. To evaluate a different
#   checkpoint, point the path at it and build CNN with its class count.
# - map_location lets weights saved on a GPU load on a CPU-only machine.
# - The model is moved to `device` because the evaluation code sends its
#   inputs there; leaving it on the CPU fails on a GPU machine.
model = CNN(targets_size)
model.load_state_dict(torch.load(model_filename, map_location=device))
model.to(device)
model.eval()

In [None]:
# %matplotlib notebook

### Plot the loss

In [None]:
# Plot the per-epoch training and validation loss curves on one set of axes.
fig, ax = plt.subplots()
ax.plot(train_losses, label="train_loss")
ax.plot(validation_losses, label="validation_loss")
ax.set_xlabel("No of Epochs")
ax.set_ylabel("Loss")
ax.legend()
plt.show()

### Accuracy

In [None]:
def accuracy(loader):
    """Return the fraction of samples in ``loader`` that ``model`` classifies correctly.

    Uses the notebook-level ``model`` and ``device`` variables. The model is
    switched to evaluation mode for the duration of the call (dropout off,
    BatchNorm running statistics) and its previous mode is restored afterwards.

    Args:
        loader: iterable of ``(inputs, targets)`` batches.

    Returns:
        Accuracy as a float in [0, 1], or ``nan`` when ``loader`` yields no samples.
    """
    n_correct = 0
    n_total = 0

    was_training = model.training
    model.eval()
    try:
        # Inference only: skip autograd bookkeeping to save memory and time.
        with torch.no_grad():
            for inputs, targets in loader:
                inputs, targets = inputs.to(device), targets.to(device)

                outputs = model(inputs)

                # Predicted class = index of the largest score in each row.
                predictions = outputs.argmax(dim=1)

                n_correct += (predictions == targets).sum().item()
                n_total += targets.shape[0]
    finally:
        model.train(was_training)

    if n_total == 0:
        # Accuracy is undefined for an empty loader; avoid ZeroDivisionError.
        return float("nan")
    return n_correct / n_total

In [None]:
# Evaluate the current `model` on each partition (one full pass per loader).
train_acc = accuracy(train_loader)
test_acc = accuracy(test_loader)
validation_acc = accuracy(validation_loader)

In [None]:
# Show the three accuracies as raw fractions in [0, 1].
print(
    f"Train Accuracy : {train_acc}\nTest Accuracy : {test_acc}\nValidation Accuracy : {validation_acc}"
)

In [None]:
# Assemble the end-of-run report line by line, then print it in one go.
banner = "=" * 60
report_lines = [
    banner,
    "TRAINING COMPLETE - SUMMARY",
    banner,
    "\nConfiguration:",
    f"  Device: {device}",
    f"  Testing Mode: {TESTING_MODE}",
    f"  Epochs: {NUM_EPOCHS}",
    f"  Batch Size: {BATCH_SIZE}",
    "\nDataset:",
    f"  Total samples: {len(dataset)}",
    f"  Number of classes: {targets_size}",
    f"  Train samples: {validation}",
    f"  Validation samples: {split - validation}",
    f"  Test samples: {len(dataset) - split}",
    "\nAccuracy:",
    f"  Train Accuracy: {train_acc:.4f} ({train_acc*100:.2f}%)",
    f"  Validation Accuracy: {validation_acc:.4f} ({validation_acc*100:.2f}%)",
    f"  Test Accuracy: {test_acc:.4f} ({test_acc*100:.2f}%)",
    "\nModel saved to: plant_disease_model_1.pt",
    banner,
    "\nNext steps:",
]

# The recommended follow-up depends on whether this was a quick test run.
if TESTING_MODE:
    report_lines += [
        "  1. Set TESTING_MODE = False in the configuration cell",
        "  2. Re-run all cells for full training",
        "  3. Full training will take 4-7 hours on CPU or 15-40 min on GPU",
    ]
else:
    report_lines += [
        "  ✓ Full training complete!",
        "  1. Use the saved model for predictions",
        "  2. Consider transfer learning with VGG16 for better accuracy",
    ]
report_lines.append(banner)

print("\n".join(report_lines))

### Training Summary

In [None]:
def single_prediction(image_path):
    """Classify one image file and print the predicted disease name.

    Uses the notebook-level ``model`` and ``data`` variables. ``data`` is not
    defined in the visible part of this notebook; presumably it is a table with
    a "disease_name" column indexed by class id -- TODO confirm and load it
    before calling this function.

    Args:
        image_path: path of the image file to classify.
    """
    # default_loader is the loader ImageFolder itself uses: it opens the file
    # and converts it to RGB, so grayscale / RGBA files still give 3 channels.
    image = datasets.folder.default_loader(image_path)
    # NOTE(review): training uses Resize(255) + CenterCrop(224) while this
    # squashes the whole image to 224x224 -- confirm the mismatch is intended.
    image = image.resize((224, 224))

    # Shape (1, 3, 224, 224), placed on whichever device the model lives on.
    input_data = transforms.ToTensor()(image).unsqueeze(0)
    input_data = input_data.to(next(model.parameters()).device)

    # Inference only: evaluation mode and no autograd graph; the previous
    # training mode is restored afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            output = model(input_data)
    finally:
        model.train(was_training)

    # Index of the highest score; .item() also works for CUDA tensors.
    index = int(output.argmax(dim=1).item())

    # Presumably strips a 12-character directory prefix and a 4-character
    # extension from the path to show the expected label -- verify against callers.
    print("Original : ", image_path[12:-4])
    pred_csv = data["disease_name"][index]
    print(pred_csv)