In [None]:
import os
import random
from PIL import Image
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader
from torchvision import models
import torchvision.transforms.functional as TF

# 1a) Pick the compute device: CUDA when a GPU is visible to torch, otherwise CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")


In [None]:
# Dataset root directory. Set the LEAF_DATASET_ROOT environment variable to run
# the notebook on another machine; when it is unset the original workstation
# path is used, so existing runs behave exactly as before.
root = os.environ.get("LEAF_DATASET_ROOT", "/home/stud1/Desktop/PIL_MAIN/Leaf Dataset")
csv_file = os.path.join(root, "labels.csv")

# UV modality directories
white_uv_dir = os.path.join(root, "WhiteUV")
uv365_dir    = os.path.join(root, "365UV")
uv395_dir    = os.path.join(root, "395UV")

# NoUV (RGB) modality directories
white_nouv_dir = os.path.join(root, "WhiteNoUV")
nouv365_dir    = os.path.join(root, "365NoUV")
nouv395_dir    = os.path.join(root, "395NoUV")


In [None]:
# Read the label table (expects "filename" and "label" columns).
df_labels = pd.read_csv(csv_file)

# Normalise filenames row by row: cast to str and append ".jpg" only where the
# suffix is missing. Deciding from the first row alone would mis-handle a CSV
# that mixes "0" and "1.jpg" style entries, and would raise IndexError on an
# empty table.
filenames_str = df_labels["filename"].astype(str)
df_labels["filename"] = filenames_str.where(
    filenames_str.str.endswith(".jpg"), filenames_str + ".jpg"
)

# Lookup used by the dataset: filename → label.
label_dict = dict(zip(df_labels["filename"], df_labels["label"]))

In [None]:
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms.functional as TF
import torch
import os
import random

