In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.utils import save_image
import os
import numpy as np
from PIL import Image

import os


In [None]:
# Set Kaggle credentials
os.environ['KAGGLE_USERNAME'] = 'meghnabiswal'
os.environ['KAGGLE_KEY'] = 'e1c5a15db20d5c97b99f710660c88295'

In [2]:
class MRPNet(nn.Module):
    def __init__(self):
        super(MRPNet, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(inplace=True)
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 3, kernel_size=3, stride=1, padding=1)
        )
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [3]:
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.block1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=9, stride=1, padding=4),
            nn.ReLU(inplace=True)
        )
        self.block2 = nn.Sequential(
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )
        self.upsample = nn.Sequential(
            nn.Conv2d(64, 256, kernel_size=3, stride=1, padding=1),
            nn.PixelShuffle(2),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 256, kernel_size=3, stride=1, padding=1),
            nn.PixelShuffle(2),
            nn.ReLU(inplace=True)
        )
        self.block3 = nn.Conv2d(64, 3, kernel_size=9, stride=1, padding=4)
    def forward(self, x):
        x = self.block1(x)
        residual = x
        x = self.block2(x)
        x = x + residual
        x = self.upsample(x)
        x = self.block3(x)
        return x

In [4]:
class CombinedModel(nn.Module):
    def __init__(self):
        super(CombinedModel, self).__init__()
        self.denoiser = MRPNet()
        self.super_resolver = Generator()
    def forward(self, x):
        x = self.denoiser(x)
        x = self.super_resolver(x)
        return x

In [5]:
class LowLightDataset(Dataset):
    def __init__(self, clean_dir, noisy_dir, transform=None):
        self.noisy_dir = noisy_dir
        self.transform = transform
        self.clean_dir = clean_dir
        self.image_list = os.listdir(noisy_dir)
    def __len__(self):
        return len(self.image_list)
    def __getitem__(self, idx):
        image_name = self.image_list[idx]
        name, img_no = image_name.split("_")
        
        noise_path = os.path.join(self.noisy_dir, self.image_list[idx])
        clean_path = os.path.join(self.clean_dir, "gt_"+img_no)
        
        noise_img = Image.open(noise_path).convert("RGB")
        clean_image = Image.open(clean_path).convert("RGB")
        
        if self.transform:
            clean_image = self.transform(clean_image)
            noise_img = self.transform(noise_img)
        return clean_image, noise_img

In [6]:
class TestLowLightDataset(Dataset):
    def __init__(self, image_dir, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_list = os.listdir(image_dir)
    def __len__(self):
        return len(self.image_list)
    def __getitem__(self, idx):
        img_path = os.path.join(self.image_dir, self.image_list[idx])
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, self.image_list[idx]

In [7]:
transform = transforms.Compose([
    transforms.ToTensor(),
])

In [8]:
train_clean = r'/kaggle/input/dlp-jan-2025-nppe-3/archive/train/gt'
train_noisy = r'/kaggle/input/dlp-jan-2025-nppe-3/archive/train/train'
val_clean = r'/kaggle/input/dlp-jan-2025-nppe-3/archive/val/gt'
val_noisy = r'/kaggle/input/dlp-jan-2025-nppe-3/archive/val/val'
test = r'/kaggle/input/dlp-jan-2025-nppe-3/archive/test'

In [9]:
train_dataset = LowLightDataset(train_clean,train_noisy, transform=transform)
val_dataset = LowLightDataset(val_clean,val_noisy, transform=transform)
test_dataset = TestLowLightDataset(test, transform = transform)

In [10]:
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size =16, shuffle = False, num_workers = 4)

In [11]:
def train_model(model, dataloader, optimizer, criterion, scaler, device):
    model.train()
    running_loss = 0.0
    for clean_images, noisy_images in dataloader:
        clean_images,noisy_images = clean_images.to(device), noisy_images.to(device)
        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            outputs = model(noisy_images)
            loss = criterion(outputs, clean_images)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()

    return running_loss / len(dataloader)

