In [52]:
import os
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from torchvision import models, transforms, datasets
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.utils.data import DataLoader
from pathlib import Path
import shutil

In [43]:
# Run on the GPU when PyTorch reports one as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [44]:
# Dataset roots. The "test" subfolder of input_path1 is loaded as the source
# dataset; input_path2 is loaded directly as the target dataset and is also the
# image root passed to build_subgroup_folder further down.
input_path1 = Path("data/datasets/trashnet_01/")
input_path2 = Path("data/datasets/self-collected/")

In [45]:
# Evaluation-time preprocessing: fixed 224x224 resize, tensor conversion, then
# per-channel normalisation with the mean/std listed below.
eval_tf = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Source = "test" folder under input_path1, target = everything under
# input_path2; both share the same preprocessing.
source_ds = datasets.ImageFolder(input_path1 / "test", transform=eval_tf)
target_ds = datasets.ImageFolder(input_path2, transform=eval_tf)

# Both loaders use identical, unshuffled settings.
dataloader_source, dataloader_target = (
    DataLoader(dataset, batch_size=32, shuffle=False, num_workers=4)
    for dataset in (source_ds, target_ds)
)

# Number of classes reported by the target dataset; passed to rebuild_model to
# size the classification layer.
num_classes = len(target_ds.classes)


In [None]:
#Function to rebuild models for evaluation

def rebuild_model(filename: str, num_classes: int, device):
    """Instantiate the architecture named by a checkpoint filename.

    The architecture is chosen from the filename prefix (``resnet50``,
    ``densenet121`` or ``convnext_tiny``). The network is created with
    ``weights=None`` and its final linear layer is swapped for one with
    ``num_classes`` outputs; loading trained weights is left to the caller.

    Args:
        filename: Checkpoint file name; only its prefix is inspected.
        num_classes: Output size of the replacement linear layer.
        device: Device the model is moved to before it is returned.

    Returns:
        The constructed model, moved to ``device``.

    Raises:
        ValueError: If ``filename`` starts with none of the known prefixes.
    """

    def _resnet50():
        net = models.resnet50(weights=None)
        net.fc = nn.Linear(net.fc.in_features, num_classes)
        return net

    def _densenet121():
        net = models.densenet121(weights=None)
        net.classifier = nn.Linear(net.classifier.in_features, num_classes)
        return net

    def _convnext_tiny():
        net = models.convnext_tiny(weights=None)
        # Only element 2 of the classifier is replaced; the rest is kept.
        net.classifier[2] = nn.Linear(net.classifier[2].in_features, num_classes)
        return net

    # Prefixes are tested in this order; the first match wins.
    builders = (
        ("resnet50", _resnet50),
        ("densenet121", _densenet121),
        ("convnext_tiny", _convnext_tiny),
    )
    for prefix, build in builders:
        if filename.startswith(prefix):
            return build().to(device)

    raise ValueError(f"Unknown architecture in filename: {filename}")

In [None]:
#Function to calculate metrics

def evaluate_model_metrics(model, dataloader, device):
    """Run ``model`` over ``dataloader`` and compute classification metrics.

    The model is switched to eval mode and run without gradient tracking.
    For every batch the predicted class is the index of the largest output
    along dimension 1. Predictions and labels are accumulated as plain Python
    lists and scored once at the end.

    Args:
        model: Callable module mapping an input batch to per-class scores.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        dict with keys ``accuracy``, ``macro_precision``, ``macro_recall``,
        ``macro_f1`` (macro-averaged, ``zero_division=0``) and ``n`` (the
        number of evaluated samples).
    """
    model.eval()
    preds_list, labels_list = [], []

    with torch.no_grad():
        for inputs, labels in dataloader:
            outputs = model(inputs.to(device))
            preds_list.extend(outputs.argmax(dim=1).tolist())

            # Labels are only consumed on the host by the metric functions, so
            # they are never sent to the device (the original copied them
            # there and straight back).
            labels_list.extend(labels.tolist())

    return {
        "accuracy": accuracy_score(labels_list, preds_list),
        "macro_precision": precision_score(labels_list, preds_list, average="macro", zero_division=0),
        "macro_recall": recall_score(labels_list, preds_list, average="macro", zero_division=0),
        "macro_f1": f1_score(labels_list, preds_list, average="macro", zero_division=0),
        "n": len(labels_list),
    }

