In [None]:
import torch
import torch.nn as nn
from torchvision import models

def build_resnet18(
    num_classes: int,
    pretrained: bool = True,
    dropout: float = 0.0,
    freeze_backbone: bool = False,
    in_channels: int = 3,
) -> nn.Module:
    """Build a torchvision ResNet-18 with a freshly initialised classification head.

    Args:
        num_classes: Number of output logits produced by the new ``fc`` head.
        pretrained: When True, request the ``IMAGENET1K_V1`` weights.
        dropout: If > 0, the head is ``Dropout(p=dropout) -> Linear``; otherwise
            a bare ``Linear``.
        freeze_backbone: When True, set ``requires_grad = False`` on the
            pre-existing backbone parameters. The new head is always trainable,
            and so is ``conv1`` when it had to be replaced (see ``in_channels``).
        in_channels: Number of input channels. When it is not 3, ``conv1`` is
            replaced by a new bias-free convolution with the same kernel size,
            stride, padding and output width.

    Returns:
        The configured model.
    """
    try:
        weights = models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
        model = models.resnet18(weights=weights)
    except (AttributeError, TypeError):
        # Fallback for torchvision releases that predate the ``weights=`` API:
        # the enum is missing (AttributeError) or the keyword is rejected
        # (TypeError). Other failures, e.g. a failed download, are not masked.
        model = models.resnet18(pretrained=pretrained)

    replaced_conv1 = in_channels != 3
    if replaced_conv1:
        model.conv1 = nn.Conv2d(in_channels, model.conv1.out_channels,
                                kernel_size=model.conv1.kernel_size,
                                stride=model.conv1.stride,
                                padding=model.conv1.padding,
                                bias=False)

    if freeze_backbone:
        for name, p in model.named_parameters():
            if name.startswith("fc."):
                continue
            # A replaced conv1 holds random weights; freezing it would lock the
            # network to an untrained first layer, so it is left trainable.
            if replaced_conv1 and name.startswith("conv1."):
                continue
            p.requires_grad = False

    # The head is rebuilt after freezing, so its parameters are new and trainable.
    in_feats = model.fc.in_features
    if dropout and dropout > 0.0:
        model.fc = nn.Sequential(
            nn.Dropout(p=dropout),
            nn.Linear(in_feats, num_classes),
        )
    else:
        model.fc = nn.Linear(in_feats, num_classes)

    # Initialise the Linear layer of the head (last element when wrapped in Sequential).
    head = model.fc[-1] if isinstance(model.fc, nn.Sequential) else model.fc
    nn.init.kaiming_uniform_(head.weight, nonlinearity='relu')
    nn.init.zeros_(head.bias)

    return model


def save_checkpoint(model: nn.Module, path: str):
    """Write the model's ``state_dict`` (parameters and buffers only) to ``path``."""
    weights = model.state_dict()
    torch.save(weights, path)


def load_checkpoint(model: nn.Module, path: str, map_location='cpu', strict=True, expected_in_channels: int = 3):
    """Load a ``state_dict`` checkpoint into ``model`` and return the model.

    If both the model and the checkpoint have a ``conv1`` weight but their
    input-channel counts differ, ``conv1.weight`` is dropped from the checkpoint
    (a warning is printed) and the model keeps its own ``conv1`` weights.

    Args:
        model: Module to populate. It does not need to have a ``conv1`` attribute.
        path: File written by ``torch.save(model.state_dict(), path)``.
        map_location: Forwarded to ``torch.load``.
        strict: Forwarded to ``load_state_dict``. When ``conv1.weight`` is
            skipped, strictness is still enforced for every *other* key.
        expected_in_channels: Accepted for backward compatibility; not consulted.
            The channel count is read from ``model.conv1`` itself.

    Returns:
        ``model`` (the same object, modified in place).

    Raises:
        RuntimeError: On key mismatches when ``strict`` is True (other than a
            deliberately skipped ``conv1.weight``).
    """
    # NOTE(security): torch.load unpickles the file; only load trusted checkpoints.
    state = torch.load(path, map_location=map_location)

    conv1 = getattr(model, "conv1", None)
    model_conv1_in_channels = getattr(conv1, "in_channels", None)

    skipped_keys = set()
    if model_conv1_in_channels is not None and 'conv1.weight' in state:
        checkpoint_conv1_in_channels = state['conv1.weight'].shape[1]

        if model_conv1_in_channels != checkpoint_conv1_in_channels:
            print(f"Warning: Model expects {model_conv1_in_channels} channels for conv1, but checkpoint has {checkpoint_conv1_in_channels}. Skipping loading conv1.weight.")
            del state['conv1.weight']
            skipped_keys.add('conv1.weight')

    if not skipped_keys:
        model.load_state_dict(state, strict=strict)
        return model

    # Load non-strictly so the skipped key is tolerated, then re-apply the
    # caller's strictness to everything else instead of silently ignoring it.
    result = model.load_state_dict(state, strict=False)
    if strict:
        missing = [k for k in result.missing_keys if k not in skipped_keys]
        unexpected = list(result.unexpected_keys)
        if missing or unexpected:
            raise RuntimeError(
                f"Error(s) in loading state_dict: missing keys {missing}, "
                f"unexpected keys {unexpected}"
            )
    return model

