In [None]:
!unzip /content/d1.zip -d /content/d1


In [2]:
import requests
BASE = "https://unshielded-scalpless-ava.ngrok-free.dev"
r = requests.get(f"{BASE}/ping", headers={"ngrok-skip-browser-warning":"true"})
print(r.status_code, r.text)


200 {"ok":true}


In [5]:
import requests

NGROK_API = "https://unshielded-scalpless-ava.ngrok-free.dev"  # your API URL

# Upload a zip to start a job
files = {"file": open("Apple_Dataset.zip\content\Apple_Dataset", "rb")}
r = requests.post(f"{NGROK_API}/upload", files=files)
print(r.status_code, r.text)
job_id = r.json()["jobId"]

# Poll status
import time
while True:
    s = requests.get(f"{NGROK_API}/jobs/{job_id}").json()
    print(s["status"], s["progress"], s.get("message",""))
    if s["status"] in ("succeeded","failed"):
        break
    time.sleep(2)

# Download result
if s["status"] == "succeeded":
    z = requests.get(f"{NGROK_API}/jobs/{job_id}/download")
    with open("/content/balanced_dataset.zip", "wb") as f:
        f.write(z.content)
    print("Saved to /content/balanced_dataset.zip")


  files = {"file": open("Apple_Dataset.zip\content\Apple_Dataset", "rb")}


FileNotFoundError: [Errno 2] No such file or directory: 'Apple_Dataset.zip\\content\\Apple_Dataset'

In [None]:
# ✅ One-cell pipeline: build LeafGAN-style dataset, then make LFLSeg dataset from it

import os, shutil, random
from PIL import Image

# --------------------
# USER PATHS (edit if needed)
# --------------------
healthy_src = "/content/d1/d1/Apple___healthy"
diseased_src = "/content/d1/d1/Apple___Black_rot"

# Where step 1 writes (CycleGAN/LeafGAN-style structure)
leafgan_root = "/content/Apple_Dataset_for_user"          # <— step 2 will read from here

# Where step 2 writes (LFLSeg-style structure)
lflseg_root = "/content/lflseg_dataset_sample_for_user"

split_ratio = 0.8  # 80% train / 20% test

# --------------------
# Utils
# --------------------
def ensure_dirs(path_list):
    for p in path_list:
        os.makedirs(p, exist_ok=True)

def list_images(src):
    return [f for f in os.listdir(src) if f.lower().endswith((".jpg",".png",".jpeg"))]

# --------------------
# STEP 1: Build LeafGAN/CycleGAN dataset: trainA/testA (healthy), trainB/testB (diseased)
# --------------------
for sub in ["trainA", "testA", "trainB", "testB"]:
    os.makedirs(os.path.join(leafgan_root, sub), exist_ok=True)

def split_and_copy(src, dst_train, dst_test, ratio=0.8):
    if not os.path.exists(src):
        raise FileNotFoundError(f"Source not found: {src}")
    files = list_images(src)
    random.shuffle(files)
    split_idx = int(len(files) * ratio)
    train_files, test_files = files[:split_idx], files[split_idx:]

    for f in train_files:
        shutil.copy(os.path.join(src, f), os.path.join(dst_train, f))
    for f in test_files:
        shutil.copy(os.path.join(src, f), os.path.join(dst_test, f))

    return len(train_files), len(test_files)

trainA_count, testA_count = split_and_copy(
    healthy_src,
    os.path.join(leafgan_root, "trainA"),
    os.path.join(leafgan_root, "testA"),
    split_ratio
)
trainB_count, testB_count = split_and_copy(
    diseased_src,
    os.path.join(leafgan_root, "trainB"),
    os.path.join(leafgan_root, "testB"),
    split_ratio
)

print("✅ Step 1 complete. LeafGAN dataset at:", leafgan_root)
print(f"Healthy → trainA: {trainA_count}, testA: {testA_count}")
print(f"Diseased → trainB: {trainB_count}, testB: {testB_count}")

# --------------------
# STEP 2: Prepare LFLSeg dataset from Step 1 output
# Structure:
#   lflseg_root/
#     train/{full_leaf, partial_leaf, non_leaf}
#     test/{full_leaf, partial_leaf, non_leaf}
# --------------------
def collect_full_leaf(src_dirs, dst):
    os.makedirs(dst, exist_ok=True)
    for src in src_dirs:
        if os.path.exists(src):
            for f in list_images(src):
                shutil.copy(os.path.join(src, f), os.path.join(dst, f))