class MultiModalLeafDataset(Dataset):
    """Dataset yielding channel-stacked leaf images for one of three modalities.

    For every filename the same-named image is read from each directory of the
    selected modality, converted to RGB, resized, turned into a tensor and
    concatenated along the channel axis:

      - "uv":     images from ``uv_dirs``                 → [3 * len(uv_dirs), H, W]
      - "rgb":    images from ``rgb_dirs``                → [3 * len(rgb_dirs), H, W]
      - "uv_rgb": UV stack followed by the RGB stack      → both concatenated
    """

    _VALID_MODALITIES = ("uv", "rgb", "uv_rgb")

    def __init__(self, filenames, labels, modality="uv", img_size=(224, 224), augment=False,
                 uv_dirs=None, rgb_dirs=None):
        """
        Args:
            filenames (list[str]): List of image filenames (e.g. ["0.jpg", "1.jpg", ...]).
            labels (dict): Mapping from filename → integer label (0 or 1).
            modality (str): "uv", "rgb", or "uv_rgb" (case-insensitive).
            img_size (tuple[int,int]): (height, width) for resizing (default (224,224)).
            augment (bool): If True, apply random flips/rotations; else no augmentation.
            uv_dirs (list[str] | None): Directories holding the UV images. Defaults to
                the notebook-level [white_uv_dir, uv365_dir, uv395_dir].
            rgb_dirs (list[str] | None): Directories holding the NoUV images. Defaults to
                the notebook-level [white_nouv_dir, nouv365_dir, nouv395_dir].

        Raises:
            ValueError: If ``modality`` is not one of the supported values.
        """
        super().__init__()
        self.filenames = filenames
        self.labels = labels
        self.modality = modality.lower()
        # Fail fast on a bad modality instead of after the first images were read.
        if self.modality not in self._VALID_MODALITIES:
            raise ValueError(f"Modality must be 'uv', 'rgb', or 'uv_rgb', got '{self.modality}'")
        self.img_size = img_size
        self.augment = augment

        # The notebook globals are only looked up when no explicit list is given.
        if uv_dirs is not None:
            self.uv_dirs = list(uv_dirs)
        else:
            self.uv_dirs = [white_uv_dir, uv365_dir, uv395_dir]
        if rgb_dirs is not None:
            self.rgb_dirs = list(rgb_dirs)
        else:
            self.rgb_dirs = [white_nouv_dir, nouv365_dir, nouv395_dir]

    def __len__(self):
        """Number of samples (one per filename)."""
        return len(self.filenames)

    def _load_image(self, path):
        """Load one image as an RGB tensor of shape [3, H, W] with (H, W) = img_size."""
        height, width = self.img_size
        with Image.open(path) as raw:
            # PIL's resize takes (width, height), whereas img_size is (height, width).
            resized = raw.convert("RGB").resize((width, height), resample=Image.BILINEAR)
        return TF.to_tensor(resized)

    def _load_stack(self, dirs, fname, kind):
        """Load ``fname`` from every directory in ``dirs`` and concatenate on the channel axis.

        ``kind`` ("UV" or "RGB") is only used in the error message.

        Raises:
            FileNotFoundError: If ``fname`` is missing from one of the directories.
        """
        tensors = []
        for directory in dirs:
            full_path = os.path.join(directory, fname)
            if not os.path.isfile(full_path):
                raise FileNotFoundError(f"Expected {kind} file not found: {full_path}")
            tensors.append(self._load_image(full_path))
        return torch.cat(tensors, dim=0)

    def __getitem__(self, idx):
        """Return ``(img_tensor, label)`` where label is a ``torch.long`` scalar tensor."""
        # 1) Get filename and label
        fname = self.filenames[idx]
        label = self.labels[fname]  # 0 or 1

        # 2) Load the channel stack(s) required by the modality (UV first, then RGB)
        parts = []
        if self.modality in ("uv", "uv_rgb"):
            parts.append(self._load_stack(self.uv_dirs, fname, "UV"))
        if self.modality in ("rgb", "uv_rgb"):
            parts.append(self._load_stack(self.rgb_dirs, fname, "RGB"))
        if not parts:
            # Only reachable if self.modality was changed after construction.
            raise ValueError(f"Modality must be 'uv', 'rgb', or 'uv_rgb', got '{self.modality}'")
        img_tensor = parts[0] if len(parts) == 1 else torch.cat(parts, dim=0)

        # 3) Augment the whole stack at once so every channel gets the same transform
        if self.augment:
            # Random horizontal flip (50%)
            if random.random() > 0.5:
                img_tensor = torch.flip(img_tensor, dims=[2])  # flip width
            # Random vertical flip (50%)
            if random.random() > 0.5:
                img_tensor = torch.flip(img_tensor, dims=[1])  # flip height
            # Random rotation (0, 90, 180, 270 degrees)
            angle = random.choice([0, 90, 180, 270])
            if angle != 0:
                img_tensor = TF.rotate(img_tensor, angle)

        return img_tensor, torch.tensor(label, dtype=torch.long)


In [None]:
# Reproducible train/val/test split: 70% / 10% / remainder.
# A dedicated, seeded RNG keeps the split identical across kernel restarts and
# cell re-runs (so test files can never drift into the training set) without
# touching the global `random` state used for augmentation.
SPLIT_SEED = 42

all_filenames = list(label_dict.keys())
random.Random(SPLIT_SEED).shuffle(all_filenames)

n_total = len(all_filenames)
n_train = int(0.70 * n_total)
n_val   = int(0.10 * n_total)
n_test  = n_total - n_train - n_val

train_fnames = all_filenames[:n_train]
val_fnames   = all_filenames[n_train : n_train + n_val]
test_fnames  = all_filenames[n_train + n_val : ]

print(f"Total samples: {n_total}")
print(f" → Train: {len(train_fnames)}, Val: {len(val_fnames)}, Test: {len(test_fnames)}")


In [None]:
batch_size = 16
num_workers = 4  # or 0 if working on Windows / Jupyter without multiprocessing