In [None]:
import torch
import numpy as np
import cv2
from PIL import Image
from torchvision import transforms

def homomorphic_filter_rgb(img_pil, sigma=30.0, gamma_l=0.7, gamma_h=1.5, eps=1e-6):
    """Apply a homomorphic filter to the luminance channel of an image.

    The image is converted to YCrCb; the Y channel is scaled to [0, 1], taken to
    the log domain, multiplied in the (shifted) frequency domain by a Gaussian
    high-emphasis filter, brought back with ``expm1``, clipped at 0 and min-max
    normalised to [0, 1]. Cr/Cb are left unchanged.

    Args:
        img_pil: PIL image (converted to RGB internally).
        sigma: Gaussian cutoff of the filter, in frequency bins. Must be > 0.
        gamma_l: Filter gain at zero frequency.
        gamma_h: Gain the filter approaches at high frequencies.
        eps: Small offset added to the luminance before ``log1p``.

    Returns:
        A new RGB PIL image.

    Raises:
        ValueError: If ``sigma`` is not positive.
    """
    if sigma <= 0:
        raise ValueError(f"sigma must be positive, got {sigma}")

    img = np.array(img_pil.convert("RGB"), dtype=np.uint8)
    ycrcb = cv2.cvtColor(img, cv2.COLOR_RGB2YCrCb).astype(np.float32)
    luminance = ycrcb[..., 0] / 255.0
    log_y = np.log1p(luminance + eps)
    freq = np.fft.fftshift(np.fft.fft2(log_y))
    rows, cols = luminance.shape
    # fftshift places the zero-frequency bin at index n // 2, so measure the
    # distance from there (n / 2.0 is off by half a bin for odd sizes).
    u = np.arange(rows, dtype=np.float64) - rows // 2
    v = np.arange(cols, dtype=np.float64) - cols // 2
    vv, uu = np.meshgrid(v, u)
    distance = np.sqrt(uu ** 2 + vv ** 2)
    # Gain rises from gamma_l at DC towards gamma_h at high frequencies.
    high_pass = (gamma_h - gamma_l) * (1.0 - np.exp(-(distance ** 2) / (2.0 * (sigma ** 2)))) + gamma_l
    filtered = np.real(np.fft.ifft2(np.fft.ifftshift(freq * high_pass)))
    exp_y = np.expm1(filtered)
    exp_y = np.clip(exp_y, 0.0, None)
    # Stretch the result to the full [0, 1] range before writing it back as Y.
    normalized = cv2.normalize(exp_y.astype(np.float32), None, 0.0, 1.0, cv2.NORM_MINMAX)
    ycrcb[..., 0] = np.clip(normalized * 255.0, 0.0, 255.0)
    out = cv2.cvtColor(ycrcb.astype(np.uint8), cv2.COLOR_YCrCb2RGB)
    return Image.fromarray(out)

