In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader,random_split
from torchvision.transforms import functional as TF
from torchmetrics.functional import structural_similarity_index_measure as ssim
from torchmetrics.functional import peak_signal_noise_ratio as psnr
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from PIL import Image
import torchvision.transforms as transforms

In [None]:
# Run on CUDA when PyTorch reports it as available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
# Class-folder name -> integer label used as the classification target.
labels_idx = {
    name: index
    for index, name in enumerate(("daisy", "dandelion", "roses", "sunflowers", "tulips"))
}

In [None]:
import os
from torch.utils.data import Dataset
from PIL import Image

class FlowerDataset(Dataset):
    """Paired clean/noisy images with an integer class label.

    Expected layout::

        root_dir/clean/<class>/<file>
        root_dir/noisy/<class>/<file>

    Only file names present in both the clean and the noisy folder of a class
    are kept. Each item is ``(noisy_img, clean_img, noisy_path, clean_path, label)``.

    Args:
        root_dir: Directory containing the ``clean`` and ``noisy`` sub-folders.
        transform: Optional callable applied to each PIL image. It is called
            separately for the clean and the noisy image, so a transform with
            random behaviour will NOT be synchronized between the two.

    Raises:
        KeyError: If a class folder has no entry in ``labels_idx``.
    """

    def __init__(self, root_dir, transform=None):
        self.clean_dir = os.path.join(root_dir, "clean")
        self.noisy_dir = os.path.join(root_dir, "noisy")
        # Keep only real class folders; a stray file next to them would
        # otherwise make the per-class os.listdir() below fail.
        self.classes = sorted(
            entry for entry in os.listdir(self.clean_dir)
            if os.path.isdir(os.path.join(self.clean_dir, entry))
        )
        self.transform = transform

        self.image_pairs = []
        for cls in self.classes:
            if cls not in labels_idx:
                raise KeyError(f"Class folder {cls!r} has no entry in labels_idx")

            clean_path = os.path.join(self.clean_dir, cls)
            noisy_path = os.path.join(self.noisy_dir, cls)

            clean_images = set(os.listdir(clean_path))
            noisy_images = set(os.listdir(noisy_path))

            # Pair images by identical file name; sorting keeps indexing deterministic.
            common_files = sorted(clean_images & noisy_images)

            for fname in common_files:
                self.image_pairs.append((
                    os.path.join(clean_path, fname),
                    os.path.join(noisy_path, fname),
                    labels_idx[cls]
                ))

        print(f"Loaded {len(self.image_pairs)} pairs across {len(self.classes)} classes.")

    def __len__(self):
        return len(self.image_pairs)

    @staticmethod
    def _load_rgb(path):
        """Read ``path`` as an RGB PIL image and close the file handle right away."""
        with Image.open(path) as img:
            return img.convert("RGB")

    def __getitem__(self, idx):
        clean_path, noisy_path, label = self.image_pairs[idx]

        clean_img = self._load_rgb(clean_path)
        noisy_img = self._load_rgb(noisy_path)

        if self.transform:
            clean_img = self.transform(clean_img)
            noisy_img = self.transform(noisy_img)

        return noisy_img, clean_img, noisy_path, clean_path, label