In [None]:
#Calculating metrics for all the models
#
# For every ".pth" checkpoint in cnn_models_dir: rebuild the matching
# architecture, load its weights, score it on the source and target loaders and
# collect one summary row per model.

cnn_models_dir = "trained_models"
rows = []

for entry in sorted(os.listdir(cnn_models_dir)):
    if not entry.endswith(".pth"):
        continue

    ckpt_path = os.path.join(cnn_models_dir, entry)
    # Strip only the extension. str.replace(".pth", "") would also remove a
    # ".pth" occurring in the middle of the name.
    model_name = os.path.splitext(entry)[0]

    model = rebuild_model(entry, num_classes, device)

    ckpt = torch.load(ckpt_path, map_location=device)
    model.load_state_dict(ckpt["model_state_dict"])

    src = evaluate_model_metrics(model, dataloader_source, device)
    tgt = evaluate_model_metrics(model, dataloader_target, device)

    rows.append({
        "model": model_name,

        "source_acc": src["accuracy"],
        "source_macro_precision": src["macro_precision"],
        "source_macro_recall": src["macro_recall"],
        "source_macro_f1": src["macro_f1"],

        "target_acc": tgt["accuracy"],
        "target_macro_precision": tgt["macro_precision"],
        "target_macro_recall": tgt["macro_recall"],
        "target_macro_f1": tgt["macro_f1"],

        "source_n": src["n"],
        "target_n": tgt["n"],
    })

    # Drop this model before the next one is built.
    del model
    torch.cuda.empty_cache()

# Without this guard an empty directory surfaces later as an opaque KeyError
# on "source_macro_f1".
if not rows:
    raise FileNotFoundError(f"No .pth checkpoints found in {cnn_models_dir!r}")

df = pd.DataFrame(rows)
metric_cols = [c for c in df.columns if c not in ["model", "source_n", "target_n"]]
df[metric_cols] = df[metric_cols].round(4)

# Drop is computed from the rounded columns so it matches the printed values.
df["macro_f1_drop"] = (df["source_macro_f1"] - df["target_macro_f1"]).round(4)

print(df.to_string(index=False))

               model  source_acc  source_macro_precision  source_macro_recall  source_macro_f1  target_acc  target_macro_precision  target_macro_recall  target_macro_f1  source_n  target_n  macro_f1_drop
  convnext_tiny_base      0.9377                  0.9361               0.9288           0.9321      0.4812                  0.5977               0.4795           0.4556       257       478         0.4765
   convnext_tiny_geo      0.9144                  0.9022               0.9242           0.9087      0.4874                  0.5683               0.4858           0.4691       257       478         0.4396
 convnext_tiny_mixed      0.9261                  0.9119               0.9132           0.9119      0.4456                  0.5061               0.4444           0.4406       257       478         0.4713
 convnext_tiny_photo      0.9533                  0.9590               0.9362           0.9451      0.4958                  0.5958               0.4944           0.4889       257      