def logarithmic_enhancement_rgb(img_pil, c=1.0):
    """Brighten an image with a per-pixel log curve, ``c * log1p(x) / log1p(1)``.

    The input is converted to RGB and scaled to [0, 1]; the curve is applied,
    the result is clipped to [0, 1] and returned as a uint8 RGB PIL image.
    """
    rgb01 = np.array(img_pil.convert("RGB"), dtype=np.float32) / 255.0
    # Dividing by log1p(1) maps an input of 1.0 to c.
    curve = c * np.log1p(rgb01)
    curve = curve / np.log1p(1.0)
    pixels = (np.clip(curve, 0.0, 1.0) * 255.0).astype(np.uint8)
    return Image.fromarray(pixels)

def extract_frequency_features(img_pil):
    """Return the centred log-magnitude spectrum of the grayscale image.

    The image is converted to mode "L", transformed with a 2-D FFT, shifted so
    the zero-frequency term is in the middle, and mapped to
    ``20 * log(|F| + 1)``. The result has the same height/width as the input.
    """
    gray = np.array(img_pil.convert("L"), dtype=np.float32)
    centered_spectrum = np.fft.fftshift(np.fft.fft2(gray))
    # The +1 keeps the logarithm finite where the magnitude is zero.
    return 20 * np.log(np.abs(centered_spectrum) + 1)


class CombinedEnhancementAndFreq(object):
    """Transform that returns enhanced RGB plus a frequency-magnitude channel.

    The three RGB channels come from ``homomorphic_filter_rgb`` followed by
    ``logarithmic_enhancement_rgb``. The fourth channel is the min-max
    normalised output of ``extract_frequency_features``, computed on the
    original (unenhanced) input image. Output shape is ``(4, H, W)``.
    """
    def __init__(self, homo_params=None, log_params=None):
        # Keyword arguments forwarded to the two enhancement functions.
        self.homo_params = homo_params or {}
        self.log_params = log_params or {}

    def __call__(self, img_pil):
        homo_img_pil = homomorphic_filter_rgb(img_pil, **self.homo_params)
        log_img_pil = logarithmic_enhancement_rgb(homo_img_pil, **self.log_params)

        transform_to_tensor = transforms.ToTensor()
        rgb_tensor = transform_to_tensor(log_img_pil)  # (3, H, W)

        freq_magnitude = extract_frequency_features(img_pil)

        # Min-max normalise, then quantise to 8 bits via a grayscale PIL image.
        freq_magnitude_norm = (freq_magnitude - freq_magnitude.min()) / (freq_magnitude.max() - freq_magnitude.min() + 1e-6)
        freq_pil = Image.fromarray((freq_magnitude_norm * 255).astype(np.uint8))

        # PIL reports size as (width, height) while Resize expects (height, width);
        # passing img_pil.size directly transposes non-square images.
        width, height = img_pil.size
        freq_pil = transforms.Resize((height, width))(freq_pil)
        freq_tensor = transform_to_tensor(freq_pil)  # (1, H, W) for a mode "L" image

        if rgb_tensor.shape[1:] != freq_tensor.shape[1:]:
            raise ValueError(f"Shape mismatch in CombinedEnhancementAndFreq: RGB {rgb_tensor.shape[1:]} vs Freq {freq_tensor.shape[1:]}")

        combined_tensor = torch.cat((rgb_tensor, freq_tensor), dim=0)  # (4, H, W)
        return combined_tensor