In [None]:
class DoubleConv(nn.Module):
    """Two 3x3 convolutions (padding 1), each followed by an in-place ReLU.

    The spatial size is preserved; channels go ``in_ch -> out_ch -> out_ch``.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        stages = []
        for stage_in in (in_ch, out_ch):
            stages.append(nn.Conv2d(stage_in, out_ch, kernel_size=3, padding=1))
            stages.append(nn.ReLU(inplace=True))
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        features = self.net(x)
        return features

class Down(nn.Module):
    """Encoder stage: 2x2 max-pooling followed by a ``DoubleConv``."""

    def __init__(self, in_ch, out_ch):
        super().__init__()
        pool = nn.MaxPool2d(kernel_size=2)
        conv = DoubleConv(in_ch, out_ch)
        self.net = nn.Sequential(pool, conv)

    def forward(self, x):
        downsampled = self.net(x)
        return downsampled

class Up(nn.Module):
    """Decoder stage: bilinear 2x upsampling, skip concatenation, ``DoubleConv``.

    Args:
        in_ch: Channel count AFTER concatenation (upsampled + skip channels).
        out_ch: Channel count produced by the stage.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        self.conv = DoubleConv(in_ch, out_ch)

    def forward(self, x1, x2):
        """Upsample ``x1``, align it to the skip tensor ``x2`` and fuse both."""
        x1 = self.up(x1)

        # Max-pooling floors odd sizes, so the upsampled map can be one pixel
        # short of the skip connection. Pad it to the skip size; when the sizes
        # already match (inputs divisible by 4) nothing is padded.
        diff_h = x2.size(2) - x1.size(2)
        diff_w = x2.size(3) - x1.size(3)
        if diff_h != 0 or diff_w != 0:
            x1 = F.pad(
                x1,
                [diff_w // 2, diff_w - diff_w // 2,
                 diff_h // 2, diff_h - diff_h // 2],
            )

        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)
        
class UNetAutoencoder(nn.Module):
    """Two-level U-Net style network mapping an image to an image.

    Args:
        in_ch: Number of input channels.
        out_ch: Number of output channels.
        base: Channel width of the first level; deeper levels use 2x and 4x.
    """

    def __init__(self, in_ch=3, out_ch=3, base=32):
        super().__init__()
        width1, width2, width4 = base, base * 2, base * 4

        # Encoder
        self.inc = DoubleConv(in_ch, width1)
        self.down1 = Down(width1, width2)
        self.down2 = Down(width2, width4)
        # Decoder: each stage receives upsampled features plus the matching skip
        self.up1 = Up(width4 + width2, width2)
        self.up2 = Up(width2 + width1, width1)
        # 1x1 projection back to the output channels
        self.outc = nn.Conv2d(width1, out_ch, kernel_size=1)

    def forward(self, x):
        skip_full = self.inc(x)
        skip_half = self.down1(skip_full)
        bottleneck = self.down2(skip_half)

        decoded = self.up1(bottleneck, skip_half)
        decoded = self.up2(decoded, skip_full)
        return self.outc(decoded)

