In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
from torchvision import transforms
import matplotlib.pyplot as plt
import seaborn as sns
import kagglehub

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Download dataset from Kaggle
path = kagglehub.dataset_download("darthvader4067/gopro")
print("Path to dataset files:", path)


Downloading from https://www.kaggle.com/api/v1/datasets/download/darthvader4067/gopro?dataset_version_number=1...


100%|██████████| 8.89G/8.89G [01:32<00:00, 103MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/darthvader4067/gopro/versions/1


In [None]:
# Data Preprocessing & Augmentation
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor()
])

In [None]:
# Plot sample images
def plot_sample_images(image_paths, title="Sample Images"):
    plt.figure(figsize=(10, 5))
    for i in range(min(5, len(image_paths))):
        img = cv2.imread(image_paths[i])
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        plt.subplot(1, 5, i+1)
        plt.imshow(img)
        plt.axis('off')
    plt.suptitle(title)
    plt.show()

# Plot class distribution (here based on number of frames per video if applicable)
def plot_class_distribution(paths, title="Frame Count Distribution"):
    frame_counts = [len(os.listdir(p)) for p in paths]
    sns.histplot(frame_counts, bins=10, kde=True)
    plt.title(title)
    plt.xlabel("Number of Frames")
    plt.ylabel("Frequency")
    plt.show()

In [None]:
# Placeholder Model Definition
class EDVR(nn.Module):
    def __init__(self):
        super(EDVR, self).__init__()
        # Dummy layer for demonstration
        self.conv = nn.Conv2d(3, 3, kernel_size=3, padding=1)

    def forward(self, x):
        return self.conv(x)


In [None]:
# Dataset Classes
class GoProImageDataset(Dataset):
    def __init__(self, blurry_paths, sharp_paths, transform=None):
        self.blurry_paths = blurry_paths
        self.sharp_paths = sharp_paths
        self.transform = transform

    def __len__(self):
        return len(self.blurry_paths)

    def __getitem__(self, idx):
        blurry_img = cv2.imread(self.blurry_paths[idx])
        sharp_img = cv2.imread(self.sharp_paths[idx])
        blurry_img = cv2.cvtColor(blurry_img, cv2.COLOR_BGR2RGB)
        sharp_img = cv2.cvtColor(sharp_img, cv2.COLOR_BGR2RGB)
        if self.transform:
            blurry_img = self.transform(blurry_img)
            sharp_img = self.transform(sharp_img)
        return blurry_img, sharp_img

class GoProVideoDataset(Dataset):
    def __init__(self, blurry_paths, sharp_paths, num_frames=5, transform=None):
        self.blurry_paths = blurry_paths
        self.sharp_paths = sharp_paths
        self.num_frames = num_frames
        self.transform = transform

    def __len__(self):
        return len(self.blurry_paths) - self.num_frames

    def __getitem__(self, idx):
        blurry_seq = []
        sharp_seq = []
        for i in range(self.num_frames):
            blurry_img = cv2.imread(self.blurry_paths[idx + i])
            sharp_img = cv2.imread(self.sharp_paths[idx + i])
            blurry_img = cv2.cvtColor(blurry_img, cv2.COLOR_BGR2RGB)
            sharp_img = cv2.cvtColor(sharp_img, cv2.COLOR_BGR2RGB)
            if self.transform:
                blurry_img = self.transform(blurry_img)
                sharp_img = self.transform(sharp_img)
            blurry_seq.append(blurry_img)
            sharp_seq.append(sharp_img)
        return torch.stack(blurry_seq), torch.stack(sharp_seq)


In [None]:
# Training and Evaluation