def _make_split_loaders(modality):
    """Build the train/val/test datasets and loaders for one modality.

    Only the training split is augmented and shuffled; validation and test
    are read in order without augmentation.

    Returns:
        (datasets, loaders): two 3-tuples ordered (train, val, test).
    """
    split_specs = ((train_fnames, True), (val_fnames, False), (test_fnames, False))
    datasets = tuple(
        MultiModalLeafDataset(fnames, label_dict, modality=modality, img_size=(224, 224), augment=is_train)
        for fnames, is_train in split_specs
    )
    loaders = tuple(
        DataLoader(dataset, batch_size=batch_size, shuffle=is_train, num_workers=num_workers)
        for dataset, (_, is_train) in zip(datasets, split_specs)
    )
    return datasets, loaders


# 6a) UV-only
_uv_datasets, _uv_loaders = _make_split_loaders("uv")
train_uv_dataset, val_uv_dataset, test_uv_dataset = _uv_datasets
train_uv_loader, val_uv_loader, test_uv_loader = _uv_loaders

# 6b) RGB-only (NoUV)
_rgb_datasets, _rgb_loaders = _make_split_loaders("rgb")
train_rgb_dataset, val_rgb_dataset, test_rgb_dataset = _rgb_datasets
train_rgb_loader, val_rgb_loader, test_rgb_loader = _rgb_loaders

# 6c) UV+RGB early fusion
_uvrgb_datasets, _uvrgb_loaders = _make_split_loaders("uv_rgb")
train_uvrgb_dataset, val_uvrgb_dataset, test_uvrgb_dataset = _uvrgb_datasets
train_uvrgb_loader, val_uvrgb_loader, test_uvrgb_loader = _uvrgb_loaders


In [None]:
# ...existing code...
from torchvision.models import VGG16_Weights




In [None]:
def create_vgg16_model(input_channels=3, num_classes=2, pretrained=True):
    """
    Build a VGG16 classifier and move it to the notebook-level `device`.

    Args:
        input_channels (int): Number of channels of the input tensor. When it is
            not 3, the first conv layer is replaced by one accepting
            `input_channels` channels.
        num_classes (int): Size of the final linear layer's output.
        pretrained (bool): If True, load the ImageNet (IMAGENET1K_V1) weights;
            if False, the network is randomly initialised.

    Returns:
        The model, already moved to `device`.
    """
    # 1) Load VGG16, honouring the `pretrained` flag (it used to be ignored).
    weights = VGG16_Weights.IMAGENET1K_V1 if pretrained else None
    model = models.vgg16(weights=weights)

    # 2) If we need a custom number of input channels, swap the first conv.
    if input_channels != 3:
        old_conv = model.features[0]  # original: Conv2d(3 → 64, kernel_size=3, padding=1)
        new_conv = nn.Conv2d(
            in_channels=input_channels,
            out_channels=old_conv.out_channels,
            kernel_size=old_conv.kernel_size,
            stride=old_conv.stride,
            padding=old_conv.padding,
            bias=(old_conv.bias is not None),
        )
        # Seed the new conv from the pretrained filters. With random weights
        # there is nothing worth copying, so the default init is kept.
        if pretrained:
            # min() keeps the slice valid when fewer than 3 channels are requested.
            n_shared = min(3, input_channels)
            with torch.no_grad():
                new_conv.weight[:, :n_shared, :, :] = old_conv.weight[:, :n_shared, :, :]
                # Every extra channel (index 3 and up) reuses the filter of input channel 0.
                for i in range(3, input_channels):
                    new_conv.weight[:, i : i + 1, :, :] = old_conv.weight[:, :1, :, :]
                if old_conv.bias is not None:
                    new_conv.bias[:] = old_conv.bias[:]

        # Replace the first conv layer in features
        model.features[0] = new_conv

    # 3) Replace the last classifier layer so it outputs `num_classes` logits;
    #    the input width is read from the layer being replaced.
    old_head = model.classifier[6]
    model.classifier[6] = nn.Linear(in_features=old_head.in_features, out_features=num_classes)

    return model.to(device)