In [53]:
def build_subgroup_folder(
    csv_path,
    images_root,
    out_root,
    filter_col,
    filter_value,
    class_col="class",
    id_col="id",
    overwrite=True,
    copy_files=True,
):
    """Materialise one metadata subgroup as a ``<class>/<file>`` folder tree.

    Rows of the CSV whose ``filter_col`` equals ``filter_value`` are selected.
    For each selected row the image ``images_root/<class>/<id>`` is copied (or
    symlinked) to ``out_root/<filter_col>_<filter_value>/<class>/<id>``.

    Args:
        csv_path: CSV file with at least ``filter_col``, ``class_col`` and
            ``id_col`` columns.
        images_root: Directory holding one subfolder per class.
        out_root: Directory under which the subgroup folder is created.
        filter_col: Column to filter on.
        filter_value: Value ``filter_col`` must equal (compared with ``==``,
            so its type has to match the dtype pandas parsed for the column).
        class_col: Column holding the class folder name.
        id_col: Column holding the image file name.
        overwrite: If true, an existing subgroup folder is deleted first.
        copy_files: If true copy the files, otherwise create symlinks to the
            resolved source paths.

    Returns:
        Path of the subgroup folder. It is created even when no file is
        written.

    Warns:
        UserWarning: If listed files are missing from ``images_root`` or the
            subgroup ends up empty.
    """
    import warnings  # local import so the function does not depend on the import cell

    csv_path = Path(csv_path)
    images_root = Path(images_root)
    out_root = Path(out_root)

    df = pd.read_csv(csv_path)

    sub = df[df[filter_col] == filter_value]
    subgroup_name = f"{filter_col}_{filter_value}"
    dst_root = out_root / subgroup_name

    if overwrite and dst_root.exists():
        shutil.rmtree(dst_root)
    dst_root.mkdir(parents=True, exist_ok=True)

    missing = 0
    written = 0

    for raw_cls, raw_fname in zip(sub[class_col], sub[id_col]):
        cls = str(raw_cls).strip()
        fname = str(raw_fname).strip()

        src = images_root / cls / fname
        # is_file() rather than exists(): a blank id would otherwise resolve to
        # the class directory itself and make the copy fail.
        if not src.is_file():
            missing += 1
            continue

        dst_class_dir = dst_root / cls
        dst_class_dir.mkdir(parents=True, exist_ok=True)
        dst = dst_class_dir / fname

        # Clear any previous entry first. exists() follows symlinks and is
        # False for a broken link, so is_symlink() is checked as well.
        # Removing a stale link also stops copy2 from writing through it.
        if dst.is_symlink() or dst.exists():
            dst.unlink()

        if copy_files:
            shutil.copy2(src, dst)
        else:
            dst.symlink_to(src.resolve())

        written += 1

    # Report skipped files and empty subgroups instead of dropping them silently.
    if missing or not written:
        warnings.warn(
            f"Subgroup '{subgroup_name}': wrote {written} file(s), "
            f"{missing} listed file(s) not found under {images_root}",
            stacklevel=2,
        )

    return dst_root


In [54]:
csv_path = "subgroups.csv"
images_root = input_path2
out_root = "subgroups"

# One (metadata column, value) pair per subgroup folder. The folders are built
# in the order listed and unpacked positionally into the names on the left.
(
    low_light_root,
    good_light_root,
    background_tree_root,
    background_wall_root,
    background_white_root,
    background_floor_root,
    near_view_root,
    far_view_root,
) = (
    build_subgroup_folder(
        csv_path,
        images_root,
        out_root,
        filter_col=column,
        filter_value=value,
        copy_files=True,
    )
    for column, value in (
        ("lighting", 2),
        ("lighting", 1),
        ("background", "tree"),
        ("background", "wall"),
        ("background", "white"),
        ("background", "floor"),
        ("view", 1),
        ("view", 2),
    )
)

In [None]:
# Evaluate all models on all subgroups
#
# Subgroup datasets are built once, then every checkpoint is scored on each of
# them. One row per (model, subgroup) is collected and printed as a table.
subgroups_root = Path("subgroups")
cnn_models_dir_path = Path(cnn_models_dir)
rows = []

# The subgroup folders are slices of the target dataset, so its class -> index
# mapping is the reference the labels must follow.
reference_class_to_idx = target_ds.class_to_idx