In [None]:
class StrongCNN(nn.Module):
    """Convolutional classifier: 7 conv/GroupNorm/ReLU stages, GAP, 2-layer MLP head.

    Args:
        num_classes: Size of the output logit vector.
        dropout: Dropout probability used before each linear layer.
    """

    # (in_channels, out_channels, followed_by_max_pool) for every conv stage
    _STAGES = (
        (3, 32, True),
        (32, 64, False),
        (64, 64, True),
        (64, 128, False),
        (128, 128, True),
        (128, 256, False),
        (256, 256, True),
    )

    def __init__(self, num_classes=5, dropout=0.5):
        super().__init__()

        def group_norm(channels):
            # At most 32 groups, and at least two channels per group when possible.
            groups = channels // 2 if channels >= 2 else 1
            return nn.GroupNorm(num_groups=min(32, groups), num_channels=channels)

        layers = []
        for c_in, c_out, pool_after in self._STAGES:
            layers.append(nn.Conv2d(c_in, c_out, 3, padding=1))
            layers.append(group_norm(c_out))
            layers.append(nn.ReLU())
            if pool_after:
                layers.append(nn.MaxPool2d(2))
        self.features = nn.Sequential(*layers)

        self.gap = nn.AdaptiveAvgPool2d((1, 1))

        head = [
            nn.Dropout(dropout),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        pooled = self.gap(self.features(x))
        logits = self.classifier(torch.flatten(pooled, 1))
        return logits

In [None]:
def denormalize(tensor, mean, std):
    """Undo per-channel normalization and clamp the result to ``[0, 1]``.

    Args:
        tensor: Tensor whose channel axis is dimension 1 (``mean``/``std`` are
            broadcast as shape ``(1, C, 1, 1)``).
        mean: Per-channel means used by the normalization.
        std: Per-channel standard deviations used by the normalization.

    Returns:
        ``tensor * std + mean`` clamped to ``[0.0, 1.0]``.
    """
    def _per_channel(values):
        # Broadcastable (1, C, 1, 1) tensor living on the input's device.
        return torch.tensor(values).to(tensor.device).reshape(1, -1, 1, 1)

    restored = tensor * _per_channel(std) + _per_channel(mean)
    return restored.clamp(0.0, 1.0)

In [None]:
# Per-channel RGB normalization statistics (ImageNet values), consumed by the
# metric and visualization helpers when mapping tensors back to [0, 1].
mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

In [None]:
def find_psnr_ssim(pred, target):
    """Return ``(psnr, ssim)`` between two normalized image batches as floats.

    Both inputs are mapped back to ``[0, 1]`` with the notebook-level
    ``mean``/``std`` before the metrics are computed with ``data_range=1.0``.

    Args:
        pred: Batch of normalized images.
        target: Batch of normalized images with the same shape as ``pred``.

    Returns:
        Tuple ``(psnr_value, ssim_value)`` of Python floats.
    """
    # The values are only reported, never back-propagated, so skip autograd
    # bookkeeping even when `pred` comes straight from a training forward pass.
    with torch.no_grad():
        pred = denormalize(pred, mean, std)
        target = denormalize(target, mean, std)

        ssim_val = ssim(pred, target, data_range=1.0)
        psnr_val = psnr(pred, target, data_range=1.0)

    return psnr_val.item(), ssim_val.item()

In [None]:
def show_noisy_denoised_clean(loader, n=6):
    """Plot noisy input, denoised output and clean target for the first batch.

    Args:
        loader: Iterable yielding ``(denoised, labels, noisy_paths, clean_paths)``
            batches; only the first batch is used. ``denoised`` holds normalized
            images that are denormalized with the notebook-level ``mean``/``std``.
        n: Maximum number of columns (samples) to draw. Fewer are drawn when the
            batch is smaller.
    """
    denoised, _labels, noisy_paths, clean_paths = next(iter(loader))

    # Never index past the end of the batch.
    n = min(n, len(denoised))
    if n < 1:
        return

    # Show the on-disk images at the same resolution as the denoised tensors.
    height, width = denoised.shape[-2:]
    simple_transform = transforms.Compose([
        transforms.Resize((height, width)),
        transforms.ToTensor()
    ])

    def _load_for_display(path):
        """Read an image file and return it as an H x W x C numpy array in [0, 1]."""
        with Image.open(path) as img:
            return simple_transform(img.convert("RGB")).permute(1, 2, 0).numpy()

    # squeeze=False keeps `axes` two-dimensional even when n == 1.
    fig, axes = plt.subplots(3, n, figsize=(3 * n, 9), squeeze=False)

    for i in range(n):
        den_img = denormalize(denoised[i].unsqueeze(0), mean, std)
        den_img = den_img.squeeze(0).permute(1, 2, 0).cpu().numpy()

        rows = (
            ("Original Noisy", _load_for_display(noisy_paths[i])),
            ("Denoised", den_img),
            ("Original Clean", _load_for_display(clean_paths[i])),
        )
        for row, (title, image) in enumerate(rows):
            axes[row, i].imshow(image)
            axes[row, i].set_title(title)
            axes[row, i].axis("off")

    plt.tight_layout()
    plt.show()


In [None]:
def _run_denoiser_epoch(model, loader, l1_loss, alpha, beta, desc, use_amp,
                        optimizer=None, scaler=None):
    """Run one pass over ``loader``; trains when ``optimizer`` is given, else evaluates.

    Returns ``(mean_loss, mean_psnr, mean_ssim)`` averaged over the batches.
    """
    training = optimizer is not None
    model.train(training)

    total_loss, psnr_values, ssim_values = 0.0, [], []

    with torch.set_grad_enabled(training):
        for noisy, clean, *_ in tqdm(loader, desc=desc):
            noisy, clean = noisy.to(device), clean.to(device)
            if training:
                optimizer.zero_grad()

            with torch.cuda.amp.autocast(enabled=use_amp):
                denoised = model(noisy)

                l1 = l1_loss(denoised, clean)

                # SSIM is defined on [0, 1] images, so undo the normalization first.
                # NOTE(review): with AMP on, SSIM runs inside autocast; confirm the
                # reduced precision is acceptable for the loss.
                denoised_dn = denormalize(denoised, mean, std)
                clean_dn = denormalize(clean, mean, std)
                ssim_l = 1 - ssim(denoised_dn, clean_dn, data_range=1.0)

                loss = alpha * l1 + beta * ssim_l

            if training:
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()

            total_loss += loss.item()

            # Detach so the reporting metrics do not extend the autograd graph.
            psnr_val, ssim_val_eval = find_psnr_ssim(clean, denoised.detach())
            psnr_values.append(psnr_val)
            ssim_values.append(ssim_val_eval)

    n_batches = len(psnr_values)
    if n_batches == 0:
        raise ValueError(f"{desc}: the data loader produced no batches")

    return (total_loss / n_batches,
            sum(psnr_values) / n_batches,
            sum(ssim_values) / n_batches)


def train_denoiser(model, train_loader, val_loader, epochs, alpha=0.8, beta=0.2):
    """Train ``model`` to map noisy images to clean ones.

    The loss is ``alpha * L1 + beta * (1 - SSIM)``. AdamW (lr 1e-4, weight decay
    1e-4) is used, and the learning rate is halved when the validation loss has
    not improved for 5 epochs. Mixed precision is used only when the
    notebook-level ``device`` is a CUDA device.

    Args:
        model: Network to optimize in place.
        train_loader: Loader whose batches start with ``(noisy, clean, ...)``.
        val_loader: Loader with the same batch layout, used for validation.
        epochs: Number of epochs to run.
        alpha: Weight of the L1 term.
        beta: Weight of the ``1 - SSIM`` term.

    Returns:
        Dict of per-epoch lists with keys ``train_loss``, ``train_psnr``,
        ``train_ssim``, ``val_loss``, ``val_psnr`` and ``val_ssim``.

    Raises:
        ValueError: If a loader yields no batches.
    """
    l1_loss = nn.L1Loss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=5)

    # Gradient scaling / autocast only make sense on CUDA; on CPU they would
    # just emit warnings and disable themselves.
    use_amp = device.type == "cuda"
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    history = {
        "train_loss": [], "train_psnr": [], "train_ssim": [],
        "val_loss": [], "val_psnr": [], "val_ssim": [],
    }

    for epoch in range(epochs):
        avg_train_loss, avg_train_psnr, avg_train_ssim = _run_denoiser_epoch(
            model, train_loader, l1_loss, alpha, beta,
            desc=f"Training Epoch {epoch+1}/{epochs}", use_amp=use_amp,
            optimizer=optimizer, scaler=scaler)

        avg_val_loss, avg_val_psnr, avg_val_ssim = _run_denoiser_epoch(
            model, val_loader, l1_loss, alpha, beta,
            desc="Validation", use_amp=use_amp)

        scheduler.step(avg_val_loss)

        history["train_loss"].append(avg_train_loss)
        history["train_psnr"].append(avg_train_psnr)
        history["train_ssim"].append(avg_train_ssim)
        history["val_loss"].append(avg_val_loss)
        history["val_psnr"].append(avg_val_psnr)
        history["val_ssim"].append(avg_val_ssim)

        print(f"[Epoch {epoch+1}/{epochs}] "
              f"Train Loss: {avg_train_loss:.4f} | "
              f"Train PSNR: {avg_train_psnr:.3f} | Train SSIM: {avg_train_ssim:.3f} "
              f"|| Val Loss: {avg_val_loss:.4f} | "
              f"Val PSNR: {avg_val_psnr:.3f} | Val SSIM: {avg_val_ssim:.3f}")

    return history