In [None]:
def train_one_epoch(model, dataloader, criterion, optimizer):
    """
    Run one optimisation pass over `dataloader`.

    Batches are moved to the device of the model's first parameter (falling back
    to the notebook-level `device` for a parameter-free model).

    Args:
        model: Network to train; put into train mode.
        dataloader: Iterable yielding (inputs, labels) batches.
        criterion: Loss callable taking (outputs, labels).
        optimizer: Optimizer stepping `model`'s parameters.

    Returns:
        (mean_loss, accuracy) as Python floats, averaged over the samples that
        were actually iterated (correct under `drop_last` or a sampler).
        Returns (0.0, 0.0) when the loader yields no batches.
    """
    model.train()
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    total_loss = 0.0
    total_correct = 0
    total_seen = 0

    for inputs, labels in dataloader:
        inputs = inputs.to(target_device)
        labels = labels.to(target_device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_n = inputs.size(0)
        # criterion returns a batch mean; re-weight by batch size for a true per-sample mean.
        total_loss += loss.item() * batch_n
        total_correct += (outputs.argmax(dim=1) == labels).sum().item()
        total_seen += batch_n

    if total_seen == 0:
        return 0.0, 0.0
    return total_loss / total_seen, total_correct / total_seen


def validate_one_epoch(model, dataloader, criterion):
    """
    Evaluate `model` on `dataloader` without computing gradients.

    Batches are moved to the device of the model's first parameter (falling back
    to the notebook-level `device` for a parameter-free model).

    Args:
        model: Network to evaluate; put into eval mode.
        dataloader: Iterable yielding (inputs, labels) batches.
        criterion: Loss callable taking (outputs, labels).

    Returns:
        (mean_loss, accuracy) as Python floats, averaged over the samples that
        were actually iterated. Returns (0.0, 0.0) when the loader yields no batches.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    total_loss = 0.0
    total_correct = 0
    total_seen = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(target_device)
            labels = labels.to(target_device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            batch_n = inputs.size(0)
            # criterion returns a batch mean; re-weight by batch size for a true per-sample mean.
            total_loss += loss.item() * batch_n
            total_correct += (outputs.argmax(dim=1) == labels).sum().item()
            total_seen += batch_n

    if total_seen == 0:
        return 0.0, 0.0
    return total_loss / total_seen, total_correct / total_seen


In [None]:
def _snapshot_state_dict(module):
    """Return an independent CPU copy of `module.state_dict()`."""
    return {key: value.detach().to("cpu", copy=True) for key, value in module.state_dict().items()}


def train_model(modality, train_loader, val_loader, num_epochs=20, lr=1e-4, patience=5):
    """
    Trains a VGG16-based model for the given modality with early stopping.

    Args:
        modality (str): "uv" or "rgb" (9 input channels) or "uv_rgb" (18 input channels).
        train_loader: Loader passed to `train_one_epoch`.
        val_loader: Loader passed to `validate_one_epoch`.
        num_epochs (int): Maximum number of epochs.
        lr (float): Adam learning rate.
        patience (int): Stop after this many consecutive epochs without a new
            best validation accuracy.

    Returns: (best_model, best_validation_accuracy), where the model carries the
        weights of the epoch with the highest validation accuracy.

    Raises:
        ValueError: If `modality` is not one of the supported values.
    """
    if modality in ("uv", "rgb"):
        in_channels = 9
    elif modality == "uv_rgb":
        in_channels = 18
    else:
        raise ValueError("Modality must be 'uv', 'rgb', or 'uv_rgb'.")

    model = create_vgg16_model(input_channels=in_channels, num_classes=2, pretrained=True)
    # NOTE(review): nothing visible here freezes any layer beforehand, so this loop
    # looks like a no-op and the whole network is fine-tuned — confirm whether
    # features[:24] were meant to be frozen.
    for param in model.features[24:].parameters():
        param.requires_grad = True

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    best_val_acc = 0.0
    # Start from a real snapshot so the final load_state_dict never receives None
    # (e.g. num_epochs == 0 or validation accuracy that never rises above 0).
    best_model_wts = _snapshot_state_dict(model)
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer)
        val_loss, val_acc     = validate_one_epoch(model, val_loader, criterion)
        scheduler.step()

        print(f"[{modality.upper()}] Epoch {epoch+1}/{num_epochs}  "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}  "
              f"|  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        # Early stopping logic
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # state_dict() hands back references to the live tensors; without a
            # copy the "best" weights would be overwritten by later epochs.
            best_model_wts = _snapshot_state_dict(model)
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                break

    model.load_state_dict(best_model_wts)
    return model, best_val_acc

In [None]:
# Hyperparameters shared by the three training runs below.
_run_kwargs = {"num_epochs": 20, "lr": 1e-4}

# 10a) UV-only model
print("→ Training UV‐only model …")
uv_model, uv_best_acc = train_model("uv", train_uv_loader, val_uv_loader, **_run_kwargs)
print(f"★ Best UV Validation Accuracy: {uv_best_acc:.4f}\n")

# 10b) RGB-only (NoUV) model
print("→ Training RGB‐only model …")
rgb_model, rgb_best_acc = train_model("rgb", train_rgb_loader, val_rgb_loader, **_run_kwargs)
print(f"★ Best RGB Validation Accuracy: {rgb_best_acc:.4f}\n")

# 10c) UV+RGB early fusion (train_model uses 18 input channels for "uv_rgb")
print("→ Training UV+RGB model …")
uvrgb_model, uvrgb_best_acc = train_model("uv_rgb", train_uvrgb_loader, val_uvrgb_loader, **_run_kwargs)
print(f"★ Best UV+RGB Validation Accuracy: {uvrgb_best_acc:.4f}\n")


In [None]:
def test_model(model, test_loader):
    model.eval()
    running_corrects = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            preds = torch.argmax(outputs, dim=1)
            running_corrects += torch.sum(preds == labels.data)

    test_acc = running_corrects.double() / len(test_loader.dataset)
    return test_acc.item()

# Test accuracy for all three trained models. The fused UV+RGB model is trained
# and gets precision/recall reported elsewhere in this notebook, so its test
# accuracy is reported here as well instead of being commented out.
uv_test_acc    = test_model(uv_model, test_uv_loader)
rgb_test_acc   = test_model(rgb_model, test_rgb_loader)
uvrgb_test_acc = test_model(uvrgb_model, test_uvrgb_loader)

print(f"→ UV Test Accuracy:    {uv_test_acc:.4f}")
print(f"→ RGB Test Accuracy:   {rgb_test_acc:.4f}")
print(f"→ UV+RGB Test Accuracy: {uvrgb_test_acc:.4f}")


In [None]:
from sklearn.metrics import precision_score, recall_score, confusion_matrix

def eval_metrics(model, test_loader, name="Model"):
    """
    Print binary precision, recall and the confusion matrix of `model` on `test_loader`.

    Batches are moved to the device of the model's first parameter (falling back
    to the notebook-level `device` for a parameter-free model).

    Args:
        model: Network to evaluate; put into eval mode.
        test_loader: Iterable yielding (inputs, labels) batches with labels in {0, 1}.
        name (str): Prefix used in the printed lines.
    """
    model.eval()
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(target_device))
            all_preds.extend(outputs.argmax(dim=1).cpu().tolist())
            # .cpu() makes this safe even if a loader yields labels on another device.
            all_labels.extend(labels.cpu().tolist())

    # zero_division=0 yields the same 0.0 score as the default but without the
    # undefined-metric warning when a model never predicts the positive class.
    precision = precision_score(all_labels, all_preds, average='binary', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='binary', zero_division=0)
    # Fix the label set so the matrix is always 2x2, even when only one class
    # shows up among the labels and predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    print(f"\n{name} Precision: {precision:.4f}")
    print(f"{name} Recall:    {recall:.4f}")
    print(f"{name} Confusion Matrix:\n{cm}")

# Precision / recall / confusion matrix for each trained model on its own test loader.
for _name, _model, _loader in (
    ("UV", uv_model, test_uv_loader),
    ("RGB", rgb_model, test_rgb_loader),
    ("UV+RGB", uvrgb_model, test_uvrgb_loader),
):
    eval_metrics(_model, _loader, name=_name)