def build_eval_transform(
    img_size,
    enhancement="homomorphic",
    homo_params=None,
    log_params=None,
    in_channels: int = 3,
):
    """Compose the preprocessing pipeline: square resize, enhancement, tensor conversion.

    Args:
        img_size: Images are resized to ``(img_size, img_size)`` first.
        enhancement: One of ``"homomorphic"``, ``"log"``, ``"homo+log"``,
            ``"combined_features"`` (4-channel output via
            ``CombinedEnhancementAndFreq``) or ``"none"``.
        homo_params: Keyword arguments for ``homomorphic_filter_rgb``.
        log_params: Keyword arguments for ``logarithmic_enhancement_rgb``.
        in_channels: Accepted for interface compatibility; not used here. The
            channel count of the output is determined by ``enhancement``.

    Returns:
        A ``transforms.Compose`` whose output is a tensor.

    Raises:
        ValueError: If ``enhancement`` is not one of the supported names
            (previously this silently produced a pipeline without ``ToTensor``).
    """
    homo_params = homo_params or {}
    log_params = log_params or {}
    steps = [transforms.Resize((img_size, img_size))]
    if enhancement == "homomorphic":
        steps.append(transforms.Lambda(lambda im: homomorphic_filter_rgb(im, **homo_params)))
        steps.append(transforms.ToTensor())
    elif enhancement == "log":
        steps.append(transforms.Lambda(lambda im: logarithmic_enhancement_rgb(im, **log_params)))
        steps.append(transforms.ToTensor())
    elif enhancement == "homo+log":
        steps.append(transforms.Lambda(lambda im: homomorphic_filter_rgb(im, **homo_params)))
        steps.append(transforms.Lambda(lambda im: logarithmic_enhancement_rgb(im, **log_params)))
        steps.append(transforms.ToTensor())
    elif enhancement == "combined_features":
        # This transform converts to a tensor itself, so no ToTensor step follows.
        steps.append(CombinedEnhancementAndFreq(homo_params=homo_params, log_params=log_params))
    elif enhancement == "none":
        steps.append(transforms.ToTensor())
    else:
        raise ValueError(
            f"Unknown enhancement {enhancement!r}; expected one of "
            "'homomorphic', 'log', 'homo+log', 'combined_features', 'none'."
        )
    return transforms.Compose(steps)

In [None]:
from PIL import Image
import numpy as np
import os

# sample image
image_path = "/content/drive/MyDrive/Colab Notebooks/data/ECE253/82e90dbb-4b07-4f1f-b9d6-dfaead6cc602.png"

# Open inside a context manager so the file handle is closed once the pixels
# have been copied into the converted image.
with Image.open(image_path) as src_img:
    img_pil = src_img.convert("RGB")

freq_magnitude = extract_frequency_features(img_pil)

# Min-max normalise the spectrum and quantise it to an 8-bit grayscale image.
freq_magnitude_norm = (freq_magnitude - freq_magnitude.min()) / (freq_magnitude.max() - freq_magnitude.min() + 1e-6)
freq_pil = Image.fromarray((freq_magnitude_norm * 255).astype(np.uint8))

# Save next to the source image as "<name>_freq<ext>".
directory, filename = os.path.split(image_path)
name, ext = os.path.splitext(filename)
output_path = os.path.join(directory, f"{name}_freq{ext}")

freq_pil.save(output_path)


In [None]:
# Single root for every dataset/artifact path, so relocating the project
# means editing one line instead of five.
DATA_ROOT = "/content/drive/MyDrive/Colab Notebooks/data/ECE253"

training_config = {
    # Dataset splits (ImageFolder layout).
    "train_dir": f"{DATA_ROOT}/lowlight/train",
    "val_dir": f"{DATA_ROOT}/lowlight/validation",
    "test_dir": f"{DATA_ROOT}/lowlight/test",
    # Optimisation settings.
    "img_size": 64,
    "batch_size": 128,
    "epochs": 6,
    "lr": 3e-4,
    "weight_decay": 1e-4,
    "seed": 42,
    # Output artifacts.
    "out_model": f"{DATA_ROOT}/best_resnet18_combined_features.pth",
    "out_report": f"{DATA_ROOT}/test_results_combined_features.txt",
    # Comma-separated class names to keep; split by the training cell.
    "keep_classes": "beach,buildings,forest,harbor,freeway",
    # Preprocessing: "combined_features" yields 4 channels (RGB + frequency).
    "enhancement": "combined_features",
    "homo_params": {"sigma": 30.0, "gamma_l": 0.7, "gamma_h": 1.5},
    "log_params": {"c": 1.0},
    "in_channels": 4
}
print("Defined training_config dictionary.")

Defined training_config dictionary.


In [None]:
import os
import random
from pathlib import Path
import sys
import contextlib

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.cuda import amp

from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from tqdm import tqdm
import numpy as np

from PIL import Image

