In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

## Installing the Required Imports 

In [None]:
import os
import glob
from PIL import Image
import torch
from torch.utils.data import Dataset
import torchvision.transforms as T
import torch
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F
import random
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torchvision.models import vgg19
import torch.nn.functional as F

## Data Augmentation and Preprocessing for Super-Resolution

In [None]:
def random_cropping(lr_img, hr_img, crop_size_hr=128, scale=4):
    """Performs a random crop on the high-resolution image and corresponding low-resolution image."""
    hr_width, hr_height = hr_img.size
    x = random.randint(0, hr_width - crop_size_hr)
    y = random.randint(0, hr_height - crop_size_hr)

    hr_crop = F.crop(hr_img, y, x, crop_size_hr, crop_size_hr)
    lr_crop = F.crop(
        lr_img,
        y // scale,
        x // scale,
        crop_size_hr // scale,
        crop_size_hr // scale
    )
    return lr_crop, hr_crop

class SRTransform:
    """Applies data augmentation for Super Resolution tasks including cropping, flipping, rotating, and color jitter."""
    def __init__(self, crop_size_hr=128, scale=4):
        self.crop_size_hr = crop_size_hr
        self.scale = scale
        self.color_jitter = T.ColorJitter(
            brightness=0.1,
            contrast=0.1,
            saturation=0.1,
            hue=0.05
        )

    def augment(self, lr_img, hr_img):
        """Applies random horizontal flip, vertical flip, and rotation."""
        if random.random() > 0.5:
            lr_img, hr_img = F.hflip(lr_img), F.hflip(hr_img)
        if random.random() > 0.5:
            lr_img, hr_img = F.vflip(lr_img), F.vflip(hr_img)
        if random.random() > 0.5:
            angle = random.choice([90, 180, 270])
            lr_img, hr_img = F.rotate(lr_img, angle), F.rotate(hr_img, angle)
        return lr_img, hr_img

    def __call__(self, lr_img, hr_img):
        lr_crop, hr_crop = random_cropping(lr_img, hr_img, self.crop_size_hr, self.scale)
        lr_aug, hr_aug = self.augment(lr_crop, hr_crop)

        lr_aug = self.color_jitter(lr_aug)
        return F.to_tensor(lr_aug), F.to_tensor(hr_aug)


In [None]:
class LowLightSRDataset(Dataset):
    def __init__(self, lr_dir, hr_dir=None, transform_lr=None, transform_hr=None):
       
        self.lr_dir = lr_dir
        self.hr_dir = hr_dir
        self.lr_files = sorted(glob.glob(os.path.join(lr_dir, "*.png")))
        
       
        if hr_dir is not None:
            self.hr_files = sorted(glob.glob(os.path.join(hr_dir, "*.png")))
        else:
            self.hr_files = None
        
        self.transform_lr = transform_lr
        self.transform_hr = transform_hr

    def __len__(self):
        return len(self.lr_files)

    def __getitem__(self, idx):
        lr_path = self.lr_files[idx]
        lr_img = Image.open(lr_path).convert("RGB")
        
        if self.transform_lr:
            lr_img = self.transform_lr(lr_img)

        if self.hr_files is not None:
            hr_path = self.hr_files[idx]
            hr_img = Image.open(hr_path).convert("RGB")
            if self.transform_hr:
                hr_img = self.transform_hr(hr_img)
            return lr_img, hr_img
        else:
            
            return lr_img


## Model Architecture


