In [1]:
# # This Python 3 environment comes with many helpful analytics libraries installed
# # It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# # For example, here's several helpful packages to load

# import numpy as np # linear algebra
# import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# # Input data files are available in the read-only "../input/" directory
# # For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# # You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# # You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
import torch
import torch.nn as nn

class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        
    def forward(self, x):
        residual = x
        out = self.relu(self.conv1(x))
        out = self.conv2(out)
        out += residual  # Skip connection
        return out

class LightweightSR(nn.Module):
    def __init__(self, num_blocks=8, feature_channels=64, upscale_factor=4):
        super(LightweightSR, self).__init__()
        
        # Initial feature extraction
        self.initial = nn.Sequential(
            nn.Conv2d(3, feature_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )
        
        # Residual blocks
        self.residual_blocks = nn.Sequential(
            *[ResidualBlock(feature_channels) for _ in range(num_blocks)]
        )
        
        # Global skip connection conv
        self.conv_mid = nn.Conv2d(feature_channels, feature_channels, kernel_size=3, padding=1)
        
        # Upsampling - use pixel shuffle for efficiency
        if upscale_factor == 4:
            self.upscale = nn.Sequential(
                nn.Conv2d(feature_channels, feature_channels*4, kernel_size=3, padding=1),
                nn.PixelShuffle(2),
                nn.Conv2d(feature_channels, feature_channels*4, kernel_size=3, padding=1),
                nn.PixelShuffle(2)
            )
        else:  # Assuming upscale_factor == 2
            self.upscale = nn.Sequential(
                nn.Conv2d(feature_channels, feature_channels*4, kernel_size=3, padding=1),
                nn.PixelShuffle(2)
            )
        
        # Final reconstruction
        self.final = nn.Conv2d(feature_channels, 3, kernel_size=3, padding=1)
        
    def forward(self, x):
        initial = self.initial(x)
        
        out = self.residual_blocks(initial)
        out = self.conv_mid(out)
        out = out + initial  # Global residual learning
        
        out = self.upscale(out)
        out = self.final(out)
        
        return out

In [3]:
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class SRDataset(Dataset):
    def __init__(self, lr_paths, hr_paths=None, patch_size=96, is_train=True):  # Increased from 64 to 96
        self.lr_paths = lr_paths
        self.hr_paths = hr_paths
        self.patch_size = patch_size
        self.is_train = is_train
        
    def __len__(self):
        return len(self.lr_paths)
    
    def __getitem__(self, idx):
        # Load LR image
        lr_img = cv2.imread(self.lr_paths[idx])
        lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2RGB)
        
        if self.hr_paths:
            # Load HR image
            hr_img = cv2.imread(self.hr_paths[idx])
            hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2RGB)
            
            # Random crop for training
            if self.is_train:
                h, w = lr_img.shape[:2]
                
                # Ensure we have enough space for a crop
                if h < self.patch_size or w < self.patch_size:
                    # If image is too small, resize it to be at least patch_size
                    scale_factor = max(self.patch_size / h, self.patch_size / w) * 1.1
                    new_h, new_w = int(h * scale_factor), int(w * scale_factor)
                    lr_img = cv2.resize(lr_img, (new_w, new_h))
                    
                    # Resize HR accordingly
                    scale = hr_img.shape[0] // h  # Scale between HR and LR
                    hr_img = cv2.resize(hr_img, (new_w * scale, new_h * scale))
                    
                    h, w = lr_img.shape[:2]  # Update dimensions
                
                # Random crop
                x = np.random.randint(0, w - self.patch_size)
                y = np.random.randint(0, h - self.patch_size)
                
                # Adjust for HR image
                scale = hr_img.shape[0] // lr_img.shape[0]
                lr_img = lr_img[y:y+self.patch_size, x:x+self.patch_size]
                hr_img = hr_img[y*scale:(y+self.patch_size)*scale, x*scale:(x+self.patch_size)*scale]
                
                # Random flip
                if np.random.random() > 0.5:
                    lr_img = np.flip(lr_img, axis=1)
                    hr_img = np.flip(hr_img, axis=1)
        
        # Convert to tensor and normalize to [-1, 1]
        lr_tensor = torch.from_numpy(lr_img.transpose(2, 0, 1).astype(np.float32)) / 127.5 - 1.0
        
        if self.hr_paths:
            hr_tensor = torch.from_numpy(hr_img.transpose(2, 0, 1).astype(np.float32)) / 127.5 - 1.0
            return lr_tensor, hr_tensor
        else:
            return lr_tensor