In [None]:
def train_classifier(classifier, train_loader, val_loader, optimizer, epochs=200):
    """Train ``classifier`` with cross-entropy and report per-epoch metrics.

    The learning rate is halved (down to 1e-6) when the validation loss has not
    improved for 3 epochs. The current learning rate is printed every epoch.

    Args:
        classifier: Model producing class logits; optimized in place.
        train_loader: Loader whose batches start with ``(images, labels, ...)``.
        val_loader: Loader with the same batch layout, used for validation.
        optimizer: Optimizer already bound to ``classifier``'s parameters.
        epochs: Number of epochs to run.

    Returns:
        ``(train_losses, val_losses, train_accuracies, val_accuracies)``, each a
        list with one value per epoch. Losses are means over batches; accuracies
        are fractions of correctly classified samples.
    """
    loss_fn = nn.CrossEntropyLoss()

    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    # `verbose=True` is deprecated in recent PyTorch releases and is redundant
    # here because the learning rate is logged at the end of every epoch.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="min", factor=0.5, patience=3, min_lr=1e-6
    )

    for epoch in range(epochs):
        # Training
        classifier.train()
        total_loss, correct, total = 0, 0, 0
        for imgs, labels, *_ in tqdm(train_loader, desc=f"Training Epoch {epoch+1}"):
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad()
            preds = classifier(imgs)
            loss = loss_fn(preds, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            correct += (preds.argmax(1) == labels).sum().item()
            total += labels.size(0)

        train_loss = total_loss / len(train_loader)
        train_acc = correct / total
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)

        # Validation
        classifier.eval()
        val_loss, val_correct, val_total = 0, 0, 0
        with torch.no_grad():
            for imgs, labels, *_ in val_loader:
                imgs, labels = imgs.to(device), labels.to(device)
                preds = classifier(imgs)
                loss = loss_fn(preds, labels)

                val_loss += loss.item()
                val_correct += (preds.argmax(1) == labels).sum().item()
                val_total += labels.size(0)

        val_loss /= len(val_loader)
        val_acc = val_correct / val_total
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        # The plateau scheduler is driven by the validation loss.
        scheduler.step(val_loss)

        print(f"[Epoch {epoch+1}/{epochs}] "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f} | "
              f"LR: {optimizer.param_groups[0]['lr']:.6f}")

    return train_losses, val_losses, train_accuracies, val_accuracies