In [None]:
class ResidualBlock(nn.Module):
    """A basic residual block with two 3x3 convolutional layers and a ReLU activation."""
    def __init__(self, channels: int):
        super().__init__()
        self.block = nn.Sequential(
            nn.Conv2d(channels, channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        )

    def forward(self, x):
        return x + self.block(x)

class DenoiseSRNet(nn.Module):
    """
    A Super-Resolution network with denoising capabilities using residual learning and pixel shuffle upsampling.
    """
    def __init__(self, in_channels=3, out_channels=3, num_features=64, num_res_blocks=12, upscale=4):
        super().__init__()
        
       
        self.entry = nn.Conv2d(in_channels, num_features, kernel_size=3, padding=1)
        
     
        self.residual_layers = nn.Sequential(
            *[ResidualBlock(num_features) for _ in range(num_res_blocks)]
        )
        
        # Upsampling
        self.upsample = nn.Sequential(
            nn.Conv2d(num_features, num_features * (upscale ** 2), kernel_size=3, padding=1),
            nn.PixelShuffle(upscale),
            nn.ReLU(inplace=True)
        )
        
        # Final output layer
        self.output = nn.Conv2d(num_features, out_channels, kernel_size=3, padding=1)

    def forward(self, x):
        x = self.entry(x)
        x = self.residual_layers(x)
        x = self.upsample(x)
        return self.output(x)


## Custom Loss Functions

In [None]:
class CharbonnierLoss(nn.Module):
   
    def __init__(self, eps=1e-6):
        super().__init__()
        self.eps = eps

    def forward(self, x, y):
        return torch.mean(torch.sqrt((x - y) ** 2 + self.eps))



class PerceptualLoss(nn.Module):
   
    def __init__(self):
        super().__init__()
        vgg_features = vgg19(pretrained=True).features[:16].eval()
        for param in vgg_features.parameters():
            param.requires_grad = False
        self.vgg = vgg_features
        self.criterion = nn.MSELoss()

    def forward(self, sr, hr):
        sr_features = self.vgg(sr)
        hr_features = self.vgg(hr)
        return self.criterion(sr_features, hr_features)

In [None]:
def train_epoch(model, dataloader, optimizer, device, loss_fn):
    model.train()
    running_loss = 0.0

    for lr_imgs, hr_imgs in dataloader:
        lr_imgs, hr_imgs = lr_imgs.to(device), hr_imgs.to(device)

        optimizer.zero_grad()
        preds = model(lr_imgs)
        loss = loss_fn(preds, hr_imgs)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * lr_imgs.size(0)

    return running_loss / len(dataloader.dataset)


def evaluate_psnr(model, dataloader, device):
    model.eval()
    total_psnr = 0.0

    with torch.no_grad():
        for lr_imgs, hr_imgs in dataloader:
            lr_imgs, hr_imgs = lr_imgs.to(device), hr_imgs.to(device)
            preds = model(lr_imgs)

            mse = F.mse_loss(preds, hr_imgs, reduction='none').mean(dim=[1, 2, 3])
            psnr = 10 * torch.log10(1 / (mse + 1e-8))
            total_psnr += psnr.sum().item()

    return total_psnr / len(dataloader.dataset)


## Training Pipeline

In [None]:
def train_model(num_epochs=50, batch_size=4, lr=1e-3, 
                train_dir='/kaggle/input/dlp-jan-2025-nppe-3/archive/train',
                val_dir='/kaggle/input/dlp-jan-2025-nppe-3/archive/val'):
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Dataset setup
    train_dataset = LowLightSRDataset(
        lr_dir=f"{train_dir}/train",
        hr_dir=f"{train_dir}/gt",
        transform_lr=T.ToTensor(),
        transform_hr=T.ToTensor()
    )
    val_dataset = LowLightSRDataset(
        lr_dir=f"{val_dir}/val",
        hr_dir=f"{val_dir}/gt",
        transform_lr=T.ToTensor(),
        transform_hr=T.ToTensor()
    )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)

    # Model, optimizer, scheduler
    model = DenoiseSRNet().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = StepLR(optimizer, step_size=10, gamma=0.5)

    # Loss Function
    perceptual_loss = PerceptualLoss().to(device)
    charbonnier_loss = CharbonnierLoss()
    def loss_fn(pred, target):
        return charbonnier_loss(pred, target) + 0.01 * perceptual_loss(pred, target)

    # Training loop
    best_psnr = 0.0
    psnr_history = []

    for epoch in range(num_epochs):
        scheduler.step()

        train_loss = train_epoch(model, train_loader, optimizer, device, loss_fn)
        val_psnr = evaluate_psnr(model, val_loader, device)
        psnr_history.append(val_psnr)

        print(f"[Epoch {epoch+1}/{num_epochs}] Train Loss: {train_loss:.4f} | Val PSNR: {val_psnr:.4f} dB")

        if val_psnr > best_psnr:
            best_psnr = val_psnr
            torch.save(model.state_dict(), "best_model.pth")

    print(f"\nTraining complete. Best Validation PSNR: {best_psnr:.4f} dB")

    return model

## Training the Model 

In [None]:
#training the model 
model = train_model()


## Predicting on Test Dataset 

In [None]:

def inference(model_path, test_dir, output_dir, device="cuda"):
    os.makedirs(output_dir, exist_ok=True)
    
    # Load model
    model = DenoiseSRNet()
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    
    # List test images
    test_files = sorted(glob.glob(os.path.join(test_dir, "*.png")))
    
    with torch.no_grad():
        for img_path in test_files:
            filename = os.path.basename(img_path)
            lr_img = Image.open(img_path).convert("RGB")
            lr_tensor = F.to_tensor(lr_img).unsqueeze(0).to(device)
            
            sr_tensor = model(lr_tensor)
            sr_img = sr_tensor.squeeze(0).cpu().clamp(0,1)
            
            # Convert to PIL
            sr_img_pil = T.ToPILImage()(sr_img)
            sr_img_pil.save(os.path.join(output_dir, filename))
    
    print(f"predictions complete. Results saved in {output_dir}")


In [None]:
#generating inf 
inference("/kaggle/working/best_model.pth","/kaggle/input/dlp-jan-2025-nppe-3/archive/test","results",device="cuda")

## Generating Submission CSV

In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image

def images_to_csv(folder_path, output_csv):
    data_rows = []
    for filename in os.listdir(folder_path):
        if filename.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
            image_path = os.path.join(folder_path, filename)
            image = Image.open(image_path).convert('L') 
            image_array = np.array(image).flatten()[::8]   
            image_id = filename.split('.')[0].replace('test_', 'gt_') # Replace test with gt
            data_rows.append([image_id, *image_array])
    column_names = ['ID'] + [f'pixel_{i}' for i in range(len(data_rows[0]) - 1)]
    df = pd.DataFrame(data_rows, columns=column_names)
    df.to_csv(output_csv, index=False)
    print(f'Successfully saved to {output_csv}')
    
    
folder_path = '/kaggle/working/results'
output_csv = 'submission.csv'
images_to_csv(folder_path, output_csv)