## Importing Needed Libraries

<p align="justify"> This section loads essential libraries for building and training the DCGAN and CNN pipelines. It includes PyTorch modules for deep learning, torchvision for image utilities, and other auxiliary tools.</p>

In [None]:
# Part 1: Data Preprocessing

import torch

import random

import numpy as np

from torch.utils.data import DataLoader, Subset, TensorDataset, random_split

from torchvision import datasets, transforms

# Part 2: Creating the DCGAN Generator and Discriminator Classes

import torch.nn as nn

# Part 3: Training a DCGAN for Each Underrepresented Class (Cordana, Healthy, Pestalotiopsis)

import shutil

from pathlib import Path

from numpy import cos, pi

import torch.optim as optim

from torchvision.utils import save_image

from tqdm import tqdm

import os

## Part 1: Data Preprocessing

<p align="justify">This step prepares image data in formats suitable for GAN generation and CNN classification, with augmentation strategies tailored to each model's needs.

<p align="justify">This code block defines key constants used throughout the pipeline: file paths, image sizes for GAN and CNN processing, and the target banana leaf classes.

In [2]:
# Pipeline-wide settings used by the data-loading helpers

# Root folder handed to ImageFolder by the loaders
RAW_DATA_DIR = "../training_data"

# Target image sizes used by the GAN and CNN transform pipelines
GAN_SIZE = (128, 128)
CNN_SIZE = (224, 224)

# Class names handled by the pipeline
BANANA_CLASSES = [
    "cordana",
    "healthy",
    "pestalotiopsis",
    "sigatoka",
]

<p align="justify">The <code>set_seed</code> function enforces reproducible results by fixing random seeds across Python, NumPy, and PyTorch, and ensuring deterministic behavior on GPU.</p>

In [None]:
def set_seed(seed):
    """Seed every random number generator used by the pipeline.

    Args:
        seed: Value passed to Python's `random`, NumPy, PyTorch's CPU RNG,
            and the RNGs of all CUDA devices.

    Besides seeding, cuDNN is switched to deterministic mode and its
    benchmark mode is turned off.
    """
    # Order matters only for readability: Python -> NumPy -> torch CPU -> torch CUDA
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )

    for apply_seed in seeders:
        apply_seed(seed)

    # Deterministic kernels on, benchmark mode off (it may pick a different algorithm per run)
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

<p align="justify">Each model in the pipeline expects inputs in a specific format, so dedicated transform pipelines are defined:

* <p align="justify"><code>transform_gan_b</code> resizes images and normalizes pixel values to [-1, 1], as required by DCGANs, and is used for training the base GAN.
* <p align="justify"><code>transform_gan_p</code> adds stronger augmentations, including flips, color jitter, and affine transforms, to improve diversity in class-specific DCGAN training.
* <p align="justify"><code>transform_cnn</code> resizes images to 224×224 and converts them to tensors, preparing them for feature extraction and classification with the CNN.

In [None]:
# Base GAN pipeline: resize -> tensor -> per-channel (x - 0.5) / 0.5
transform_gan_b = transforms.Compose([
    transforms.Resize(size = GAN_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean = [0.5] * 3, std = [0.5] * 3),
])

# Augmentation pipeline for per-class GAN data.
# It has no ToTensor step of its own; load_gan_data applies it to image tensors.
transform_gan_p = transforms.Compose([
    transforms.RandomResizedCrop(size = GAN_SIZE, scale = (0.9, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),

    # Colour perturbation, applied to 70% of the samples
    transforms.RandomApply(
        transforms = [
            transforms.ColorJitter(brightness = 0.2, contrast = 0.2, saturation = 0.2, hue = 0.05),
        ],
        p = 0.7,
    ),

    # Small rotation / shift / zoom, applied to 70% of the samples
    transforms.RandomApply(
        transforms = [
            transforms.RandomAffine(degrees = 10, translate = (0.1, 0.1), scale = (0.9, 1.0)),
        ],
        p = 0.7,
    ),
])

# CNN pipeline: resize -> tensor (no normalization, no augmentation)
transform_cnn = transforms.Compose([
    transforms.Resize(size = CNN_SIZE),
    transforms.ToTensor(),
])

<p align="justify">These helper functions prepare image batches tailored to the training objectives:

* <p align="justify"><code>load_gan_data(...)</code> loads images for DCGAN training. If a <code>target_class</code> is specified, it filters that class and applies heavy augmentations to generate multiple variants per image, which helps address class imbalance during synthetic generation.
* <p align="justify"><code>load_cnn_data(...)</code> prepares the dataset for CNN classification using simpler transforms that standardize size and format without augmentations.

<p align="justify">Both return PyTorch DataLoaders.

In [5]:
def load_gan_data(batch_size = 32, workers = 4, target_class = None, num_variants = 10, seed = 42, directory = RAW_DATA_DIR):
    """Build a shuffled DataLoader of DCGAN training images.

    Without `target_class`, every image under `directory` is served lazily
    through `transform_gan_b`. With `target_class`, only that class is kept
    and each of its images is expanded into `num_variants` augmented copies
    (via `transform_gan_p`) that are materialised up front in a TensorDataset.

    Args:
        batch_size: Number of samples per batch.
        workers: Number of DataLoader worker processes.
        target_class: Name of the class folder to augment, or None for all classes.
        num_variants: Augmented copies generated per image (class mode only).
        seed: Seed for the shuffling generator and for the per-variant seeds.
        directory: Root folder in ImageFolder layout.

    Returns:
        A DataLoader yielding (image, label) batches. Images are normalized
        with mean 0.5 / std 0.5 per channel in both modes.

    Raises:
        KeyError: If `target_class` is not a class folder under `directory`.
        ValueError: If `target_class` is given and `num_variants` is below 1.

    Note:
        In class mode the global torch RNG is reseeded once per variant.
    """
    generator = torch.Generator().manual_seed(seed)

    if not target_class:
        final_dataset = datasets.ImageFolder(root = directory, transform = transform_gan_b)

    else:
        if num_variants < 1:
            raise ValueError(f"num_variants must be at least 1, got {num_variants}")

        # The augmentations must see un-normalized [0, 1] tensors: torchvision's
        # colour ops on float tensors clamp their result to [0, 1], so running
        # ColorJitter on [-1, 1] data would wipe out every negative value.
        # Normalization is therefore applied AFTER augmentation.
        to_unit_range = transforms.Compose([
            transforms.Resize(GAN_SIZE),
            transforms.ToTensor(),
        ])
        to_gan_range = transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])

        dataset_raw = datasets.ImageFolder(root = directory, transform = to_unit_range)

        # Get class index from the class name (KeyError for unknown classes)
        class_index = dataset_raw.class_to_idx[target_class]

        # Positions of the samples that belong to the requested class
        indices = [i for i, (_, label) in enumerate(dataset_raw.samples) if label == class_index]

        # Dedicated RNG that hands out one torch seed per augmented variant
        rng = random.Random(seed)

        augmented_images = []
        augmented_labels = []

        for index in indices:
            # Each source image is decoded once and reused for all its variants
            image, label = dataset_raw[index]

            for _ in range(num_variants):
                torch.manual_seed(rng.randint(0, 999999))

                augmented_images.append(to_gan_range(transform_gan_p(image)))
                augmented_labels.append(label)

        final_dataset = TensorDataset(
            torch.stack(augmented_images),
            torch.tensor(augmented_labels),
        )

    # Create DataLoader for the GAN data
    dataloader_gan = DataLoader(final_dataset, batch_size = batch_size, shuffle = True, num_workers = workers, generator = generator)

    return dataloader_gan

def load_cnn_data(batch_size = 32, workers = 4):
    """Return a shuffled DataLoader over RAW_DATA_DIR using `transform_cnn`.

    Args:
        batch_size: Number of samples per batch.
        workers: Number of DataLoader worker processes.
    """
    return DataLoader(
        datasets.ImageFolder(root = RAW_DATA_DIR, transform = transform_cnn),
        batch_size = batch_size,
        shuffle = True,
        num_workers = workers,
    )

## Part 2: Creating the DCGAN Generator and Discriminator Classes

<p align="justify">This part implements the core components of the DCGAN architecture. The Generator learns to produce banana leaf images from random noise, while the Discriminator distinguishes between real and synthetic samples, forming the adversarial training loop.</p>

<p align="justify">The code block below defines all hyperparameters and training settings for consistent execution across GAN and CNN components. It also sets device preferences and ensures reproducibility through a fixed seed.

In [None]:
# Hyper-parameters and run-time settings

# --- Reproducibility ---
SEED_NUM = 42                # Seed passed to set_seed and the torch Generators

# --- Data / architecture ---
BATCH_SIZE = 128             # Images per training batch
INPUT_DIMENSION = 100        # Channels of the Generator's latent input
NC = 3                       # Image channels
NGF = 64                     # Base feature-map count of the Generator
NDF = 64                     # Base feature-map count of the Discriminator

# --- Training schedule ---
EPOCHS = 200                 # Total training epochs
CHECKPOINT = 10              # Samples and checkpoints are written every CHECKPOINT epochs

# --- Optimisation ---
LEARNING_RATE_G = 2e-4       # Generator learning rate
LEARNING_RATE_D = 1e-4       # Discriminator learning rate
LEARNING_RATE_CNN = 1e-4     # CNN learning rate
BETA1 = 0.5                  # Adam beta1 used for both DCGAN optimizers

# --- Hardware ---
# Device selection depends only on CUDA availability
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Values above 1 make train_dcgan wrap the networks in nn.DataParallel
NGPU = 1

<p align="justify">The Generator class defines a transposed convolutional neural network that gradually upsamples a 100-dimensional latent vector into a 128×128 RGB image. It uses a series of ConvTranspose2d layers paired with InstanceNorm and ReLU activations, capped with a Tanh function to output pixel values scaled to [-1, 1]. Dropout is included at each block to promote regularization and prevent overfitting, which is useful in settings with limited real training data.

In [7]:
# Generator

class Generator(nn.Module):
    """DCGAN generator built from transposed-convolution blocks.

    Five `_block` stages are followed by a final ConvTranspose2d to NC
    channels and a Tanh. For a 1 x 1 latent input of INPUT_DIMENSION
    channels the spatial size grows 4 -> 8 -> 16 -> 32 -> 64 -> 128.

    Args:
        ngpu: Stored on the instance as `self.ngpu`; not used by the network itself.
    """

    def __init__(self, ngpu):
        super().__init__()

        self.ngpu = ngpu

        # (in_channels, out_channels, stride, padding) for each upsampling stage
        stage_specs = [
            (INPUT_DIMENSION, NGF * 16, 1, 0),  # latent vector -> 4 x 4 feature map
            (NGF * 16,        NGF * 8,  2, 1),  # 8 x 8
            (NGF * 8,         NGF * 4,  2, 1),  # 16 x 16
            (NGF * 4,         NGF * 2,  2, 1),  # 32 x 32
            (NGF * 2,         NGF,      2, 1),  # 64 x 64
        ]

        layers = [
            self._block(c_in, c_out, 4, stride, padding, bias = False)
            for c_in, c_out, stride, padding in stage_specs
        ]

        # Output head: 128 x 128 image with NC channels, values squashed by Tanh
        layers.append(nn.ConvTranspose2d(NGF, NC, 4, 2, 1, bias = False))
        layers.append(nn.Tanh())

        self.main = nn.Sequential(*layers)

    def _block(self, i_channels, o_channels, kernel_size, stride, padding, bias):
        """Return one stage: ConvTranspose2d -> InstanceNorm2d -> ReLU -> Dropout."""
        upsample = nn.ConvTranspose2d(i_channels, o_channels, kernel_size, stride, padding, bias = bias)
        normalise = nn.InstanceNorm2d(o_channels, affine = True)

        return nn.Sequential(
            upsample,
            normalise,
            nn.ReLU(True),
            nn.Dropout(0.3),  # regularisation for small datasets
        )

    def forward(self, input):
        """Run `input` through the full stack and return the generated images."""
        return self.main(input)

<p align="justify">On the other hand, the Discriminator is a deep convolutional network designed to classify images as real or fake by progressively downsampling them. Each block applies standard convolution, optional InstanceNorm, and LeakyReLU, with dropout for robustness. Notably, the first layer skips normalization to preserve raw signal. A helper flag in <code>forward()</code> optionally returns intermediate features for CNN training later.

In [41]:
# Discriminator

class Discriminator(nn.Module):
    """DCGAN discriminator built from strided-convolution blocks.

    Five `_block` stages (the first without InstanceNorm2d) halve the
    spatial size each time; a final Conv2d reduces NDF * 16 channels to a
    single output channel. No sigmoid is applied, so the output is a raw score.

    Args:
        ngpu: Stored on the instance as `self.ngpu`; not used by the network itself.
    """

    def __init__(self, ngpu):
        super().__init__()

        self.ngpu = ngpu

        # Channel widths of consecutive stages: NC -> NDF -> ... -> NDF * 16
        widths = [NC, NDF, NDF * 2, NDF * 4, NDF * 8, NDF * 16]

        # Stage 0 skips InstanceNorm2d; every later stage uses it.
        # With a 128 x 128 input the stages produce 64, 32, 16, 8 and 4 pixel maps.
        stages = [
            self._block(c_in, c_out, 4, 2, 1, bias = False, use_instanceNorm2d = (depth > 0))
            for depth, (c_in, c_out) in enumerate(zip(widths, widths[1:]))
        ]

        self.main = nn.Sequential(
            *stages,
            nn.Conv2d(NDF * 16, 1, 4, 1, 0, bias = False),  # 4 x 4 -> 1 x 1 score
        )

    def _block(self, i_channels, o_channels, kernel_size, stride, padding, bias, use_instanceNorm2d = True):
        """Return one stage: Conv2d -> (optional) InstanceNorm2d -> LeakyReLU -> Dropout."""
        conv = nn.Conv2d(i_channels, o_channels, kernel_size, stride, padding, bias = bias)
        norm = [nn.InstanceNorm2d(o_channels, affine = True)] if use_instanceNorm2d else []

        return nn.Sequential(
            conv,
            *norm,
            nn.LeakyReLU(0.2, inplace = True),
            nn.Dropout(0.3),  # regularisation for small datasets
        )

    def forward(self, inp, return_features=False):
        """Score `inp`, or return intermediate features.

        Args:
            inp: Batch of images.
            return_features: When True, stop after the stage at index 4 of
                `self.main` (the last `_block`) and return its output instead
                of the final score.
        """
        out = inp

        for index, stage in enumerate(self.main):
            out = stage(out)

            # Index 4 is the last block before the 1-channel output convolution
            if return_features and index == 4:
                break

        return out

## Part 3: Training a DCGAN for Each Underrepresented Class (Cordana, Healthy, Pestalotiopsis)

<p align="justify"> In this stage, we train a dedicated DCGAN for each underrepresented class: Cordana, Healthy, and Pestalotiopsis. The training loop involves alternating between updating the Discriminator to better distinguish real from fake images and guiding the Generator to produce more realistic outputs. Over multiple epochs, this adversarial process helps each model learn the visual distribution of its respective class.

<p align="justify">The code block below is sourced from the official PyTorch documentation and follows the DCGAN paper’s recommended weight initialization scheme.

In [9]:
def initialize_weights(model):
    """Re-initialise one module in place; intended for use with `nn.Module.apply`.

    The decision is made on the module's class name:
      * names containing "Conv": weights ~ N(0.0, 0.02)
      * names containing "InstanceNorm": weights ~ N(1.0, 0.02), biases = 0
    Any other module is left untouched.
    """
    layer_kind = type(model).__name__

    if "Conv" in layer_kind:
        nn.init.normal_(model.weight.data, mean = 0.0, std = 0.02)
        return

    if "InstanceNorm" in layer_kind:
        nn.init.normal_(model.weight.data, mean = 1.0, std = 0.02)
        nn.init.constant_(model.bias.data, val = 0)

<p align="justify">This function sets up the output directory structure for saving GAN-generated images during training. It ensures that each class except Sigatoka has a clean folder unless training is resumed, in which case existing outputs are preserved.

In [10]:
GAN_OUTPUT_DIRECTORY_TEST = "../model2/gan_test" # for debugging while training

def prepare_output_directory(resume = False):
    """Create one output folder per GAN under GAN_OUTPUT_DIRECTORY_TEST.

    Folders are made for "base" and for every entry of BANANA_CLASSES
    except "sigatoka".

    Args:
        resume: When False, an existing folder is deleted before being
            recreated; when True, existing contents are kept.
    """
    output_root = Path(GAN_OUTPUT_DIRECTORY_TEST)

    folder_names = [name for name in ["base"] + BANANA_CLASSES if name != "sigatoka"]

    for folder_name in folder_names:
        target = output_root / folder_name

        # Start from a clean folder unless the caller is resuming
        wipe_first = (not resume) and target.exists()

        if wipe_first:
            shutil.rmtree(target)

        target.mkdir(parents = True, exist_ok = True)

# prepare_output_directory(resume = True)

<p align="justify">This function is the core training loop for a DCGAN, designed to generate synthetic banana leaf images. It is flexible enough to handle training under three scenarios: (1) training a base DCGAN from scratch or a checkpoint, (2) initializing a class-specific DCGAN from the base model, and (3) resuming training from a class-specific checkpoint.

<p align="justify"> Some notes on the training loop setup:

* <p align="justify"><code>BCEWithLogitsLoss</code> is utilized for Generator and Discriminator losses.
* <p align="justify">The labels for real and fake images are smoothed slightly at 0.9 and 0.1, respectively, instead of 1 and 0, to regularize training and reduce overconfidence in the Discriminator.
* <p align="justify">Cosine annealing learning rate schedulers are used to gradually reduce the learning rate over time and help the model converge more smoothly.


<p align="justify">The training loop works as follows:

### Discriminator Update

1. <p align="justify">Real images are passed through the Discriminator after adding decaying Gaussian noise, a form of instance noise that regularizes training and helps prevent overfitting.
2. <p align="justify">Fake images are generated using the Generator and also passed through the Discriminator.
3. <p align="justify">The Discriminator is trained to correctly distinguish between real and fake using the sum of the two BCE losses.
4. <p align="justify">Gradients for the Discriminator are clipped to stabilize learning.

### Generator Update

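<p align="justify">The Generator is then updated with the goal of “fooling” the Discriminator. The fake images are passed through the Discriminator again, the loss is computed against the <em>real</em> label, and the resulting error is backpropagated into the Generator, pushing it to produce images that the Discriminator classifies as real.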

In [11]:
def train_dcgan(target_class = None, resume = False, checkpoint_path = None, balanced_path = None):
    """Train (or resume) a DCGAN and return its Generator and Discriminator.

    The arguments select one of three set-ups:
      * base model      - neither `target_class` nor `balanced_path`; data comes
                          from the default directory of `load_gan_data` and
                          outputs go to GAN_OUTPUT_DIRECTORY_TEST/base.
      * per-class model - `target_class` set; data is the augmented subset for
                          that class and outputs go to
                          GAN_OUTPUT_DIRECTORY_TEST/<target_class>.
      * final model     - `balanced_path` set; data comes from that directory
                          and outputs go to "../model2/final_dcgan".

    Args:
        target_class: Class name for a per-class DCGAN, or None.
        resume: When True and the checkpoint exists, optimizer states and the
            epoch counter are restored in addition to the network weights.
        checkpoint_path: Checkpoint to load weights from. If it is missing or
            None, both networks are freshly initialised instead.
        balanced_path: Dataset root for the final DCGAN, or None.

    Returns:
        Tuple `(netG, netD)` of the trained networks.

    Note:
        Final checkpoints store `epoch = EPOCHS`, so resuming from one runs
        no further epochs and simply returns the loaded networks.
    """
    # Set random seed for reproducibility
    set_seed(SEED_NUM)

    # Load dataset and define save path
    if balanced_path:
        save_path = "../model2/final_dcgan"

        dataloader = load_gan_data(batch_size = BATCH_SIZE, directory = balanced_path)

    else:
        save_path = GAN_OUTPUT_DIRECTORY_TEST

        # target_class = None selects the full dataset inside load_gan_data
        dataloader = load_gan_data(batch_size = BATCH_SIZE, target_class = target_class)

    # Folder for periodic samples/checkpoints and stem of the final checkpoint name
    if target_class:
        path = f"{save_path}/{target_class}"
        final_name = target_class

    elif balanced_path:
        path = save_path
        final_name = "final"

    else:
        path = f"{save_path}/base"
        final_name = "base"

    # `path` is `save_path` or a child of it, so this covers every file written below
    os.makedirs(path, exist_ok = True)

    # Initialize Generator and Discriminator
    netG = Generator(ngpu = NGPU).to(DEVICE)
    netD = Discriminator(ngpu = NGPU).to(DEVICE)

    # Handle multi-GPU setup if applicable
    if (DEVICE.type == "cuda") and (NGPU > 1):
        netG = nn.DataParallel(netG, list(range(NGPU)))
        netD = nn.DataParallel(netD, list(range(NGPU)))

    # Fixed noise for generating sample outputs and tracking progress during training
    fixed_generator = torch.Generator(device=DEVICE).manual_seed(SEED_NUM)

    fixed_noise = torch.randn(64, INPUT_DIMENSION, 1, 1, device = DEVICE, generator = fixed_generator)

    # Optimizers for Generator and Discriminator
    optimizerD = optim.Adam(netD.parameters(), lr = LEARNING_RATE_D, betas = (BETA1, 0.999))
    optimizerG = optim.Adam(netG.parameters(), lr = LEARNING_RATE_G, betas = (BETA1, 0.999))

    # Default starting epoch
    start_epoch = 0

    # Loads checkpoint dcgan if provided

    # Case 1: train base dcgan from a checkpoint
    # Case 2: train per-class dcgan from scratch with base dcgan weights as a starting point
    # Case 3: train per-class dcgan from a checkpoint
    if checkpoint_path and os.path.exists(checkpoint_path):
        checkpoint = torch.load(checkpoint_path, map_location = DEVICE)
        netG.load_state_dict(checkpoint["netG"])
        netD.load_state_dict(checkpoint["netD"])

        # Loads checkpoint optimizers if resuming training
        if resume:
            optimizerG.load_state_dict(checkpoint["optimizerG"])
            optimizerD.load_state_dict(checkpoint["optimizerD"])
            start_epoch = checkpoint["epoch"] + 1

    # Fresh start
    else:
        netG.apply(initialize_weights)
        netD.apply(initialize_weights)

    # Schedulers are created after the checkpoint is loaded so that a resumed
    # run continues the cosine curve at `start_epoch` instead of restarting it.
    # `initial_lr` is the base rate the scheduler anneals from; it must be
    # present in every param group when `last_epoch` is not -1.
    for optimizer, base_lr in ((optimizerD, LEARNING_RATE_D), (optimizerG, LEARNING_RATE_G)):
        for group in optimizer.param_groups:
            group["initial_lr"] = base_lr

    schedulerD = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizerD, T_max = EPOCHS, eta_min = 1e-6, last_epoch = start_epoch - 1
    )
    schedulerG = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizerG, T_max = EPOCHS, eta_min = 1e-6, last_epoch = start_epoch - 1
    )

    # Loss function (operates on raw Discriminator scores)
    criterion = nn.BCEWithLogitsLoss()

    # Labels for real and fake images
    real_label = 0.9 # Slightly less than 1
    fake_label = 0.1 # Slightly more than 0

    # Actual training
    for epoch in range(start_epoch, EPOCHS):
        # Calculate noise magnitude decay
        noise_magnitude = 0.1 * 0.5 * (1 + cos(pi * epoch / EPOCHS)) # 0.1 = maximum noise magnitude

        # One seeded latent generator per epoch: successive batches draw
        # different latent vectors while the epoch as a whole stays reproducible.
        latent_generator = torch.Generator(device=DEVICE).manual_seed(SEED_NUM + epoch)

        for i, (real_images, _) in enumerate(dataloader): # Iterate through batches in the dataset
            # 1. Update Discriminator:
            #    maximize log(D(x)) + log(1 - D(G(z)))

            # 1.A. Train Discriminator on real images
            netD.zero_grad()

            # Format real batch
            real_images = real_images.to(DEVICE)

            # Instance noise on the real images; its magnitude decays over the epochs
            noise = torch.randn_like(real_images) * noise_magnitude

            noisy_real_images = real_images + noise

            size = real_images.size(0)

            label = torch.full((size,), real_label, dtype = torch.float, device = DEVICE)

            # Forward pass noisy real images through Discriminator
            output = netD(noisy_real_images).view(-1)

            # Calculate Discriminator loss for noisy real images
            errD_real = criterion(output, label)

            # Backpropagate error for noisy real images
            errD_real.backward()

            # Mean raw Discriminator score for noisy real images
            D_x = output.mean().item()

            # 1.B. Train Discriminator on batch of all fake images

            # Generate batch of latent vectors
            noise = torch.randn(size, INPUT_DIMENSION, 1, 1, device = DEVICE, generator = latent_generator)

            # Generate fake images with Generator
            fake = netG(noise)

            # Classify fake images with Discriminator
            label.fill_(fake_label)

            # Forward pass fake images through Discriminator (detached: no Generator gradients here)
            output = netD(fake.detach()).view(-1)

            # Calculate Discriminator loss for fake images
            errD_fake = criterion(output, label)

            # Backpropagate error for fake images
            errD_fake.backward()

            # Clip Discriminator gradients for stability
            torch.nn.utils.clip_grad_norm_(netD.parameters(), max_norm = 1.0)

            # Mean raw Discriminator score for fake images
            D_G_z1 = output.mean().item()

            # Compute total Discriminator error = real error + fake error
            errD = errD_real + errD_fake

            # Finally update Discriminator
            optimizerD.step()

            # 2. Update Generator:
            #    maximize log(D(G(z)))

            netG.zero_grad()
            label.fill_(real_label)  # fake labels are real for Generator cost

            # Pass fake images through the just-updated Discriminator
            output = netD(fake).view(-1)

            # Calculate Generator loss based on Discriminator's output
            errG = criterion(output, label)

            # Backpropagate error for Generator
            errG.backward()

            # Mean raw Discriminator score for fake images after the Discriminator update
            D_G_z2 = output.mean().item()

            # Finally update Generator
            optimizerG.step()

            # Debugging: Print losses and monitor training progress

            if i in [0, len(dataloader)//2]:
                print(
                  f"Epoch [{epoch}/{EPOCHS}] Batch {i}/{len(dataloader)} \
                    Loss D: {errD.item():.4f}, loss G: {errG.item():.4f} \
                    D(x): {D_x:.4f}, \
                    D(G(z))_real: {D_G_z1:.4f}, D(G(z))_fake: {D_G_z2:.4f}"
                )

        # Step learning rate schedulers
        schedulerD.step()
        schedulerG.step()

        if epoch % CHECKPOINT == 0:
            # Sample images only need a forward pass; skip building the autograd graph
            with torch.no_grad():
                fake_images = netG(fixed_noise)

            # Save images Generator could produce during checkpoints
            save_image(
                fake_images,
                os.path.join(path, f"sample_epoch_{epoch}.png"),
                normalize = True
            )

            # Save model version
            save_dict = {
                "epoch": epoch,
                "netG": netG.state_dict(),
                "netD": netD.state_dict(),
                "optimizerG": optimizerG.state_dict(),
                "optimizerD": optimizerD.state_dict(),
            }

            torch.save(save_dict, os.path.join(path, f"checkpoint_epoch_{epoch}.pth"))

    # Final checkpoint; "epoch" is EPOCHS so that resuming from it trains no further
    torch.save({
        "epoch": EPOCHS,
        "netG": netG.state_dict(),
        "netD": netD.state_dict(),
        "optimizerG": optimizerG.state_dict(),
        "optimizerD": optimizerD.state_dict(),
    }, os.path.join(save_path, f"{final_name}_dcgan_final.pth"))

    return netG, netD

### Part 3.1: Training a Base DCGAN

<p align="justify">We first train a base DCGAN on the full original dataset. This model captures general features across all classes and serves as a weight initialization checkpoint when training class-specific DCGANs to help improve convergence and stability.

In [None]:
# Base DCGAN

# Variant that trains from scratch (no checkpoint):
# trained_generator_base, trained_discriminator_base = train_dcgan(target_class = None, resume = False, checkpoint_path = None)

# Variant that resumes from the saved final checkpoint
trained_generator_base, trained_discriminator_base = train_dcgan(
    target_class = None,
    resume = True,
    checkpoint_path = '../model2/gan_test/base_dcgan_final.pth',
)

### Part 3.2: Training a DCGAN Per Class

In [13]:
# DCGAN for the "cordana" class

# Variant that trains without a checkpoint:
# trained_generator_cordana, _ = train_dcgan(target_class = "cordana")

# Variant that resumes from the saved final checkpoint
trained_generator_cordana, _ = train_dcgan(
    target_class = "cordana",
    resume = True,
    checkpoint_path = "../model2/gan_test/cordana_dcgan_final.pth",
)

In [14]:
# DCGAN for the "healthy" class

# Variant that trains without a checkpoint:
# trained_generator_healthy, _ = train_dcgan(target_class = "healthy")

# Variant that resumes from the saved final checkpoint
trained_generator_healthy, _ = train_dcgan(
    target_class = "healthy",
    resume = True,
    checkpoint_path = '../model2/gan_test/healthy_dcgan_final.pth',
)

In [15]:
# DCGAN for the "pestalotiopsis" class

# Variant that trains without a checkpoint:
# trained_generator_pestalotiopsis, _ = train_dcgan(target_class = "pestalotiopsis")

# Variant that resumes from the saved final checkpoint
trained_generator_pestalotiopsis, _ = train_dcgan(
    target_class = "pestalotiopsis",
    resume = True,
    checkpoint_path = "../model2/gan_test/pestalotiopsis_dcgan_final.pth",
)

## Part 4: Generating Images for Each Underrepresented Class (Cordana, Healthy, Pestalotiopsis)

<p align="justify">To tackle class imbalance, separate DCGANs were previously trained for each underrepresented class: Cordana, Healthy, and Pestalotiopsis. These DCGANs generate synthetic images that augment the original dataset, equalizing the number of samples and hopefully helping improve classifier performance.

In [16]:
def generate_synthetic_images(dcgan_generator, amount_to_generate, class_label, output_directory):
    """Write `amount_to_generate` generated images as PNG files.

    Files are saved as `<output_directory>/<class_label>/gen_<k>.png` with
    `k` running from 0 to `amount_to_generate - 1`.

    Args:
        dcgan_generator: Generator network; it is switched to eval mode and
            left in that mode.
        amount_to_generate: Number of images to write; values below 1 write nothing.
        class_label: Name of the sub-folder that receives the images.
        output_directory: Parent folder; the class sub-folder is created if needed.
    """
    images_per_pass = 16

    # Evaluation mode turns the Dropout layers off for sampling
    dcgan_generator.eval()

    class_output_directory = os.path.join(output_directory, class_label)
    os.makedirs(class_output_directory, exist_ok = True)

    # Inference only: no gradients needed
    with torch.no_grad():
        for first_index in range(0, amount_to_generate, images_per_pass):
            # The last pass may be smaller than images_per_pass
            count = min(images_per_pass, amount_to_generate - first_index)

            latent = torch.randn(count, INPUT_DIMENSION, 1, 1, device = DEVICE)

            # One file per generated image, numbered continuously across passes
            for offset, image in enumerate(dcgan_generator(latent)):
                save_image(
                    image,
                    os.path.join(class_output_directory, f"gen_{first_index + offset}.png"),
                    normalize = True
                )

In [17]:
# Root folder that receives the generated images (one sub-folder per class)
GAN_OUTPUT_DIRECTORY_BALANCED = "../model2/balanced"

# Target count based on the dominant Sigatoka class
# NOTE(review): hard-coded; confirm it matches the actual Sigatoka image count
TARGET_COUNT = 424

# Dictionary of class names and their real image counts
# NOTE(review): hard-coded; confirm against the dataset on disk
real_image_counts = {
    "cordana"        : 145,
    "healthy"        : 115,
    "pestalotiopsis" : 155,
}

# Dictionary mapping class labels to their corresponding trained generators
# (requires the three per-class training cells above to have been run)
trained_generators = {
    "cordana"        : trained_generator_cordana,
    "healthy"        : trained_generator_healthy,
    "pestalotiopsis" : trained_generator_pestalotiopsis,
}

# Generate synthetic images for each underrepresented class
for label, real_count in real_image_counts.items():
    # Number of synthetic images needed to reach TARGET_COUNT for this class
    amount_to_generate = TARGET_COUNT - real_count

    generator = trained_generators[label]

    # Only synthetic images are written here; presumably the real images are
    # copied into the same folders elsewhere - TODO confirm
    generate_synthetic_images(
        dcgan_generator = generator, amount_to_generate = amount_to_generate, class_label = label, output_directory = GAN_OUTPUT_DIRECTORY_BALANCED
    )

## Part 5: Training a Unified DCGAN for Feature Extraction

<p align="justify">A single DCGAN was trained on the balanced dataset combining real and synthetic images.

In [42]:
# Final DCGAN on the balanced training data

# `balanced_path` points at the balanced dataset; with it set, train_dcgan
# writes its outputs to "../model2/final_dcgan" automatically.
# Only the Discriminator is kept.
_, trained_discriminator_final = train_dcgan(
    resume = True,
    checkpoint_path = "../model2/final_dcgan/final_dcgan_final.pth",
    balanced_path = GAN_OUTPUT_DIRECTORY_BALANCED,
)

## Part 6: Extract features from final DCGAN

<p align="justify">After training, the discriminator is frozen and repurposed as a fixed feature extractor.

<p align="justify">For each image, intermediate feature maps are extracted from the frozen discriminator (the output of its last downsampling block, just before the final scoring layer). These feature maps serve as the input to the CNN classifier.

In [None]:
def extract_features(discriminator = None, dataset = None):
    """Run a dataset through the discriminator and collect intermediate features.

    Args:
        discriminator: Network called as `discriminator(images, return_features = True)`.
            It is switched to eval mode and left in that mode.
        dataset: Dataset yielding (image, label) pairs.

    Returns:
        Tuple `(features, labels)`: every batch's feature tensor (moved to the
        CPU) and label tensor concatenated along the first dimension, in
        dataset order.
    """
    # Evaluation mode disables dropout inside the discriminator
    discriminator.eval()

    set_seed(SEED_NUM)

    # shuffle = False keeps features aligned with the dataset order
    loader = DataLoader(dataset, batch_size = 32, shuffle = False, num_workers=4)

    feature_batches = []
    label_batches = []

    with torch.no_grad():
        for batch_images, batch_labels in loader:
            # Images go to the model's device; features are brought back to the CPU
            batch_features = discriminator(batch_images.to(DEVICE), return_features = True)

            feature_batches.append(batch_features.cpu())
            label_batches.append(batch_labels)

    return torch.cat(feature_batches), torch.cat(label_batches)

In [None]:
def split_imagefolder_dataset(root_path, train_ratio = 0.8):
    """Load an ImageFolder with `transform_cnn` and split it into train/test subsets.

    Args:
        root_path: Dataset root in ImageFolder layout.
        train_ratio: Fraction of the samples assigned to the training subset;
            the size is truncated to an integer and the remainder forms the test subset.

    Returns:
        Tuple `(train_dataset, test_dataset)`. The split is random but
        repeatable because it is driven by a generator seeded with SEED_NUM.
    """
    full_dataset = datasets.ImageFolder(root = root_path, transform = transform_cnn)

    total = len(full_dataset)
    train_size = int(train_ratio * total)
    lengths = [train_size, total - train_size]

    split_rng = torch.Generator().manual_seed(SEED_NUM)

    train_dataset, test_dataset = random_split(full_dataset, lengths, generator = split_rng)

    return train_dataset, test_dataset

# Split the balanced dataset into the subsets used for the CNN stage
train_dataset, test_dataset = split_imagefolder_dataset("../model2/balanced")

In [63]:
# Features and labels of the training subset, taken from the final discriminator
train_features, train_labels = extract_features(
    discriminator = trained_discriminator_final,
    dataset = train_dataset,
)

## Part 7: Define CNN architecture and Train Final CNN Classifier

<p align="justify">With features extracted from the frozen discriminator, a CNN is designed to perform the final classification of banana leaf diseases. The network is trained on these extracted features to optimize classification accuracy, completing the hybrid model pipeline.


<p align="justify">This CNN takes feature maps extracted from the frozen discriminator as input and applies three convolutional layers with batch normalization and ReLU activations to learn higher-level representations. It then uses adaptive average pooling to reduce spatial dimensions, followed by dropout and a fully connected layer to output class scores.

In [64]:
class CNNClassifier(nn.Module):
    """Convolutional classification head applied to input feature maps.

    The network stacks three 3x3 convolution stages (each followed by batch
    normalization and ReLU), pools every channel down to a single value, and
    maps the pooled vector to class scores through dropout and a linear layer.

    Args:
        discriminator: Optional module stored on the instance. ``forward``
            does not call it.
        input_channels: Number of channels in the input feature maps.
        num_classes: Number of class scores produced per sample.
    """

    def __init__(self, discriminator = None, input_channels=1024, num_classes=len(BANANA_CLASSES)):
        super().__init__()

        # Kept as an attribute only; forward() never invokes it
        self.discriminator = discriminator

        # Conv -> BatchNorm -> ReLU stages; stride 1 with padding 1 keeps the spatial size
        conv_layers = []
        in_channels = input_channels
        for out_channels in (512, 256, 128):
            conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1))
            conv_layers.append(nn.BatchNorm2d(out_channels))
            conv_layers.append(nn.ReLU())
            in_channels = out_channels

        # Average each channel down to a 1x1 map
        conv_layers.append(nn.AdaptiveAvgPool2d((1, 1)))

        self.cnn = nn.Sequential(*conv_layers)

        # in_channels is 128 here: the width of the last convolution stage
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(0.4),
            nn.Linear(in_channels, num_classes)
        )

    def forward(self, x):
        """Return class scores for a batch of feature maps ``x``."""
        return self.classifier(self.cnn(x))

<p align="justify"><code>cnn_classifier</code> trains the CNN on the extracted features and labels, which are wrapped in a DataLoader for batch processing. It can load a checkpoint to resume training and periodically saves intermediate checkpoints to enable recovery. The model is optimized with cross-entropy loss and the Adam optimizer over multiple epochs; the loss is printed after each epoch, and the final trained model is saved before being returned.</p>

In [None]:
def cnn_classifier(batch_size = 32, workers = 4, epochs = 25, features = None, labels = None, checkpoint_path = None, discriminator = None):
    """Train a ``CNNClassifier`` on pre-extracted features and return it.

    Checkpoints are written to ``../model2/cnn_classifier`` (created on demand):
    one every ``CHECKPOINT`` epochs and one final file. Every checkpoint uses
    the same dictionary layout (``model_state``, ``discriminator_state``,
    ``optimizer_state``, ``epoch``), so any of them can be passed back in as
    ``checkpoint_path`` to resume training.

    Args:
        batch_size: Number of samples per training batch.
        workers: Number of DataLoader worker processes.
        epochs: Total number of epochs to train for (resumed runs continue up
            to this count).
        features: Tensor of input features; required.
        labels: Tensor of target class indices aligned with ``features``; required.
        checkpoint_path: Optional path of a checkpoint to resume from. Ignored
            when the file does not exist.
        discriminator: Optional module handed to ``CNNClassifier`` and stored
            in the saved checkpoints.

    Returns:
        The trained ``CNNClassifier`` instance.

    Raises:
        ValueError: If ``features`` or ``labels`` is missing, or the feature
            set is empty.
    """
    if features is None or labels is None:
        raise ValueError("features and labels are required to train the CNN classifier")

    # Combine features and labels into a PyTorch dataset
    feature_dataset = TensorDataset(features, labels)

    if len(feature_dataset) == 0:
        raise ValueError("cannot train the CNN classifier on an empty feature set")

    feature_loader = DataLoader(feature_dataset, batch_size = batch_size, shuffle = True, num_workers = workers)

    # Initialize CNN classifier
    model = CNNClassifier(discriminator).to(DEVICE)

    # Loss function
    criterion = nn.CrossEntropyLoss()

    # Optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr = LEARNING_RATE_CNN)

    # Default starting epoch
    start_epoch = 0

    if checkpoint_path and os.path.exists(checkpoint_path):
        checkpoint = torch.load(checkpoint_path, map_location = DEVICE)

        if "model_state" in checkpoint:
            model.load_state_dict(checkpoint["model_state"])
            optimizer.load_state_dict(checkpoint["optimizer_state"])

            start_epoch = checkpoint["epoch"] + 1
        else:
            # Legacy weights-only file (a bare state_dict): restore the weights,
            # but no optimizer state or epoch was recorded, so start from epoch 0
            model.load_state_dict(checkpoint)

    # Make sure the checkpoint folder exists before anything is written to it
    save_dir = "../model2/cnn_classifier"
    os.makedirs(save_dir, exist_ok = True)

    def save_checkpoint(path, epoch_index):
        # Single layout for periodic and final checkpoints so both are resumable
        torch.save({
            "model_state": model.state_dict(),
            "discriminator_state": discriminator.state_dict() if discriminator is not None else None,
            "optimizer_state": optimizer.state_dict(),
            "epoch": epoch_index,
        }, path)

    # model.train() also switches the registered discriminator submodule into
    # train mode; put it back so the caller's discriminator keeps its own mode
    discriminator_was_training = discriminator.training if discriminator is not None else False

    model.train()

    if discriminator is not None:
        discriminator.train(discriminator_was_training)

    # Last completed epoch; stays at start_epoch - 1 when no epoch is left to run
    last_epoch = start_epoch - 1

    for epoch in range(start_epoch, epochs):
        for batch_x, batch_y in feature_loader:
            # Move input features and labels to the computing device
            batch_x, batch_y = batch_x.to(DEVICE), batch_y.to(DEVICE)

            # Zero the gradients from the previous step
            optimizer.zero_grad()

            # Forward pass to compute predictions
            outputs = model(batch_x)

            # Compute loss between predicted and actual labels
            loss = criterion(outputs, batch_y)

            # Backward pass to compute gradients
            loss.backward()

            # Update model weights
            optimizer.step()

        # Debugging: print the loss of the last batch of this epoch
        print(f"Epoch [{epoch + 1}/{epochs}]: Loss = {loss.item():.4f}")

        if epoch % CHECKPOINT == 0:
            save_checkpoint(os.path.join(save_dir, f"cnn_classifier_epoch_{epoch}.pth"), epoch)

        last_epoch = epoch

    save_checkpoint(os.path.join(save_dir, "cnn_classifier_final.pth"), last_epoch)

    return model

In [66]:
# Train the CNN classifier for 30 epochs on the features extracted from the training subset
cnn_model = cnn_classifier(epochs = 30, features = train_features, labels = train_labels, discriminator = trained_discriminator_final)

Epoch [1/30]: Loss = 0.4857
Epoch [2/30]: Loss = 0.6707
Epoch [3/30]: Loss = 0.2512
Epoch [4/30]: Loss = 0.5004
Epoch [5/30]: Loss = 0.2500
Epoch [6/30]: Loss = 0.1754
Epoch [7/30]: Loss = 0.2775
Epoch [8/30]: Loss = 0.2244
Epoch [9/30]: Loss = 0.1882
Epoch [10/30]: Loss = 0.3450
Epoch [11/30]: Loss = 0.1142
Epoch [12/30]: Loss = 0.1611
Epoch [13/30]: Loss = 0.1532
Epoch [14/30]: Loss = 0.1197
Epoch [15/30]: Loss = 0.0710
Epoch [16/30]: Loss = 0.1028
Epoch [17/30]: Loss = 0.1232
Epoch [18/30]: Loss = 0.0974
Epoch [19/30]: Loss = 0.0533
Epoch [20/30]: Loss = 0.0548
Epoch [21/30]: Loss = 0.0244
Epoch [22/30]: Loss = 0.0820
Epoch [23/30]: Loss = 0.0420
Epoch [24/30]: Loss = 0.0627
Epoch [25/30]: Loss = 0.0239
Epoch [26/30]: Loss = 0.0381
Epoch [27/30]: Loss = 0.1339
Epoch [28/30]: Loss = 0.0154
Epoch [29/30]: Loss = 0.0225
Epoch [30/30]: Loss = 0.0968


## Part 8: Evaluation

<p align="justify">After training, the CNN classifier is evaluated on unseen test images to assess its ability to correctly identify different banana leaf diseases. Performance metrics such as accuracy, precision, recall, and F1-score are calculated to provide a comprehensive view of how well the model distinguishes between each banana leaf class.

In [None]:
# Extract features for the test subset with the same discriminator used for the training features
test_features, test_labels = extract_features(discriminator = trained_discriminator_final, dataset = test_dataset)

In [None]:
# Wrap the extracted test features in their own dataset. A distinct name keeps
# `test_dataset` (the image subset passed to extract_features) intact, so the
# feature-extraction cell can be re-run without receiving feature tensors.
test_feature_dataset = TensorDataset(test_features, test_labels)

# Evaluation iterates in a fixed order, so shuffling is disabled
test_loader = DataLoader(
    test_feature_dataset,
    batch_size = 32,
    shuffle = False,
    num_workers = 4
)

In [None]:
def evaluate_model(model, dataloader, class_names, device = DEVICE):
    """Evaluate ``model`` on ``dataloader`` and print its metrics.

    The model is switched to evaluation mode and run without gradient
    tracking. The predicted class of each sample is the index of its highest
    output score. Accuracy, a classification report and a confusion matrix
    are printed; nothing is returned.

    Args:
        model: Module to evaluate.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        class_names: Display names passed to the classification report.
        device: Device the inputs are moved to before the forward pass.
    """
    model.eval()

    predictions, targets = [], []

    with torch.no_grad():
        for batch_inputs, batch_targets in dataloader:
            scores = model(batch_inputs.to(device))

            # Index of the highest score along the class dimension
            batch_predictions = torch.max(scores, 1).indices

            predictions += batch_predictions.cpu().tolist()
            targets += batch_targets.cpu().tolist()

    y_pred = np.array(predictions)
    y_true = np.array(targets)

    accuracy = accuracy_score(y_true, y_pred)
    print(f"Test Accuracy: {accuracy * 100:.2f}%\n")

    print("Classification Report:\n")
    print(classification_report(y_true, y_pred, target_names = class_names))

    print("Confusion Matrix:\n")
    print(confusion_matrix(y_true, y_pred))

# Print test-set accuracy, classification report and confusion matrix for the trained CNN
evaluate_model(model = cnn_model, dataloader = test_loader, class_names = BANANA_CLASSES)

# Model 2 References:

1. https://docs.pytorch.org/tutorials/beginner/dcgan_faces_tutorial.html

2. https://pyimagesearch.com/2021/10/25/training-a-dcgan-in-pytorch/

3. https://medium.com/@manoharmanok/implementing-dcgan-in-pytorch-using-the-celeba-dataset-a-comprehensive-guide-660e6e8e29d2