def set_seed(seed: int = 42):
    """Seed Python's ``random``, NumPy's global RNG and PyTorch (CPU and all CUDA devices)."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

def subset_imagefolder(dataset, keep_class_names):
    """Restrict an ImageFolder-style dataset to the given classes, in place.

    Samples whose class is not in ``keep_class_names`` are dropped, and labels
    are re-indexed to follow the order of ``keep_class_names``.

    Args:
        dataset: Object exposing ``classes`` and ``samples`` (list of
            ``(path, label)``); its ``samples``, ``targets``, ``classes`` and
            ``class_to_idx`` attributes are overwritten.
        keep_class_names: Iterable of class names to keep.

    Returns:
        The same ``dataset`` object, modified.
    """
    orig_classes = list(dataset.classes)
    new_classes = list(keep_class_names)
    new_class_to_idx = {c: i for i, c in enumerate(new_classes)}

    new_samples = []
    new_targets = []

    for path, old_label in dataset.samples:
        cls_name = orig_classes[old_label]
        if cls_name in new_class_to_idx:
            new_label = new_class_to_idx[cls_name]
            new_samples.append((path, new_label))
            new_targets.append(new_label)

    dataset.samples = new_samples
    dataset.targets = new_targets
    dataset.classes = new_classes
    dataset.class_to_idx = new_class_to_idx
    # Keep the `imgs` alias (when the dataset has one) in sync; otherwise it
    # would keep pointing at the old, unfiltered sample list.
    if hasattr(dataset, "imgs"):
        dataset.imgs = new_samples

    return dataset

# ---- Device and reproducibility ----
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

set_seed(training_config["seed"])
IMG_SIZE = training_config["img_size"]

# ---- Transforms: train and eval use the same settings (no augmentation) ----
_transform_kwargs = dict(
    img_size=IMG_SIZE,
    enhancement=training_config["enhancement"],
    homo_params=training_config.get("homo_params"),
    log_params=training_config.get("log_params"),
    in_channels=training_config["in_channels"],
)
tf_train = build_eval_transform(**_transform_kwargs)
tf_eval = build_eval_transform(**_transform_kwargs)

# ---- Datasets, restricted to the configured classes ----
train_ds = datasets.ImageFolder(training_config["train_dir"], transform=tf_train)
val_ds = datasets.ImageFolder(training_config["val_dir"], transform=tf_eval)
test_ds = datasets.ImageFolder(training_config["test_dir"], transform=tf_eval)

# "a, b,,c" -> ["a", "b", "c"]: trim whitespace and drop empty entries.
keep_cls_list = [
    cls_name
    for cls_name in (part.strip() for part in training_config["keep_classes"].split(","))
    if cls_name
]

train_ds = subset_imagefolder(train_ds, keep_cls_list)
val_ds = subset_imagefolder(val_ds, keep_cls_list)
test_ds = subset_imagefolder(test_ds, keep_cls_list)

num_classes = len(train_ds.classes)

print(f"Number of classes: {num_classes}")
print(f"Training samples: {len(train_ds.samples)}")
print(f"Validation samples: {len(val_ds.samples)}")
print(f"Test samples: {len(test_ds.samples)}")

# ---- Data loaders: only the training loader shuffles ----
_loader_kwargs = dict(
    batch_size=training_config["batch_size"],
    num_workers=0,
    pin_memory=False,
)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_kwargs)

# ---- Model, loss, optimiser, LR schedule ----
model = build_resnet18(
    num_classes=num_classes,
    pretrained=True,
    dropout=0.0,
    freeze_backbone=False,
    in_channels=training_config["in_channels"],
)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    model.parameters(),
    lr=training_config["lr"],
    weight_decay=training_config["weight_decay"],
)
# mode='max' because the scheduler is stepped with validation accuracy.
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

# ---- Training-loop bookkeeping ----
EPOCHS = training_config["epochs"]
best_val_acc = 0.0
epochs_no_improve = 0
patience = 10

# Mixed precision is only enabled when CUDA is available.
use_amp = torch.cuda.is_available()
scaler = amp.GradScaler() if use_amp else None

print(f"\nStarting training for {EPOCHS} epochs...")
for epoch in range(1, EPOCHS + 1):
    # ---- Training pass ----
    model.train()
    loss_sum = 0.0
    y_true_train, y_pred_train = [], []

    for imgs, labels in tqdm(train_loader, desc=f"Epoch {epoch}/{EPOCHS} [train]"):
        imgs = imgs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        # Autocast only when AMP is enabled; otherwise a no-op context.
        with amp.autocast() if use_amp else contextlib.nullcontext():
            logits = model(imgs)
            loss = criterion(logits, labels)

        if use_amp and scaler is not None:
            # Scale the loss before backward; the scaler steps the optimiser.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            optimizer.step()

        loss_sum += loss.item()
        y_pred_train += logits.argmax(1).detach().cpu().tolist()
        y_true_train += labels.detach().cpu().tolist()

    # Mean of the per-batch losses.
    train_loss = loss_sum / len(train_loader)
    train_acc = accuracy_score(y_true_train, y_pred_train)

    # ---- Validation pass ----
    model.eval()
    val_true, val_pred = [], []
    with torch.no_grad():
        for imgs, labels in tqdm(val_loader, desc=f"Epoch {epoch}/{EPOCHS} [val]"):
            imgs = imgs.to(device, non_blocking=True)
            with amp.autocast() if use_amp else contextlib.nullcontext():
                logits = model(imgs)
            preds = logits.argmax(1)

            val_pred += preds.cpu().tolist()
            val_true += labels.cpu().tolist()

    val_acc = accuracy_score(val_true, val_pred)
    scheduler.step(val_acc)

    print(f"Epoch {epoch}: train_loss={train_loss:.4f}  "
          f"train_acc={train_acc:.4f}  val_acc={val_acc:.4f}")

    # ---- Checkpointing and early stopping on validation accuracy ----
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        save_checkpoint(model, training_config["out_model"])
        epochs_no_improve = 0
        print(f"  -> New best model saved to: {training_config['out_model']} (val_acc={best_val_acc:.4f})")
    else:
        epochs_no_improve += 1
        print(f"  -> Val acc did not improve. Patience: {epochs_no_improve}/{patience}")
        # Stop as soon as `patience` consecutive epochs brought no improvement
        # (the previous `>` waited one extra epoch, contradicting the message).
        if epochs_no_improve >= patience:
            print(f"  -> Early stopping triggered after {patience} epochs without improvement.")
            break

# ---- Rebuild the architecture and restore the best checkpoint ----
print("\nLoading best model for final evaluation...")
best_model = build_resnet18(
    num_classes=num_classes,
    pretrained=False,
    dropout=0.0,
    freeze_backbone=False,
    in_channels=training_config["in_channels"],
)
best_model = best_model.to(device)

load_checkpoint(
    best_model,
    training_config["out_model"],
    map_location=device,
    expected_in_channels=training_config["in_channels"],
)
best_model.eval()

# ---- Collect labels, predictions and class probabilities on the validation set ----
y_true, y_pred, y_prob_rows = [], [], []

print("\nEvaluating on validation set...")
with torch.no_grad():
    for imgs, labels in tqdm(val_loader, desc="Validating"):
        imgs = imgs.to(device, non_blocking=True)
        with amp.autocast() if use_amp else contextlib.nullcontext():
            logits = best_model(imgs)
        probs = torch.softmax(logits, dim=1).cpu().numpy()
        preds = logits.argmax(1).cpu().tolist()

        y_true.extend(labels.tolist())
        y_pred.extend(preds)
        y_prob_rows.extend(probs.tolist())

y_prob = np.array(y_prob_rows)

# Re-normalise each row to sum to 1; rows summing to 0 are left as zeros.
row_sums = y_prob.sum(axis=1, keepdims=True)
y_prob = np.divide(y_prob, row_sums, out=np.zeros_like(y_prob), where=row_sums != 0)

# ---- Metrics ----
acc = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='macro')
auc = roc_auc_score(y_true, y_prob, multi_class='ovr')

print("\n===== FINAL  RESULTS ====")
print(f"  Acc      = {acc:.4f}")
print(f"  F1(macro)= {f1:.4f}")
print(f"  AUC(OVR) = {auc:.4f}")

# ---- Write the report (overwrites any existing file at out_report) ----
report_lines = [
    " results (best model based on val_acc) for combined features\n",
    f"Accuracy      : {acc:.6f}\n",
    f"F1 (macro)    : {f1:.6f}\n",
    f"AUC (OVR)     : {auc:.6f}\n",
    f"Best val_acc  : {best_val_acc:.6f}\n",
    f"Num classes   : {num_classes}\n",
    f"Used classes  : {train_ds.classes}\n",
    f"Train dir     : {training_config['train_dir']}\n",
    f"Val dir       : {training_config['val_dir']}\n",
    f"Test dir      : {training_config['test_dir']}\n",
    f"Enhancement   : {training_config['enhancement']}\n",
    f"In channels   : {training_config['in_channels']}\n",
]
with open(training_config["out_report"], "w") as f:
    f.writelines(report_lines)

print(f"\n results saved to: {training_config['out_report']}")
print(f"Best model saved to:   {training_config['out_model']}")

  print(f"\ results saved to: {training_config['out_report']}")


Using device: cpu
Number of classes: 5
Training samples: 1750
Validation samples: 500
Test samples: 250

Starting training for 6 epochs...


Epoch 1/6 [train]: 100%|██████████| 14/14 [01:10<00:00,  5.01s/it]
Epoch 1/6 [val]: 100%|██████████| 4/4 [00:08<00:00,  2.20s/it]


Epoch 1: train_loss=0.9348  train_acc=0.7240  val_acc=0.3340
  -> New best model saved to: /content/drive/MyDrive/Colab Notebooks/data/ECE253/best_resnet18_combined_features.pth (val_acc=0.3340)


Epoch 2/6 [train]: 100%|██████████| 14/14 [01:10<00:00,  5.05s/it]
Epoch 2/6 [val]: 100%|██████████| 4/4 [00:10<00:00,  2.72s/it]


Epoch 2: train_loss=0.1743  train_acc=0.9457  val_acc=0.3100
  -> Val acc did not improve. Patience: 1/10


Epoch 3/6 [train]: 100%|██████████| 14/14 [01:09<00:00,  4.94s/it]
Epoch 3/6 [val]: 100%|██████████| 4/4 [00:10<00:00,  2.55s/it]


Epoch 3: train_loss=0.0687  train_acc=0.9749  val_acc=0.6560
  -> New best model saved to: /content/drive/MyDrive/Colab Notebooks/data/ECE253/best_resnet18_combined_features.pth (val_acc=0.6560)


Epoch 4/6 [train]: 100%|██████████| 14/14 [01:12<00:00,  5.21s/it]
Epoch 4/6 [val]: 100%|██████████| 4/4 [00:09<00:00,  2.49s/it]


Epoch 4: train_loss=0.0325  train_acc=0.9886  val_acc=0.8780
  -> New best model saved to: /content/drive/MyDrive/Colab Notebooks/data/ECE253/best_resnet18_combined_features.pth (val_acc=0.8780)


Epoch 5/6 [train]: 100%|██████████| 14/14 [01:12<00:00,  5.17s/it]
Epoch 5/6 [val]: 100%|██████████| 4/4 [00:09<00:00,  2.31s/it]


Epoch 5: train_loss=0.0174  train_acc=0.9954  val_acc=0.9340
  -> New best model saved to: /content/drive/MyDrive/Colab Notebooks/data/ECE253/best_resnet18_combined_features.pth (val_acc=0.9340)


Epoch 6/6 [train]: 100%|██████████| 14/14 [01:12<00:00,  5.16s/it]
Epoch 6/6 [val]: 100%|██████████| 4/4 [00:09<00:00,  2.37s/it]


Epoch 6: train_loss=0.0063  train_acc=0.9977  val_acc=0.9440
  -> New best model saved to: /content/drive/MyDrive/Colab Notebooks/data/ECE253/best_resnet18_combined_features.pth (val_acc=0.9440)

Loading best model for final evaluation...

Evaluating on validation set...


Validating: 100%|██████████| 4/4 [00:12<00:00,  3.01s/it]


===== FINAL  RESULTS ====
  Acc      = 0.9440
  F1(macro)= 0.9441
  AUC(OVR) = 0.9960
\ results saved to: /content/drive/MyDrive/Colab Notebooks/data/ECE253/test_results_combined_features.txt
Best model saved to:   /content/drive/MyDrive/Colab Notebooks/data/ECE253/best_resnet18_combined_features.pth





In [None]:
print("\nLoading best model for final test evaluation...")

# Recreate the network without pretrained weights; the checkpoint supplies them.
best_model = build_resnet18(
    num_classes=num_classes,
    pretrained=False,
    dropout=0.0,
    freeze_backbone=False,
    in_channels=training_config["in_channels"],
)
best_model = best_model.to(device)

# Restore the weights saved at the best validation accuracy.
load_checkpoint(
    best_model,
    training_config["out_model"],
    map_location=device,
    expected_in_channels=training_config["in_channels"],
)

# Inference mode for dropout / batch-norm layers.
best_model.eval()

y_true_test, y_pred_test, y_prob_rows_test = [], [], []

print("\nEvaluating on test set...")
with torch.no_grad():
    for imgs, labels in tqdm(test_loader, desc="Testing"):
        imgs = imgs.to(device, non_blocking=True)
        with amp.autocast() if use_amp else contextlib.nullcontext():
            logits = best_model(imgs)
        probs = torch.softmax(logits, dim=1).cpu().numpy()
        preds = logits.argmax(1).cpu().tolist()

        y_true_test.extend(labels.tolist())
        y_pred_test.extend(preds)
        y_prob_rows_test.extend(probs.tolist())

y_prob_test = np.array(y_prob_rows_test)

# Re-normalise each probability row to sum to 1; all-zero rows stay zero.
row_sums_test = y_prob_test.sum(axis=1, keepdims=True)
y_prob_test = np.divide(y_prob_test, row_sums_test, out=np.zeros_like(y_prob_test), where=row_sums_test != 0)

acc_test = accuracy_score(y_true_test, y_pred_test)
f1_test = f1_score(y_true_test, y_pred_test, average='macro')
auc_test = roc_auc_score(y_true_test, y_prob_test, multi_class='ovr')

print("\n===== FINAL TEST RESULTS ====")
print(f"  Acc      = {acc_test:.4f}")
print(f"  F1(macro)= {f1_test:.4f}")
print(f"  AUC(OVR) = {auc_test:.4f}")

# The report file is opened in "w" mode, replacing whatever was there before.
test_report_lines = [
    "Test results (best model based on val_acc) for combined features\n",
    f"Accuracy      : {acc_test:.6f}\n",
    f"F1 (macro)    : {f1_test:.6f}\n",
    f"AUC (OVR)     : {auc_test:.6f}\n",
    f"Best val_acc  : {best_val_acc:.6f}\n",  # best validation accuracy seen during training
    f"Num classes   : {num_classes}\n",
    f"Used classes  : {train_ds.classes}\n",
    f"Train dir     : {training_config['train_dir']}\n",
    f"Val dir       : {training_config['val_dir']}\n",
    f"Test dir      : {training_config['test_dir']}\n",
    f"Enhancement   : {training_config['enhancement']}\n",
    f"In channels   : {training_config['in_channels']}\n",
]
with open(training_config["out_report"], "w") as f:
    f.writelines(test_report_lines)

print(f"\nTest results saved to: {training_config['out_report']}")
print(f"Best model saved to:   {training_config['out_model']}")


Loading best model for final test evaluation...

Evaluating on test set...


Testing: 100%|██████████| 2/2 [01:31<00:00, 45.59s/it]


===== FINAL TEST RESULTS ====
  Acc      = 0.9280
  F1(macro)= 0.9277
  AUC(OVR) = 0.9945

Test results saved to: /content/drive/MyDrive/Colab Notebooks/data/ECE253/test_results_combined_features.txt
Best model saved to:   /content/drive/MyDrive/Colab Notebooks/data/ECE253/best_resnet18_combined_features.pth