In [4]:
import torch.cuda.amp as amp
from tqdm.notebook import tqdm

def train_model(model, train_loader, val_loader, epochs=50, lr=1e-4):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    
    criterion = nn.L1Loss()  # L1 is better for image SR
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
    # Cosine annealing scheduler for faster convergence
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    
    # Mixed precision for efficiency
    scaler = amp.GradScaler()
    
    best_psnr = 0
    
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        
        with tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}") as pbar:
            for lr_imgs, hr_imgs in pbar:
                lr_imgs = lr_imgs.to(device)
                hr_imgs = hr_imgs.to(device)
                
                optimizer.zero_grad()
                
                # Mixed precision training - fixed syntax
                with torch.amp.autocast('cuda'):
                    sr_imgs = model(lr_imgs)
                    loss = criterion(sr_imgs, hr_imgs)
                
                # Scale gradients and optimize
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                
                epoch_loss += loss.item()
                pbar.set_postfix(loss=f"{epoch_loss/len(pbar):.4f}")
        
        scheduler.step()
        
        # Validate
        model.eval()
        val_psnr = evaluate_psnr(model, val_loader, device)
        print(f"Validation PSNR: {val_psnr:.2f} dB")
        
        # Save if best
        if val_psnr > best_psnr:
            best_psnr = val_psnr
            torch.save(model.state_dict(), "best_sr_model_1.pth")
            
    return model

In [5]:
from skimage.metrics import peak_signal_noise_ratio, structural_similarity

def evaluate_psnr(model, val_loader, device):
    model.eval()
    psnr_values = []
    ssim_values = []
    
    with torch.no_grad():
        for lr_imgs, hr_imgs in val_loader:
            lr_imgs = lr_imgs.to(device)
            hr_imgs = hr_imgs.cpu().numpy().transpose(0, 2, 3, 1)
            
            # Denormalize images from [-1, 1] to [0, 255]
            hr_imgs = (hr_imgs + 1) * 127.5
            
            sr_imgs = model(lr_imgs).cpu().numpy().transpose(0, 2, 3, 1)
            sr_imgs = np.clip((sr_imgs + 1) * 127.5, 0, 255)
            
            # Calculate metrics
            for i in range(lr_imgs.size(0)):
                psnr_values.append(peak_signal_noise_ratio(hr_imgs[i], sr_imgs[i], data_range=255))
                # Fix the SSIM call - specify channel_axis instead of multichannel
                ssim_values.append(structural_similarity(
                    hr_imgs[i], 
                    sr_imgs[i], 
                    channel_axis=2,  # Specify that channels are on axis 2
                    data_range=255,
                    win_size=7  # Explicitly set window size to be smaller
                ))
                
    avg_psnr = np.mean(psnr_values)
    avg_ssim = np.mean(ssim_values)
    
    # Calculate joint metric (as per competition rules)
    joint_metric = 40 * avg_ssim + avg_psnr
    
    return joint_metric

In [6]:
import os
import glob
from sklearn.model_selection import train_test_split

# Path setup
train_lr_dir = "/kaggle/input/gnr638/train-kaggle/train-kaggle/lr"
train_hr_dir = "/kaggle/input/gnr638/train-kaggle/train-kaggle/hr"  # If available