In [12]:
def validate_model(model, dataloader, criterion, device):
  model.eval()
  running_loss = 0.0
  with torch.no_grad():
    for clean_images, noisy_images in dataloader:
        clean_images = clean_images.to(device)
        noisy_images = noisy_images.to(device)
        outputs = model(noisy_images)
        loss = criterion(outputs, clean_images)
        running_loss += loss.item()
  return running_loss / len(dataloader)

In [13]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = CombinedModel()

model = nn.DataParallel(model)  
model = model.to(device)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scaler = torch.cuda.amp.GradScaler()

  scaler = torch.cuda.amp.GradScaler()


In [None]:
#/kaggle/input/best_model/pytorch/default/1/best_model (2).pth

In [18]:
model.load_state_dict(torch.load('/kaggle/input/best_model/pytorch/default/1/best_model (2).pth'))

  model.load_state_dict(torch.load('/kaggle/input/best_model/pytorch/default/1/best_model (2).pth'))


<All keys matched successfully>

In [None]:
num_epochs = 50
best_val_loss = float("inf")

for epoch in range(num_epochs):

  train_loss = train_model(model, train_loader, optimizer, criterion, scaler, device)
  val_loss = validate_model(model, val_loader, criterion, device)

  if val_loss < best_val_loss:
      best_val_loss = val_loss
      torch.save(model.state_dict(), "best_model.pth")
  print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.9f}, Val Loss: {val_loss:.9f}")

  with torch.cuda.amp.autocast():


Epoch 1/5, Train Loss: 0.000313646, Val Loss: 0.000147132
Epoch 2/5, Train Loss: 0.000234350, Val Loss: 0.000144496
Epoch 3/5, Train Loss: 0.000234378, Val Loss: 0.000144406
Epoch 4/5, Train Loss: 0.000233270, Val Loss: 0.000146028


In [None]:
os.makedirs('/kaggle/working/test_final',exist_ok=True)
final_path = "/kaggle/working/test_final"

In [None]:
def test_prediction(model, test_loader, device, path):
    model.eval()
    with torch.no_grad():
        with tqdm(total = len(test_loader), desc = 'Testing', unit = 'batch') as tepoch:
            for noise_imgs, img_name in test_loader:
                noise_imgs = noise_imgs.to(device)

                outputs = model(noise_imgs)

                for idx in range(outputs.shape[0]):
                    predicted_img = outputs[idx].squeeze(0).cpu()
                    
                    out_path = os.path.join(path, f"{img_name[idx]}")
                    save_image(predicted_img, out_path)

                tepoch.update(1)

In [None]:
from tqdm import tqdm

In [None]:
model.load_state_dict(torch.load("/kaggle/working/best_model.pth"))

In [None]:
test_prediction(model,test_loader,device, final_path)

In [None]:
import os
import numpy as np
import pandas as pd
from PIL import Image

def images_to_csv(folder_path, output_csv):
    data_rows = []
    for filename in os.listdir(folder_path):
        if filename.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff')):
            image_path = os.path.join(folder_path, filename)
            image = Image.open(image_path).convert('L') 
            image_array = np.array(image).flatten()[::8]
            image_id = filename.split('.')[0].replace('test_', 'gt_')
            data_rows.append([image_id, *image_array])
    column_names = ['ID'] + [f'pixel_{i}' for i in range(len(data_rows[0]) - 1)]
    df = pd.DataFrame(data_rows, columns=column_names)
    df.to_csv(output_csv, index=False)
    print(f'Successfully saved to {output_csv}')

folder_path = final_path
output_csv = 'submissionfinal.csv'
images_to_csv(folder_path, output_csv)

from kaggle.api.kaggle_api_extended import KaggleApi

# Authenticate Kaggle API
api = KaggleApi()
api.authenticate()

# Submit the file
api.competition_submit(
    file_name='submissionfinal.csv',  
    message="Auto submission Done Meghna",
    competition="dlp-jan-2025-nppe-3"
)

print("Submission complete Meghna!")