In [None]:
class DenoisedDataset(Dataset):
    """In-memory dataset of denoiser outputs computed once at construction.

    Every batch of ``base_loader`` (``noisy, clean, noisy_path, clean_path,
    labels``) is passed through ``denoiser`` in eval mode without gradients.
    Each stored sample is ``(denoised_tensor_on_cpu, label, noisy_path, clean_path)``.
    """

    def __init__(self, denoiser, base_loader):
        self.samples = []
        self.denoiser = denoiser.eval()  # also switches the passed model to eval mode

        # Run inference on whatever device the model's parameters live on.
        model_device = next(denoiser.parameters()).device

        with torch.no_grad():
            for batch in base_loader:
                noisy_batch, _clean_batch, noisy_paths, clean_paths, batch_labels = batch
                outputs = denoiser(noisy_batch.to(model_device)).cpu()  # [B, C, H, W]

                # One (denoised tensor, label, noisy path, clean path) tuple per image
                self.samples.extend(
                    zip(outputs, batch_labels, noisy_paths, clean_paths)
                )

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]  # (denoised, label, noisy_path, clean_path)
        return sample


In [None]:
# Both pipelines normalize with the notebook-level `mean`/`std`, the same values
# `denormalize` is called with elsewhere, so the two can never drift apart.
train_transforms = transforms.Compose([
    transforms.Resize((128, 128)),
    # Augmentations are left disabled. Note that FlowerDataset applies the
    # transform to the clean and the noisy image in two separate calls, so
    # random ops enabled here would not be synchronized within a pair.
    #transforms.RandomHorizontalFlip(),
    #transforms.RandomRotation(10),
    #transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

val_transforms = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

In [None]:
train_set = "/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/train"
test_set  = "/kaggle/input/180-dc-ml-sig-recruitment/REC_DATASET/test/noisy"

dataset = FlowerDataset(train_set, transform=train_transforms)

# Hold out 20% of the pairs for validation.
val_split = 0.2
val_size = int(len(dataset) * val_split)
train_size = len(dataset) - val_size

# Seed the split so the same images land in train/val on every run; otherwise a
# checkpoint reloaded in a later session would be validated on images it was
# trained on.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size], generator=split_generator
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset, batch_size=32, shuffle=False)


In [None]:
# Reference quality of the untouched noisy images against their clean targets,
# averaged over the training batches.
batch_psnrs, batch_ssims = [], []
for noisy, clean, *_ in train_loader:
    batch_psnr, batch_ssim = find_psnr_ssim(noisy, clean)
    batch_psnrs.append(batch_psnr)
    batch_ssims.append(batch_ssim)

baseline_psnr = sum(batch_psnrs) / len(train_loader)
baseline_ssim = sum(batch_ssims) / len(train_loader)

print(f"Baseline PSNR (noisy vs clean): {baseline_psnr:.2f}")
print(f"Baseline SSIM (noisy vs clean): {baseline_ssim:.4f}")

In [None]:
# Sanity check: print tensor shapes and the first sample's metadata, then stop
# after the first batch.
for noisy, clean, noisy_paths, clean_paths, label in train_loader:
    first_batch_report = (
        ("Noisy batch shape:", noisy.shape),
        ("Clean batch shape:", clean.shape),
        ("First noisy path:", noisy_paths[0]),
        ("First clean path:", clean_paths[0]),
        ("First label: ", label[0]),
    )
    for caption, value in first_batch_report:
        print(caption, value)
    break

In [None]:
# Build the denoiser on the selected device, train it for 75 epochs and
# persist its weights for the following cells.
denoiser = UNetAutoencoder()
denoiser = denoiser.to(device)
train_denoiser(
    model=denoiser,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=75,
)
torch.save(denoiser.state_dict(), "denoiser.pth")

In [None]:
# map_location lets a checkpoint written on a GPU session be restored on
# whatever device this session uses (including CPU-only).
denoiser.load_state_dict(torch.load("denoiser.pth", map_location=device))

# Pre-compute the denoised images once so the classifier trains on cached tensors.
denoised_train = DenoisedDataset(denoiser, train_loader)
denoised_val   = DenoisedDataset(denoiser, val_loader)
train_loader_d = DataLoader(denoised_train, batch_size=32, shuffle=True)
val_loader_d   = DataLoader(denoised_val, batch_size=32, shuffle=False)

In [None]:
# Visual check of the first batch: noisy input, denoiser output, clean target.
show_noisy_denoised_clean(loader=train_loader_d)

In [None]:
# Train the classifier on the cached denoised images and persist its weights.
classifier = StrongCNN()
classifier = classifier.to(device)
optimizer_c = optim.Adam(classifier.parameters(), lr=5e-5, weight_decay=1e-4)
train_loss, val_loss, train_acc, val_acc = train_classifier(
    classifier=classifier,
    train_loader=train_loader_d,
    val_loader=val_loader_d,
    optimizer=optimizer_c,
)
torch.save(classifier.state_dict(), "classifier.pth")

In [None]:
import pandas as pd

def generate_submission(denoiser, classifier, test_set, out_file="submission.csv"):
    """Denoise and classify every file in ``test_set`` and write a CSV.

    The CSV has the columns ``Images`` (file name) and ``Predicted_Classes``
    (arg-max class index shifted by one, i.e. 1-5 instead of 0-4).

    Args:
        denoiser: Model applied to the normalized test images first.
        classifier: Model producing class logits from the denoiser output.
        test_set: Directory containing the test images (flat, no sub-folders).
        out_file: Path of the CSV file to write.
    """
    class TestDataset(Dataset):
        """Flat folder of images, returned as ``(image, file_name)`` in sorted order."""

        def __init__(self, root_dir, transform=None):
            self.noisy_dir = root_dir
            self.images = sorted(os.listdir(root_dir))
            self.transform = transform

        def __len__(self):
            return len(self.images)

        def __getitem__(self, idx):
            file_name = self.images[idx]
            img = Image.open(os.path.join(self.noisy_dir, file_name)).convert("RGB")
            if self.transform:
                img = self.transform(img)
            return img, file_name

    test_loader = DataLoader(
        TestDataset(test_set, transform=val_transforms),
        batch_size=32,
        shuffle=False,
    )

    denoiser.eval()
    classifier.eval()

    columns = {"Images": [], "Predicted_Classes": []}
    with torch.no_grad():
        for batch_imgs, batch_names in test_loader:
            logits = classifier(denoiser(batch_imgs.to(device)))
            # arg-max gives 0-4; the submission uses 1-5
            class_ids = logits.argmax(1).cpu().numpy() + 1
            columns["Predicted_Classes"].extend(class_ids)
            columns["Images"].extend(batch_names)

    pd.DataFrame(columns).to_csv(out_file, index=False)
    print(f"Saved submission to {out_file}")


In [None]:
from torchvision.utils import save_image

def save_denoised_images(denoiser, test_set, output_dir="denoised_outputs"):
    """Denoise every image in ``test_set`` and save the results as PNG files.

    Images are preprocessed with ``val_transforms``, passed through ``denoiser``
    and mapped back to ``[0, 1]`` with the notebook-level ``mean``/``std``
    before being written, so the saved files have natural colours.

    Args:
        denoiser: Model mapping normalized noisy images to normalized clean ones.
        test_set: Directory containing the test images (flat, no sub-folders).
        output_dir: Directory to write ``<original stem>.png`` files into;
            created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Test dataset
    class TestDataset(Dataset):
        """Flat folder of images, returned as ``(image, file_name)`` in sorted order."""

        def __init__(self, root_dir, transform=None):
            self.noisy_dir = root_dir
            self.images = sorted(os.listdir(root_dir))
            self.transform = transform

        def __len__(self): return len(self.images)

        def __getitem__(self, idx):
            path = os.path.join(self.noisy_dir, self.images[idx])
            img = Image.open(path).convert("RGB")
            # Use the transform this dataset was constructed with, not a global.
            if self.transform: img = self.transform(img)
            return img, self.images[idx]

    test_dataset = TestDataset(test_set, transform=val_transforms)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    denoiser.eval()
    with torch.no_grad():
        for imgs, names in test_loader:
            imgs = imgs.to(device)
            # The network works in normalized space; save_image expects [0, 1],
            # so undo the normalization before writing.
            denoised = denormalize(denoiser(imgs), mean, std).cpu()
            for img, name in zip(denoised, names):
                save_path = os.path.join(output_dir, os.path.splitext(name)[0] + ".png")
                save_image(img, save_path)

    print(f"Denoised images saved to: {output_dir}")

In [None]:
# Export the denoised test images, then write the class predictions.
save_denoised_images(
    denoiser=denoiser,
    test_set=test_set,
    output_dir="denoised_test_images",
)

generate_submission(
    denoiser=denoiser,
    classifier=classifier,
    test_set=test_set,
    out_file="submission.csv",
)