# Get file lists
lr_files = sorted(glob.glob(os.path.join(train_lr_dir, "*.png")))
hr_files = sorted(glob.glob(os.path.join(train_hr_dir, "*.png"))) if os.path.exists(train_hr_dir) else None

# Create train/val split
train_lr, val_lr = train_test_split(lr_files, test_size=0.1, random_state=42)
train_hr, val_hr = train_test_split(hr_files, test_size=0.1, random_state=42) if hr_files else (None, None)

# Create datasets with larger patch size
train_dataset = SRDataset(train_lr, train_hr, patch_size=96, is_train=True)
val_dataset = SRDataset(val_lr, val_hr, is_train=False)

# Create data loaders - small batch size to save memory
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=4)

# Create model
model = LightweightSR(num_blocks=8, feature_channels=64)

# Train model
train_model(model, train_loader, val_loader, epochs=10, lr=1e-4)

  scaler = amp.GradScaler()


Epoch 1/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 49.59 dB


Epoch 2/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 52.72 dB


Epoch 3/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 52.80 dB


Epoch 4/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 52.48 dB


Epoch 5/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 53.51 dB


Epoch 6/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 53.76 dB


Epoch 7/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 53.83 dB


Epoch 8/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 54.15 dB


Epoch 9/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 53.99 dB


Epoch 10/10:   0%|          | 0/254 [00:00<?, ?it/s]

Validation PSNR: 54.17 dB


LightweightSR(
  (initial): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
  )
  (residual_blocks): Sequential(
    (0): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    )
    (1): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    )
    (2): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    )
    (3): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (relu): ReLU(inplace=True)
      (c

In [7]:
import base64
import pandas as pd
from tqdm import tqdm

def generate_submission(model, test_dir, output_csv_file):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()
    
    # Get test files
    test_files = sorted(glob.glob(os.path.join(test_dir, "*.png")))
    encoded_images = []
    
    for file_path in tqdm(test_files):
        # Load and preprocess image
        img = cv2.imread(file_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # Convert to tensor
        lr_tensor = torch.from_numpy(img.transpose(2, 0, 1).astype(np.float32)) / 127.5 - 1.0
        lr_tensor = lr_tensor.unsqueeze(0).to(device)
        
        # Inference
        with torch.no_grad():
            sr_tensor = model(lr_tensor)
        
        # Convert back to image
        sr_img = sr_tensor.squeeze(0).cpu().numpy().transpose(1, 2, 0)
        sr_img = np.clip((sr_img + 1) * 127.5, 0, 255).astype(np.uint8)
        sr_img = cv2.cvtColor(sr_img, cv2.COLOR_RGB2BGR)
        
        # Save temporarily and encode
        temp_path = f"temp_{os.path.basename(file_path)}"
        cv2.imwrite(temp_path, sr_img)
        
        # Encode to base64
        with open(temp_path, "rb") as img_file:
            encoded_img = base64.b64encode(img_file.read()).decode('utf-8')
        
        os.remove(temp_path)  # Clean up
        
        encoded_images.append({
            'id': os.path.basename(file_path),
            'Encoded_Image': encoded_img
        })
    
    # Create submission DataFrame
    df = pd.DataFrame(encoded_images)
    df.to_csv(output_csv_file, index=False)
    print(f"Submission saved to {output_csv_file}")

In [8]:
# First make sure your model is loaded with the best weights
model.load_state_dict(torch.load("best_sr_model_1.pth"))

# Define your test directory and output path
test_dir = "/kaggle/input/gnr638/lr/lr"  # Directory containing test LR images
output_csv_file = "/kaggle/working/submission_latest.csv"  # Where to save the submission file

# Call the submission function
generate_submission(model, test_dir, output_csv_file)

  model.load_state_dict(torch.load("best_sr_model_1.pth"))
100%|██████████| 500/500 [01:50<00:00,  4.51it/s]


Submission saved to /kaggle/working/submission_latest.csv