def train_model(model, loader, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for blurry, sharp in loader:
            blurry, sharp = blurry.to(device), sharp.to(device)
            output = model(blurry)
            loss = criterion(output, sharp)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch [{epoch+1}/{num_epochs}] - Loss: {total_loss/len(loader):.4f}")


def evaluate_model(model, loader):
    model.eval()
    total_mse, total_psnr = 0, 0
    with torch.no_grad():
        for blurry, sharp in loader:
            blurry, sharp = blurry.to(device), sharp.to(device)
            output = model(blurry)
            mse = ((output - sharp) ** 2).mean().item()
            psnr = 10 * torch.log10(1 / mse)
            total_mse += mse
            total_psnr += psnr.item()
    print(f"Test MSE: {total_mse/len(loader):.4f}, PSNR: {total_psnr/len(loader):.2f} dB")


In [None]:
# Save & Download Model

def save_model(model, path="edvr_deblur.pth"):
    torch.save(model.state_dict(), path)
    print(f"Model saved to {path}")


In [None]:
# Image Model Training

# Placeholder image paths
blurry_paths = ["https://www.nyip.edu/media/zoo/images/stop-taking-blurry-pictures_89361f2a136cd1ae81bb54636ed2ee72.jpg", "https://media.istockphoto.com/id/2149838473/photo/business-meeting-blurred-background-and-people-in-office-for-teamwork-collaboration-and.webp?a=1&b=1&s=612x612&w=0&k=20&c=-unx25hyG4IBmpdLueCfpUzXFWbvpbB1DzA3YnPJcBk="]
sharp_paths = ["path_to_sharp_img1", "path_to_sharp_img2"]

# Image Dataset and Dataloader
image_dataset = GoProImageDataset(blurry_paths, sharp_paths, transform=transform)
image_loader = DataLoader(image_dataset, batch_size=4, shuffle=True)

# Initialize and train Image Model
image_model = EDVR().to(device)
image_optimizer = torch.optim.Adam(image_model.parameters(), lr=1e-4)
image_criterion = nn.MSELoss()

train_model(image_model, image_loader, image_criterion, image_optimizer, num_epochs=5)
evaluate_model(image_model, image_loader)
save_model(image_model, "edvr_deblur_image.pth")

error: OpenCV(4.11.0) /io/opencv/modules/imgproc/src/color.cpp:199: error: (-215:Assertion failed) !_src.empty() in function 'cvtColor'


In [None]:
# Video Model Training

# Placeholder video frame paths (same as above for demonstration)
video_dataset = GoProVideoDataset(blurry_paths, sharp_paths, num_frames=5, transform=transform)
video_loader = DataLoader(video_dataset, batch_size=2, shuffle=True)

# Initialize and train Video Model
video_model = EDVR().to(device)
video_optimizer = torch.optim.Adam(video_model.parameters(), lr=1e-4)
video_criterion = nn.MSELoss()

train_model(video_model, video_loader, video_criterion, video_optimizer, num_epochs=5)
evaluate_model(video_model, video_loader)
save_model(video_model, "edvr_deblur_video.pth")

# Download links for Colab
from google.colab import files
files.download("edvr_deblur_image.pth")
files.download("edvr_deblur_video.pth")


REDS - Restortation - ResidualNet Model

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.utils import make_grid
from google.colab import files
from skimage.metrics import structural_similarity as ssim

In [None]:
# Dataset preparation
class REDSRestorationDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.paired_files = self._load_paired_images()

    def _load_paired_images(self):
        input_dir = os.path.join(self.root_dir, "input")
        target_dir = os.path.join(self.root_dir, "target")
        inputs = sorted(os.listdir(input_dir))
        targets = sorted(os.listdir(target_dir))
        return [(os.path.join(input_dir, i), os.path.join(target_dir, t)) for i, t in zip(inputs, targets)]

    def __len__(self):
        return len(self.paired_files)

    def __getitem__(self, idx):
        input_path, target_path = self.paired_files[idx]
        input_img = cv2.imread(input_path)
        target_img = cv2.imread(target_path)
        input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
        target_img = cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB)

        if self.transform:
            input_img = self.transform(input_img)
            target_img = self.transform(target_img)

        return input_img, target_img

In [None]:
# Transformations
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((128, 128)),
    transforms.ToTensor()
])

In [None]:
# Load dataset
path = "/root/.kagglehub/datasets/cookiemonsteryum/reds-video-superresolution-toy-dataset"
dataset = REDSRestorationDataset(path, transform=transform)