# (subgroup name, loader or None); None marks a subgroup with no images.
subgroup_loaders = []
for sg_dir in sorted(subgroups_root.iterdir()):
    if not sg_dir.is_dir():
        continue

    # ImageFolder raises instead of returning an empty dataset when a directory
    # has no class folders or images (FileNotFoundError or RuntimeError,
    # depending on the torchvision version), so catch that here.
    try:
        ds = datasets.ImageFolder(sg_dir, transform=eval_tf)
    except (FileNotFoundError, RuntimeError):
        ds = None

    if ds is None or len(ds) == 0:
        subgroup_loaders.append((sg_dir.name, None))
        continue

    # ImageFolder numbers classes from the folders present in *this* directory.
    # A subgroup lacking a class would get shifted indices, so map every local
    # index back to the reference index of the same class name.
    unknown = sorted(set(ds.classes) - set(reference_class_to_idx))
    if unknown:
        raise ValueError(
            f"Subgroup {sg_dir.name!r} contains class folders unknown to the "
            f"target dataset: {unknown}"
        )
    local_to_reference = {
        local_idx: reference_class_to_idx[class_name]
        for class_name, local_idx in ds.class_to_idx.items()
    }
    ds.target_transform = local_to_reference.__getitem__

    loader = torch.utils.data.DataLoader(
        ds,
        batch_size=32,
        shuffle=False,
        num_workers=0
    )
    subgroup_loaders.append((sg_dir.name, loader))

for entry in sorted(cnn_models_dir_path.iterdir()):
    if not entry.name.endswith(".pth"):
        continue

    model_name = entry.stem
    print(f"\n=== Evaluating model: {model_name} ===")

    model = rebuild_model(entry.name, num_classes, device)
    ckpt = torch.load(entry, map_location=device)
    model.load_state_dict(ckpt["model_state_dict"])

    model.eval()

    for subgroup_name, loader in subgroup_loaders:
        if loader is None:
            rows.append({
                "model": model_name,
                "subgroup": subgroup_name,
                "accuracy": float("nan"),
                "macro_precision": float("nan"),
                "macro_recall": float("nan"),
                "macro_f1": float("nan"),
                "n": 0,
                "note": "empty subgroup"
            })
            continue

        metrics = evaluate_model_metrics(model, loader, device)

        rows.append({
            "model": model_name,
            "subgroup": subgroup_name,
            "accuracy": metrics["accuracy"],
            "macro_precision": metrics["macro_precision"],
            "macro_recall": metrics["macro_recall"],
            "macro_f1": metrics["macro_f1"],
            "n": metrics["n"],
            "note": ""
        })

        print(
            f"  {subgroup_name:20s} | "
            f"n={metrics['n']:4d} | "
            f"acc={metrics['accuracy']:.4f} | "
            f"macroF1={metrics['macro_f1']:.4f}"
        )

    # Drop this model before the next one is built.
    del model
    torch.cuda.empty_cache()

# Create DataFrame and display results
df_subgroups = pd.DataFrame(rows)
metric_cols = [c for c in df_subgroups.columns if c not in ["model", "subgroup", "n", "note"]]
df_subgroups[metric_cols] = df_subgroups[metric_cols].round(4)

print("\n" + "="*80)
print("Subgroup Evaluation Results for All Models")
print("="*80)
print(df_subgroups.to_string(index=False))


=== Evaluating model: convnext_tiny_base ===
  background_floor     | n= 120 | acc=0.5917 | macroF1=0.5574
  background_tree      | n= 120 | acc=0.3417 | macroF1=0.2757
  background_wall      | n= 118 | acc=0.4153 | macroF1=0.3827
  background_white     | n= 120 | acc=0.5750 | macroF1=0.5508
  lighting_1           | n= 239 | acc=0.5439 | macroF1=0.5244
  lighting_2           | n= 239 | acc=0.4184 | macroF1=0.3758
  view_1               | n= 240 | acc=0.5167 | macroF1=0.4901
  view_2               | n= 238 | acc=0.4454 | macroF1=0.4192

=== Evaluating model: convnext_tiny_geo ===
  background_floor     | n= 120 | acc=0.5917 | macroF1=0.5606
  background_tree      | n= 120 | acc=0.3750 | macroF1=0.3559
  background_wall      | n= 118 | acc=0.4576 | macroF1=0.4322
  background_white     | n= 120 | acc=0.5250 | macroF1=0.5212
  lighting_1           | n= 239 | acc=0.5649 | macroF1=0.5532
  lighting_2           | n= 239 | acc=0.4100 | macroF1=0.3774
  view_1               | n= 240 | acc=0.5