def generate_partial_leaf(src_full, dst_partial):
    os.makedirs(dst_partial, exist_ok=True)
    for f in list_images(src_full):
        in_path = os.path.join(src_full, f)
        try:
            with Image.open(in_path) as img:
                img = img.convert("RGB")
                w, h = img.size
                crops = [
                    img.crop((0, 0, w//2, h//2)),        # top-left
                    img.crop((w//2, 0, w, h//2)),        # top-right
                    img.crop((0, h//2, w//2, h)),        # bottom-left
                    img.crop((w//2, h//2, w, h))         # bottom-right
                ]
                stem, _ = os.path.splitext(f)
                for i, c in enumerate(crops):
                    c.save(os.path.join(dst_partial, f"{stem}_patch{i}.jpg"), quality=95)
        except Exception as e:
            print("Skipping (partial) due to error:", in_path, e)

def generate_non_leaf(src_full, dst_non_leaf, count=300):
    os.makedirs(dst_non_leaf, exist_ok=True)
    files = list_images(src_full)
    random.shuffle(files)
    picked = files[:min(count, len(files))]
    for f in picked:
        in_path = os.path.join(src_full, f)
        try:
            with Image.open(in_path) as img:
                img = img.convert("RGB")
                w, h = img.size
                if w < 8 or h < 8:
                    continue
                # random mid-sized crop
                cw, ch = max(16, w // 4), max(16, h // 4)
                x0 = random.randint(0, max(0, w - cw))
                y0 = random.randint(0, max(0, h - ch))
                crop = img.crop((x0, y0, x0 + cw, y0 + ch))
                stem, _ = os.path.splitext(f)
                crop.save(os.path.join(dst_non_leaf, f"bg_{stem}.jpg"), quality=95)
        except Exception as e:
            print("Skipping (non-leaf) due to error:", in_path, e)

for split in ["train", "test"]:
    full_dst = os.path.join(lflseg_root, split, "full_leaf")
    partial_dst = os.path.join(lflseg_root, split, "partial_leaf")
    nonleaf_dst = os.path.join(lflseg_root, split, "non_leaf")

    if split == "train":
        collect_full_leaf(
            [os.path.join(leafgan_root, "trainA"), os.path.join(leafgan_root, "trainB")],
            full_dst
        )
    else:
        collect_full_leaf(
            [os.path.join(leafgan_root, "testA"), os.path.join(leafgan_root, "testB")],
            full_dst
        )

    generate_partial_leaf(full_dst, partial_dst)
    generate_non_leaf(full_dst, nonleaf_dst)

print("✅ Step 2 complete. LFLSeg dataset at:", lflseg_root)

# Verify counts
for split in ["train", "test"]:
    for cls in ["full_leaf", "partial_leaf", "non_leaf"]:
        path = os.path.join(lflseg_root, split, cls)
        cnt = len([f for f in os.listdir(path) if f.lower().endswith((".jpg",".png",".jpeg"))]) if os.path.exists(path) else 0
        print(f"{split:5s} {cls:12s}: {cnt}")


✅ Step 1 complete. LeafGAN dataset at: /content/Apple_Dataset_for_user
Healthy → trainA: 1316, testA: 329
Diseased → trainB: 496, testB: 125
✅ Step 2 complete. LFLSeg dataset at: /content/lflseg_dataset_sample_for_user
train full_leaf   : 1812
train partial_leaf: 7248
train non_leaf    : 300
test  full_leaf   : 454
test  partial_leaf: 1816
test  non_leaf    : 300


In [None]:
# (Optional) faster installs on fresh runtimes
!pip install --quiet torch torchvision

# =========================
# LFLSEG training (3 epochs)
# =========================
import os, torch, json
from torch import nn, optim
from torchvision import transforms, datasets, models
from torchvision.models import ResNet101_Weights
from torch.utils.data import DataLoader

# Must match the output of your FIRST cell:
data_root = "/content/lflseg_dataset_sample_for_user"
train_dir = os.path.join(data_root, "train")
val_dir   = os.path.join(data_root, "test")

assert os.path.isdir(train_dir), f"Train dir not found: {train_dir}"
assert os.path.isdir(val_dir),   f"Val dir not found: {val_dir}"

# Data transforms
transform_train = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=(0, 270)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

transform_val = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

# Datasets / loaders
train_ds = datasets.ImageFolder(train_dir, transform=transform_train)
val_ds   = datasets.ImageFolder(val_dir,   transform=transform_val)

# Save the class→index mapping for later inference
os.makedirs("/content/ckpts", exist_ok=True)
with open("/content/ckpts/class_to_idx.json", "w") as f:
    json.dump(train_ds.class_to_idx, f, indent=2)

num_workers = 2  # Colab-friendly
batch_size  = 64

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  num_workers=num_workers, pin_memory=True)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=num_workers, pin_memory=True)

# Model
device = "cuda" if torch.cuda.is_available() else "cpu"
weights = ResNet101_Weights.IMAGENET1K_V1
model = models.resnet101(weights=weights)

# Replace classifier head (3 classes: full_leaf, partial_leaf, non_leaf)
num_classes = len(train_ds.classes)
model.fc = nn.Linear(model.fc.in_features, num_classes)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9, weight_decay=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# -----------------
# Train for 3 epochs
# -----------------
best_acc = 0.0
epochs = 3

for epoch in range(1, epochs + 1):
    model.train()
    running_loss = 0.0
    running_correct = 0

    for imgs, labels in train_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad()
        out = model(imgs)
        loss = criterion(out, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * imgs.size(0)
        running_correct += (out.argmax(1) == labels).sum().item()

    scheduler.step()

    # Validation
    model.eval()
    correct = 0; total = 0
    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            out = model(imgs)
            correct += (out.argmax(1) == labels).sum().item()
            total += imgs.size(0)

    train_loss = running_loss / len(train_ds) if len(train_ds) else 0.0
    val_acc = (correct / total) if total else 0.0
    print(f"Epoch {epoch}/{epochs}: train_loss={train_loss:.4f}  val_acc={val_acc:.4f}")

    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), "/content/ckpts/lflseg_resnet101_best_for_user.pth")

print("✅ Done. Best val acc:", best_acc)
print("Saved:", "/content/ckpts/lflseg_resnet101_best_for_user.pth")
print("Classes:", train_ds.classes)


Downloading: "https://download.pytorch.org/models/resnet101-63fe2227.pth" to /root/.cache/torch/hub/checkpoints/resnet101-63fe2227.pth


100%|██████████| 171M/171M [00:00<00:00, 182MB/s]


Epoch 1/3: train_loss=0.3062  val_acc=0.8755
Epoch 2/3: train_loss=0.1571  val_acc=0.8895
Epoch 3/3: train_loss=0.1002  val_acc=0.9074
✅ Done. Best val acc: 0.9073929961089494
Saved: /content/ckpts/lflseg_resnet101_best_for_user.pth
Classes: ['full_leaf', 'non_leaf', 'partial_leaf']


In [None]:
# Installs (pytorch-grad-cam is the correct package name)
!pip install --quiet grad-cam opencv-python pillow tqdm


# ---- Grad-CAM over LeafGAN-style splits, saves masks + masked RGB ----
import os, cv2, numpy as np, torch, json
from pathlib import Path
from PIL import Image
from torchvision import transforms
from torchvision.models import resnet101
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from tqdm import tqdm

# ====== CONFIG (paths aligned to your previous cells) ======
# 1) Trained LFLSeg weights from your SECOND cell:
LFLSeg_weights = "/content/ckpts/lflseg_resnet101_best_for_user.pth"

# 2) Root of original LeafGAN/CycleGAN dataset created in your FIRST cell:
#    (contains trainA, trainB, testA, testB)
original_root = "/content/Apple_Dataset_for_user"

# 3) Output root (masks + masked RGBs will be written here)
out_root = "/content/masked_apple_dataset"
os.makedirs(out_root, exist_ok=True)

# 4) LFLSeg dataset root from FIRST cell (for class auto-detect: 'full_leaf', 'partial_leaf', 'non_leaf')
lflseg_root = "/content/lflseg_dataset_sample_for_user"

# 5) Splits to process
splits = ["trainA", "trainB", "testA", "testB"]

# GradCAM / mask hyperparams
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
FULL_LEAF_CLASS_INDEX = None   # auto-detect from lflseg dataset
THRESH_PCT = 80
GAUSSIAN_SIGMA = 3.0
MORPH_KERNEL = 3
MIN_AREA_RATIO = 0.001

# ====== helpers ======
def safe_load_state(model, path, map_location):
    ckpt = torch.load(path, map_location=map_location)
    if isinstance(ckpt, dict) and "state_dict" in ckpt:
        ckpt = ckpt["state_dict"]
    if isinstance(ckpt, dict):
        new_ckpt = {}
        for k, v in ckpt.items():
            new_k = k.replace("module.", "") if k.startswith("module.") else k
            new_ckpt[new_k] = v
        ckpt = new_ckpt
    return ckpt

def green_mask_from_rgb(rgb_img):
    hsv = cv2.cvtColor(rgb_img, cv2.COLOR_RGB2HSV)
    lower = np.array([20, 25, 20])
    upper = np.array([100, 255, 255])
    mask = cv2.inRange(hsv, lower, upper)
    return (mask > 0).astype(np.uint8)

def fill_holes_and_keep_largest(mask):
    kernel_close = np.ones((7,7), np.uint8)
    mask_close = cv2.morphologyEx(mask.astype(np.uint8), cv2.MORPH_CLOSE, kernel_close)
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(mask_close, connectivity=8)
    if num_labels <= 1:
        final = mask_close
    else:
        areas = stats[1:, cv2.CC_STAT_AREA]
        largest_idx = 1 + int(np.argmax(areas))
        final = (labels == largest_idx).astype(np.uint8)
    k2 = np.ones((MORPH_KERNEL, MORPH_KERNEL), np.uint8)
    final = cv2.morphologyEx(final, cv2.MORPH_OPEN, k2)
    final = cv2.morphologyEx(final, cv2.MORPH_CLOSE, k2)
    return final

# ====== Load model & GradCAM ======
print("Loading LFLSeg model...")
model = resnet101(weights=None)
model.fc = torch.nn.Linear(model.fc.in_features, 3)   # 3 classes: full/partial/non_leaf
state = safe_load_state(model, LFLSeg_weights, map_location=torch.device(DEVICE))
model.load_state_dict(state)
model = model.to(DEVICE).eval()
target_layer = model.layer4[-1]
cam = GradCAM(model=model, target_layers=[target_layer])

# ====== auto-detect FULL_LEAF_CLASS_INDEX from lflseg dataset ======
if FULL_LEAF_CLASS_INDEX is None:
    try:
        from torchvision import datasets
        ds = datasets.ImageFolder(os.path.join(lflseg_root, "train"))
        print("Detected LFLSeg classes:", ds.classes, ds.class_to_idx)
        if "full_leaf" in ds.class_to_idx:
            FULL_LEAF_CLASS_INDEX = ds.class_to_idx["full_leaf"]
        else:
            FULL_LEAF_CLASS_INDEX = 0
            print("Warning: 'full_leaf' not found; defaulting to index 0")
    except Exception as e:
        FULL_LEAF_CLASS_INDEX = 0
        print("Auto-detect failed; defaulting FULL_LEAF_CLASS_INDEX=0. Error:", e)

# ====== preprocessing (match validation transforms) ======
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

# ====== Process each split ======
total_processed = 0
for split in splits:
    orig_dir = os.path.join(original_root, split)
    save_mask_dir = os.path.join(out_root, split, "masks")
    save_rgb_dir  = os.path.join(out_root, split, "masked_rgb")
    os.makedirs(save_mask_dir, exist_ok=True)
    os.makedirs(save_rgb_dir, exist_ok=True)

    if not os.path.exists(orig_dir):
        print(f"Skipping {split}: directory not found -> {orig_dir}")
        continue

    files = sorted([f for f in os.listdir(orig_dir) if f.lower().endswith((".jpg",".png",".jpeg"))])
    print(f"Processing {len(files)} images in {split} -> {orig_dir}")

    for fname in tqdm(files):
        orig_path = os.path.join(orig_dir, fname)
        try:
            pil = Image.open(orig_path).convert("RGB")
        except Exception as e:
            print("Failed to open", orig_path, e)
            continue
        rgb = np.array(pil)
        h, w = rgb.shape[:2]

        # prepare input for CAM
        inp = transform(pil).unsqueeze(0).to(DEVICE)

        # GradCAM targeting full_leaf
        targets = [ClassifierOutputTarget(FULL_LEAF_CLASS_INDEX)]
        grayscale_cam = cam(input_tensor=inp, targets=targets)[0]  # 224x224

        # smooth + threshold
        cam_uint8 = (grayscale_cam * 255).astype(np.uint8)
        cam_blur = cv2.GaussianBlur(cam_uint8, (0,0), GAUSSIAN_SIGMA)
        cam_float = cam_blur.astype(np.float32) / 255.0
        try:
            _, th = cv2.threshold((cam_float*255).astype(np.uint8), 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
            cam_mask = (th > 0).astype(np.uint8)
        except Exception:
            p = np.percentile(cam_float, THRESH_PCT)
            cam_mask = (cam_float >= p).astype(np.uint8)

        # resize to original size
        cam_mask_resized = cv2.resize(cam_mask, (w, h), interpolation=cv2.INTER_NEAREST)

        # refine with green color mask
        green = green_mask_from_rgb(rgb)
        combined = cam_mask_resized & green
        if combined.sum() < max(1, int(MIN_AREA_RATIO * (h*w))):
            combined = cam_mask_resized.copy()

        # keep largest, fill holes
        final_mask = fill_holes_and_keep_largest(combined)

        # fallback if too small/too big
        area = final_mask.sum()
        img_area = h * w
        if area < max(1, int(MIN_AREA_RATIO * img_area)) or area > 0.98 * img_area:
            final_mask = fill_holes_and_keep_largest(cam_mask_resized)

        # save mask + masked rgb
        final_mask_u8 = (final_mask * 255).astype(np.uint8)
        mask_out_path = os.path.join(save_mask_dir, Path(fname).stem + "_mask.png")
        cv2.imwrite(mask_out_path, final_mask_u8)

        masked_rgb = rgb.copy()
        masked_rgb[final_mask == 0] = 0
        masked_out_path = os.path.join(save_rgb_dir, Path(fname).stem + "_masked.png")
        cv2.imwrite(masked_out_path, cv2.cvtColor(masked_rgb, cv2.COLOR_RGB2BGR))

        total_processed += 1

print(f"\n✅ Done. Processed {total_processed} images.")
print("Masks + masked RGB saved under:", out_root)


Loading LFLSeg model...
Detected LFLSeg classes: ['full_leaf', 'non_leaf', 'partial_leaf'] {'full_leaf': 0, 'non_leaf': 1, 'partial_leaf': 2}
Processing 1316 images in trainA -> /content/Apple_Dataset_for_user/trainA


100%|██████████| 1316/1316 [01:02<00:00, 20.98it/s]


Processing 496 images in trainB -> /content/Apple_Dataset_for_user/trainB


100%|██████████| 496/496 [00:21<00:00, 23.28it/s]


Processing 329 images in testA -> /content/Apple_Dataset_for_user/testA


100%|██████████| 329/329 [00:14<00:00, 22.44it/s]


Processing 125 images in testB -> /content/Apple_Dataset_for_user/testB


100%|██████████| 125/125 [00:05<00:00, 24.54it/s]


✅ Done. Processed 2266 images.
Masks + masked RGB saved under: /content/masked_apple_dataset





In [None]:
# prepare_leafgan_dataset.py

import os, shutil, random
from pathlib import Path

random.seed(42)

# ✅ Path to GradCAM output
masked_root = "/content/masked_apple_dataset"

# ✅ Final output for GAN training
leafgan_root = "/content/LeafGAN_masked_final_dataset_for_user"
os.makedirs(leafgan_root, exist_ok=True)

for d in ["trainA","trainB","testA","testB"]:
    os.makedirs(os.path.join(leafgan_root, d), exist_ok=True)

def list_masked_rgb(split):
    p = os.path.join(masked_root, split, "masked_rgb")
    if not os.path.exists(p):
        return []
    return sorted([
        os.path.join(p,f)
        for f in os.listdir(p)
        if f.lower().endswith((".png",".jpg",".jpeg"))
    ])

trainA_files = list_masked_rgb("trainA")
trainB_files = list_masked_rgb("trainB")
testA_files  = list_masked_rgb("testA")
testB_files  = list_masked_rgb("testB")

def ensure_test_split(train_list, test_list, dst_train, dst_test):
    if len(test_list) == 0:
        random.shuffle(train_list)
        idx = int(0.8 * len(train_list))
        train_part = train_list[:idx]
        test_part  = train_list[idx:]
    else:
        train_part = train_list
        test_part = test_list

    for src in train_part:
        shutil.copy(src, os.path.join(dst_train, os.path.basename(src)))
    for src in test_part:
        shutil.copy(src, os.path.join(dst_test, os.path.basename(src)))

    return len(train_part), len(test_part)

tA_train,tA_test = ensure_test_split(trainA_files, testA_files,
                                     os.path.join(leafgan_root,"trainA"),
                                     os.path.join(leafgan_root,"testA"))

tB_train,tB_test = ensure_test_split(trainB_files, testB_files,
                                     os.path.join(leafgan_root,"trainB"),
                                     os.path.join(leafgan_root,"testB"))

print("✅ LeafGAN dataset built at:", leafgan_root)
print("trainA:", tA_train, " testA:", tA_test)
print("trainB:", tB_train, " testB:", tB_test)


✅ LeafGAN dataset built at: /content/LeafGAN_masked_final_dataset_for_user
trainA: 1316  testA: 329
trainB: 496  testB: 125


In [None]:
import shutil
from google.colab import files

# Path to your GAN dataset
leafgan_root = "/content/LeafGAN_masked_final_dataset_for_user"

# Zip the dataset
zip_path = "/content/LeafGAN_masked_final_dataset.zip"
if os.path.exists(zip_path):
    os.remove(zip_path)
shutil.make_archive("/content/LeafGAN_masked_final_dataset", 'zip', leafgan_root)

print("✅ Zipped dataset at:", zip_path)

# Download to local machine
files.download(zip_path)


✅ Zipped dataset at: /content/LeafGAN_masked_final_dataset.zip


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# Colab cell
!pip install torch torchvision tqdm pillow opencv-python pytorch-grad-cam



[31mERROR: Could not find a version that satisfies the requirement pytorch-grad-cam (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for pytorch-grad-cam[0m[31m
[0m

In [None]:
# LeafGAN_train_no_masks.py  (paste into one notebook cell and run)
!pip install --quiet grad-cam

import os, time, itertools, random
from pathlib import Path
import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from PIL import Image
import numpy as np

# ----------------- CONFIG -----------------
LEAFGAN_ROOT = "/content/LeafGAN_masked_final_dataset_for_user"   # must contain trainA, trainB, testA, testB
SAVE_ROOT    = "/content/leafgan_checkpoints_no_masks_for_user"
os.makedirs(SAVE_ROOT, exist_ok=True)

IMG_SIZE = 256
BATCH_SIZE = 1
EPOCHS = 2              # change as needed
LR = 2e-4
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Loss weights
LAMBDA_CYCLE = 10.0
LAMBDA_ID = 5.0

# Random seed
random.seed(42)
# ------------------------------------------

# ---------- Dataset ----------
class LeafGANDataset(Dataset):
    """
    Loads unpaired CycleGAN-style images from trainA and trainB folders.
    """
    def __init__(self, rootA, rootB, transform=None):
        self.rootA = rootA
        self.rootB = rootB
        self.A_files = sorted([f for f in os.listdir(rootA) if f.lower().endswith((".png",".jpg",".jpeg"))])
        self.B_files = sorted([f for f in os.listdir(rootB) if f.lower().endswith((".png",".jpg",".jpeg"))])
        self.transform = transform

    def __len__(self):
        return max(len(self.A_files), len(self.B_files))

    def _load_img(self, path):
        img = Image.open(path).convert("RGB")
        return img

    def __getitem__(self, idx):
        a_name = self.A_files[idx % len(self.A_files)]
        b_name = self.B_files[idx % len(self.B_files)]
        A = self._load_img(os.path.join(self.rootA, a_name))
        B = self._load_img(os.path.join(self.rootB, b_name))
        if self.transform:
            A_t = self.transform(A)
            B_t = self.transform(B)
        else:
            A_t = A; B_t = B
        return {"A": A_t, "B": B_t, "A_name": a_name, "B_name": b_name}

# transforms
transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
])

train_dataset = LeafGANDataset(os.path.join(LEAFGAN_ROOT,"trainA"),
                               os.path.join(LEAFGAN_ROOT,"trainB"),
                               transform=transform)
dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2, drop_last=True)

# ---------- Models ----------
class ResnetBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.conv_block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(dim, dim, 3),
            nn.InstanceNorm2d(dim),
            nn.ReLU(True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(dim, dim, 3),
            nn.InstanceNorm2d(dim),
        )
    def forward(self, x):
        return x + self.conv_block(x)

class ResnetGenerator(nn.Module):
    def __init__(self, in_c=3, out_c=3, n_blocks=6, ngf=64):
        super().__init__()
        model = [nn.ReflectionPad2d(3),
                 nn.Conv2d(in_c, ngf, 7), nn.InstanceNorm2d(ngf), nn.ReLU(True)]
        # downsample
        n_down = 2
        curr = ngf
        for i in range(n_down):
            model += [nn.Conv2d(curr, curr*2, 3, 2, 1), nn.InstanceNorm2d(curr*2), nn.ReLU(True)]
            curr *= 2
        # resblocks
        for _ in range(n_blocks):
            model += [ResnetBlock(curr)]
        # upsample
        for i in range(n_down):
            model += [nn.ConvTranspose2d(curr, curr//2, 3, 2, 1, output_padding=1), nn.InstanceNorm2d(curr//2), nn.ReLU(True)]
            curr = curr//2
        model += [nn.ReflectionPad2d(3), nn.Conv2d(curr, out_c, 7), nn.Tanh()]
        self.model = nn.Sequential(*model)
    def forward(self, x): return self.model(x)

class PatchDiscriminator(nn.Module):
    def __init__(self, in_c=3, ndf=64):
        super().__init__()
        model = [nn.Conv2d(in_c, ndf, 4, 2, 1), nn.LeakyReLU(0.2, True)]
        curr = ndf
        for n in [128,256,512]:
            model += [nn.Conv2d(curr, n, 4, 2 if n<512 else 1, 1), nn.InstanceNorm2d(n), nn.LeakyReLU(0.2, True)]
            curr = n
        model += [nn.Conv2d(curr, 1, 4, 1, 1)]
        self.model = nn.Sequential(*model)
    def forward(self, x): return self.model(x)

# Init models
G_AB = ResnetGenerator().to(DEVICE)
G_BA = ResnetGenerator().to(DEVICE)
D_A = PatchDiscriminator().to(DEVICE)
D_B = PatchDiscriminator().to(DEVICE)

# weight init
def init_weights(net, init_type='normal', gain=0.02):
    def init_func(m):
        classname = m.__class__.__name__
        if hasattr(m, 'weight') and (classname.find('Conv')!=-1 or classname.find('Linear')!=-1):
            if init_type == 'normal':
                nn.init.normal_(m.weight.data, 0.0, gain)
            elif init_type == 'xavier':
                nn.init.xavier_normal_(m.weight.data)
            if hasattr(m, 'bias') and m.bias is not None:
                nn.init.constant_(m.bias.data, 0.0)
        elif classname.find('BatchNorm2d') != -1 or classname.find('InstanceNorm2d') != -1:
            if hasattr(m, 'weight') and m.weight is not None:
                nn.init.normal_(m.weight.data, 1.0, gain)
                nn.init.constant_(m.bias.data, 0.0)
    net.apply(init_func)

init_weights(G_AB); init_weights(G_BA); init_weights(D_A); init_weights(D_B)

# ---------- Losses & optimizers ----------
mse = nn.MSELoss()
l1 = nn.L1Loss()
opt_G = optim.Adam(itertools.chain(G_AB.parameters(), G_BA.parameters()), lr=LR, betas=(0.5,0.999))
opt_D_A = optim.Adam(D_A.parameters(), lr=LR, betas=(0.5,0.999))
opt_D_B = optim.Adam(D_B.parameters(), lr=LR, betas=(0.5,0.999))

# For patch target shapes compute once by forward dummy
with torch.no_grad():
    dummy = torch.randn(1,3,IMG_SIZE,IMG_SIZE).to(DEVICE)
    out = D_A(dummy)
patch_h, patch_w = out.shape[2], out.shape[3]

def patch_target(x, val):
    return torch.full((x.size(0),1,patch_h,patch_w), val, device=DEVICE)

# ---------- Training loop ----------
print("Starting training on", DEVICE)
iters = 0
save_every = 5  # epochs
for epoch in range(1, EPOCHS+1):
    epoch_start = time.time()
    for i, data in enumerate(dataloader):
        real_A = data["A"].to(DEVICE)
        real_B = data["B"].to(DEVICE)

        # ground truths
        valid = patch_target(real_A, 1.0)
        fake_label = patch_target(real_A, 0.0)

        # ------------------ Train Generators ------------------
        opt_G.zero_grad()
        # generate
        fake_B = G_AB(real_A)
        fake_A = G_BA(real_B)
        # adv loss
        loss_GAN_AB = mse(D_B(fake_B), valid)
        loss_GAN_BA = mse(D_A(fake_A), valid)
        # cycle
        rec_A = G_BA(fake_B)
        rec_B = G_AB(fake_A)
        loss_cycle = l1(rec_A, real_A) + l1(rec_B, real_B)
        # identity
        loss_id = l1(G_BA(real_A), real_A) + l1(G_AB(real_B), real_B)

        loss_G = loss_GAN_AB + loss_GAN_BA + LAMBDA_CYCLE * loss_cycle + LAMBDA_ID * loss_id

        loss_G.backward(); opt_G.step()

        # ------------------ Train Discriminator A ------------------
        opt_D_A.zero_grad()
        loss_real = mse(D_A(real_A), valid)
        loss_fake = mse(D_A(fake_A.detach()), fake_label)
        loss_D_A = 0.5 * (loss_real + loss_fake)
        loss_D_A.backward(); opt_D_A.step()

        # ------------------ Train Discriminator B ------------------
        opt_D_B.zero_grad()
        loss_real = mse(D_B(real_B), valid)
        loss_fake = mse(D_B(fake_B.detach()), fake_label)
        loss_D_B = 0.5 * (loss_real + loss_fake)
        loss_D_B.backward(); opt_D_B.step()

        iters += 1

    # end epoch
    print(f"[Epoch {epoch}/{EPOCHS}] loss_G:{loss_G.item():.4f} loss_D_A:{loss_D_A.item():.4f} loss_D_B:{loss_D_B.item():.4f} time:{time.time()-epoch_start:.1f}s")

    # save samples and checkpoints
    if epoch % save_every == 0 or epoch==1:
        # save sample images from a small batch
        with torch.no_grad():
            sample = next(iter(dataloader))
            A_sample = sample["A"].to(DEVICE)
            B_sample = sample["B"].to(DEVICE)
            fakeB_sample = G_AB(A_sample)
            fakeA_sample = G_BA(B_sample)
            # denorm [-1,1] -> [0,1]
            def denorm(x): return (x*0.5 + 0.5).clamp(0,1)
            utils.save_image(denorm(fakeB_sample), os.path.join(SAVE_ROOT, f"fakeB_epoch{epoch}.png"), nrow=4)
            utils.save_image(denorm(fakeA_sample), os.path.join(SAVE_ROOT, f"fakeA_epoch{epoch}.png"), nrow=4)
        torch.save(G_AB.state_dict(), os.path.join(SAVE_ROOT, f"G_AB_epoch{epoch}.pth"))
        torch.save(G_BA.state_dict(), os.path.join(SAVE_ROOT, f"G_BA_epoch{epoch}.pth"))
        torch.save(D_A.state_dict(), os.path.join(SAVE_ROOT, f"D_A_epoch{epoch}.pth"))
        torch.save(D_B.state_dict(), os.path.join(SAVE_ROOT, f"D_B_epoch{epoch}.pth"))


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/7.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━[0m [32m4.6/7.8 MB[0m [31m144.0 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m7.8/7.8 MB[0m [31m143.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m96.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for grad-cam (pyproject.toml) ... [?25l[?25hdone


FileNotFoundError: [Errno 2] No such file or directory: '/content/LeafGAN_masked_final_dataset_for_user/trainA'

In [None]:
# inference cell
import os, torch
from torchvision import transforms, utils
from PIL import Image
from pathlib import Path
# from your_model_defs import ResnetGenerator  # or copy generator class from training cell
import torch.nn as nn # Import the nn module

# ---------- Models ----------
class ResnetBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.conv_block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(dim, dim, 3),
            nn.InstanceNorm2d(dim),
            nn.ReLU(True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(dim, dim, 3),
            nn.InstanceNorm2d(dim),
        )
    def forward(self, x):
        return x + self.conv_block(x)

class ResnetGenerator(nn.Module):
    def __init__(self, in_c=3, out_c=3, n_blocks=6, ngf=64):
        super().__init__()
        model = [nn.ReflectionPad2d(3),
                 nn.Conv2d(in_c, ngf, 7), nn.InstanceNorm2d(ngf), nn.ReLU(True)]
        # downsample
        n_down = 2
        curr = ngf
        for i in range(n_down):
            model += [nn.Conv2d(curr, curr*2, 3, 2, 1), nn.InstanceNorm2d(curr*2), nn.ReLU(True)]
            curr *= 2
        # resblocks
        for _ in range(n_blocks):
            model += [ResnetBlock(curr)]
        # upsample
        for i in range(n_down):
            model += [nn.ConvTranspose2d(curr, curr//2, 3, 2, 1, output_padding=1), nn.InstanceNorm2d(curr//2), nn.ReLU(True)]
            curr = curr//2
        model += [nn.ReflectionPad2d(3), nn.Conv2d(curr, out_c, 7), nn.Tanh()]
        self.model = nn.Sequential(*model)
    def forward(self, x): return self.model(x)

# ----------------- CONFIG -----------------
LEAFGAN_ROOT = "/content/LeafGAN_masked_final_dataset"   # must contain trainA, trainB, testA, testB
# ------------------------------------------


DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
IMG_SIZE = 256
SAVE_GEN_ROOT = "/content/leafgan_results_2_synthetic/fakeB"
os.makedirs(SAVE_GEN_ROOT, exist_ok=True)

# load trained generator (pick the checkpoint)
ckpt = "/content/leafgan_checkpoints_no_masks2/content/leafgan_checkpoints_no_masks2/G_AB_epoch50.pth"  # change to your latest
G_AB = ResnetGenerator().to(DEVICE)
G_AB.load_state_dict(torch.load(ckpt, map_location=DEVICE))
G_AB.eval()

transform = transforms.Compose([
    transforms.Resize((IMG_SIZE,IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
])

src_dir = os.path.join(LEAFGAN_ROOT, "trainA")  # or testA / extra healthy images
for fname in sorted(os.listdir(src_dir)):
    if not fname.lower().endswith((".png",".jpg",".jpeg")): continue
    img = Image.open(os.path.join(src_dir, fname)).convert("RGB")
    x = transform(img).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        fake = G_AB(x)
    fake_img = (fake[0].cpu()*0.5 + 0.5).clamp(0,1)
    utils.save_image(fake_img, os.path.join(SAVE_GEN_ROOT, fname))
print("Saved generated images to:", SAVE_GEN_ROOT)

Saved generated images to: /content/leafgan_results_2_synthetic/fakeB


In [None]:
# build_augmented.py
import os, shutil
LEAFGAN_ROOT = "/content/LeafGAN_masked_final_dataset"
src_real_diseased = os.path.join(LEAFGAN_ROOT, "trainB")
src_fake = "/content/leafgan_results_2_synthetic/content/leafgan_results_2_synthetic/fakeB"
dst_aug = "/content/Augmented_dataset_final/train/Diseased"
dst_healthy = "/content/Augmented_dataset_final/train/Healthy"
os.makedirs(dst_aug, exist_ok=True)
os.makedirs(dst_healthy, exist_ok=True)

# copy healthy (real) to Healthy
for f in os.listdir(os.path.join(LEAFGAN_ROOT, "trainA")):
    if f.lower().endswith((".png",".jpg")):
        shutil.copy(os.path.join(LEAFGAN_ROOT, "trainA", f), os.path.join(dst_healthy, f))

# copy real diseased
for f in os.listdir(src_real_diseased):
    if f.lower().endswith((".png",".jpg")):
        shutil.copy(os.path.join(src_real_diseased, f), os.path.join(dst_aug, f))

# copy synthetic diseased
for f in os.listdir(src_fake):
    if f.lower().endswith((".png",".jpg")):
        # rename if collision
        destf = f
        if os.path.exists(os.path.join(dst_aug, destf)):
            destf = f.rsplit(".",1)[0] + "_fake." + f.rsplit(".",1)[1]
        shutil.copy(os.path.join(src_fake, f), os.path.join(dst_aug, destf))

print("Augmented dataset ready at /content/Augmented_dataset_final/train")


Augmented dataset ready at /content/Augmented_dataset_final/train


In [None]:
# Build Aug+RealBalanced (balanced 1:1)
import os, shutil, random

random.seed(42)

LEAFGAN_ROOT = "/content/LeafGAN_masked_final_dataset"           # has trainA (healthy real), trainB (diseased real)
SRC_FAKE     = "/content/leafgan_results_2_synthetic/content/leafgan_results_2_synthetic/fakeB"      # GAN-generated diseased
DST          = "/content/Augmented_dataset_AugPlusRealBalanced/train"

# fresh folders
for cls in ["Healthy","Diseased"]:
    os.makedirs(os.path.join(DST, cls), exist_ok=True)

# --- collect healthy (real) ---
src_trainA = os.path.join(LEAFGAN_ROOT, "trainA")
healthy_files = [f for f in os.listdir(src_trainA) if f.lower().endswith((".jpg",".jpeg",".png"))]

# copy all real healthy
for f in healthy_files:
    shutil.copy(os.path.join(src_trainA, f), os.path.join(DST, "Healthy", f))

# --- build diseased pool: real + synthetic ---
src_trainB = os.path.join(LEAFGAN_ROOT, "trainB")
real_dis = [os.path.join(src_trainB, f) for f in os.listdir(src_trainB) if f.lower().endswith((".jpg",".jpeg",".png"))]
fake_dis = [os.path.join(SRC_FAKE, f)   for f in os.listdir(SRC_FAKE)   if f.lower().endswith((".jpg",".jpeg",".png"))]
pool = real_dis + fake_dis
random.shuffle(pool)

# --- cap diseased to match healthy count (balanced 1:1) ---
target = len(healthy_files)
pool = pool[:target]

# copy selected diseased; ensure unique filenames so we don't overwrite
existing = set()
for src in pool:
    f = os.path.basename(src)
    base, ext = os.path.splitext(f)
    dest = os.path.join(DST, "Diseased", f)
    i = 1
    while os.path.exists(dest) or dest in existing:
        dest = os.path.join(DST, "Diseased", f"{base}_mix{i}{ext}")
        i += 1
    shutil.copy(src, dest)
    existing.add(dest)

# --- verify counts ---
h = len([x for x in os.listdir(os.path.join(DST,"Healthy")) if x.lower().endswith((".jpg",".jpeg",".png"))])
d = len([x for x in os.listdir(os.path.join(DST,"Diseased")) if x.lower().endswith((".jpg",".jpeg",".png"))])
print("✅ Balanced dataset built at:", os.path.dirname(DST))
print("Healthy:", h, " Diseased:", d)


✅ Balanced dataset built at: /content/Augmented_dataset_AugPlusRealBalanced
Healthy: 1316  Diseased: 1316


In [None]:
# Build/confirm test set from the original LeafGAN dataset
LEAFGAN = "/content/LeafGAN_masked_final_dataset"

TEST_ROOT = "/content/Classifier_test"
import os, shutil
def reset(p):
    if os.path.exists(p): shutil.rmtree(p)
    os.makedirs(p, exist_ok=True)

reset(TEST_ROOT); os.makedirs(f"{TEST_ROOT}/Healthy", exist_ok=True); os.makedirs(f"{TEST_ROOT}/Diseased", exist_ok=True)

for f in os.listdir(f"{LEAFGAN}/testA"):
    if f.lower().endswith((".jpg",".png",".jpeg")): shutil.copy(f"{LEAFGAN}/testA/{f}", f"{TEST_ROOT}/Healthy/{f}")
for f in os.listdir(f"{LEAFGAN}/testB"):
    if f.lower().endswith((".jpg",".png",".jpeg")): shutil.copy(f"{LEAFGAN}/testB/{f}", f"{TEST_ROOT}/Diseased/{f}")

print("Test Healthy:", len(os.listdir(f"{TEST_ROOT}/Healthy")))
print("Test Diseased:", len(os.listdir(f"{TEST_ROOT}/Diseased")))


Test Healthy: 329
Test Diseased: 125