In [None]:
# Visualize sample images
def show_sample_images(dataset, n=5):
    fig, axs = plt.subplots(n, 2, figsize=(10, n*2))
    for i in range(n):
        inp, tgt = dataset[i]
        axs[i, 0].imshow(inp.permute(1, 2, 0))
        axs[i, 0].set_title("Input (Degraded)")
        axs[i, 1].imshow(tgt.permute(1, 2, 0))
        axs[i, 1].set_title("Target (Restored)")
        axs[i, 0].axis('off')
        axs[i, 1].axis('off')
    plt.tight_layout()
    plt.show()

show_sample_images(dataset)

In [None]:
# Split data
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
# Model Definition: ResidualNet
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)

    def forward(self, x):
        residual = x
        out = self.relu(self.conv1(x))
        out = self.conv2(out)
        out += residual
        return self.relu(out)

class ResidualUNet(nn.Module):
    def __init__(self):
        super(ResidualUNet, self).__init__()
        self.entry = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.resblock1 = ResidualBlock(64)
        self.resblock2 = ResidualBlock(64)
        self.exit = nn.Conv2d(64, 3, kernel_size=3, padding=1)

    def forward(self, x):
        out = self.entry(x)
        out = self.resblock1(out)
        out = self.resblock2(out)
        out = self.exit(out)
        return out


In [None]:
# Training setup
def train_model(model, loader, device, epochs=10):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    model.to(device)
    train_losses = []

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        epoch_loss = running_loss / len(loader)
        train_losses.append(epoch_loss)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}")

    return model, train_losses

def evaluate_model(model, loader, device):
    model.eval()
    criterion = nn.MSELoss()
    psnr_total = 0
    ssim_total = 0
    count = 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            mse = criterion(outputs, targets).item()
            psnr = 10 * np.log10(1 / mse)
            psnr_total += psnr

            for i in range(inputs.size(0)):
                output_np = outputs[i].cpu().permute(1, 2, 0).numpy()
                target_np = targets[i].cpu().permute(1, 2, 0).numpy()
                ssim_val = ssim(output_np, target_np, channel_axis=2, data_range=1.0)
                ssim_total += ssim_val
                count += 1
    avg_psnr = psnr_total / len(loader)
    avg_ssim = ssim_total / count
    print(f"Average PSNR: {avg_psnr:.2f} dB")
    print(f"Average SSIM: {avg_ssim:.4f}")

In [None]:
# Training for Image Restoration
print("Training Image Restoration Model...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_model = ResidualUNet()
image_model, image_losses = train_model(image_model, train_loader, device)
plt.plot(image_losses)
plt.title("Image Restoration Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.grid(True)
plt.show()
evaluate_model(image_model, test_loader, device)

In [None]:
# Save and download image model
torch.save(image_model.state_dict(), "residual_unet_image.pth")
files.download("residual_unet_image.pth")

In [None]:
# Simulate video restoration by grouping sequential frames
class VideoREDSRestorationDataset(REDSRestorationDataset):
    def __getitem__(self, idx):
        input_path, target_path = self.paired_files[idx]
        input_img = cv2.imread(input_path)
        target_img = cv2.imread(target_path)
        input_img = cv2.cvtColor(input_img, cv2.COLOR_BGR2RGB)
        target_img = cv2.cvtColor(target_img, cv2.COLOR_BGR2RGB)

        if self.transform:
            input_img = self.transform(input_img)
            target_img = self.transform(target_img)

        return input_img.unsqueeze(0), target_img.unsqueeze(0)

video_dataset = VideoREDSRestorationDataset(path, transform=transform)
train_vdataset, test_vdataset = torch.utils.data.random_split(video_dataset, [train_size, test_size])
train_vloader = DataLoader(train_vdataset, batch_size=8, shuffle=True)
test_vloader = DataLoader(test_vdataset, batch_size=8, shuffle=False)

In [None]:
# Training for Video Restoration
print("Training Video Restoration Model...")
video_model = ResidualUNet()
video_model, video_losses = train_model(video_model, train_vloader, device)
plt.plot(video_losses)
plt.title("Video Restoration Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.grid(True)
plt.show()
evaluate_model(video_model, test_vloader, device)

In [None]:
# Save and download video model
torch.save(video_model.state_dict(), "residual_unet_video.pth")
files.download("residual_unet_video.pth")
