In [None]:
from google.colab import drive
drive.mount('/content/drive')

import zipfile
import os
zip_path = "/content/drive/MyDrive/GOPRO_Large.zip"

extract_path = "/content/gopro_data"

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print("Veri başarıyla çıkarıldı:", os.listdir(extract_path))

In [1]:
folder_path = '/content/gopro_data'
train_path = os.path.join(folder_path, 'train')
test_path = os.path.join(folder_path, 'test')

import cv2
import torch
import numpy as np

X_train = []
Y_train = []
X_test = []
Y_test = []

image_size = (800 , 450)  # sabit boyut (örnek)

def load_image_tensor(path):
    img = cv2.imread(path)
    if img is None:
        return None
    img = cv2.resize(img, image_size)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.astype(np.float32) / 255.0
    img = np.transpose(img, (2, 0, 1))  # HWC → CHW
    return torch.tensor(img)

# Eğitim verisi: blur → sharp eşleşmesi
for class_folder in os.listdir(train_path):
    class_path = os.path.join(train_path, class_folder)
    blur_path = os.path.join(class_path, 'blur')
    sharp_path = os.path.join(class_path, 'sharp')

    if os.path.isdir(blur_path) and os.path.isdir(sharp_path):
        for image_file in os.listdir(blur_path):
            blur_img_path = os.path.join(blur_path, image_file)
            sharp_img_path = os.path.join(sharp_path, image_file)  # Aynı isimde olmalı

            blur_tensor = load_image_tensor(blur_img_path)
            sharp_tensor = load_image_tensor(sharp_img_path)

            if blur_tensor is not None and sharp_tensor is not None:
                X_train.append(blur_tensor)
                Y_train.append(sharp_tensor)

# Test verisi: aynı şekilde
for class_folder in os.listdir(test_path):
    class_path = os.path.join(test_path, class_folder)
    blur_path = os.path.join(class_path, 'blur')
    sharp_path = os.path.join(class_path, 'sharp')

    if os.path.isdir(blur_path) and os.path.isdir(sharp_path):
        for image_file in os.listdir(blur_path):
            blur_img_path = os.path.join(blur_path, image_file)
            sharp_img_path = os.path.join(sharp_path, image_file)

            blur_tensor = load_image_tensor(blur_img_path)
            sharp_tensor = load_image_tensor(sharp_img_path)

            if blur_tensor is not None and sharp_tensor is not None:
                X_test.append(blur_tensor)
                Y_test.append(sharp_tensor)

In [4]:
import torch
import torch.nn as nn

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()

        # l1: conv(3 -> 32), bn, relu
        self.l1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU()
        )

        # l2: conv(32 -> 64), bn, relu
        self.l2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        # t1: input + max_pool(input)
        self.maxpool_t1_t4 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)

        # Bottleneck: 64 -> 32 (t2 öncesi)
        self.bottleneck = nn.Conv2d(64, 32, kernel_size=1, stride=1, padding=0)

        # t2 components: four convs for 32 -> 16 each (bottleneck sonrası)
        self.t2_conv1 = nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1)
        self.t2_conv2 = nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1)
        self.t2_conv3 = nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1)
        self.t2_conv4 = nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1)

        # t3 reductions
        self.t3_reduce_l6 = nn.Conv2d(192, 64, kernel_size=3, stride=1, padding=1)
        self.t3_reduce_l8_l9 = nn.Conv2d(128, 64, kernel_size=3, stride=1, padding=1)
        self.t3_reduce_l11 = nn.Conv2d(96, 32, kernel_size=3, stride=1, padding=1)

        # Final conv
        self.final_conv = nn.Conv2d(32, 3, kernel_size=3, stride=1, padding=1)
        self.tanh = nn.Tanh()

    def t1(self, x):
        max_pooled = self.maxpool_t1_t4(x)
        return x + max_pooled

    def t2(self, x):
        x = self.bottleneck(x)  # 64 -> 32
        c1 = self.t2_conv1(x)
        c2 = self.t2_conv2(x)
        c3 = self.t2_conv3(x)
        c4 = self.t2_conv4(x)
        return torch.cat([c1, c2, c3, c4], dim=1)  # [64, 450, 800]

    def t4(self, x):
        max_pooled = self.maxpool_t1_t4(x)
        concat = torch.cat([x, max_pooled], dim=1)
        return nn.functional.relu(concat)

    def forward(self, x):
        l1_out = self.l1(x)        # [32, 450, 800]
        l2_out = self.l2(l1_out)   # [64, 450, 800]
        l3_out = self.t1(l2_out)   # [64, 450, 800]
        l4_out = self.t2(l3_out)   # [64, 450, 800]
        l5_out = self.t4(l4_out)   # [128, 450, 800]
        l6_concat = torch.cat([l5_out, l4_out], dim=1)  # [192, 450, 800]
        l6_out = self.t3_reduce_l6(l6_concat)  # [64, 450, 800]
        l7_out = self.t2(l6_out)   # [64, 450, 800]
        l8_concat = torch.cat([l7_out, l3_out], dim=1)  # [128, 450, 800]
        l8_out = self.t3_reduce_l8_l9(l8_concat)  # [64, 450, 800]
        l9_concat = torch.cat([l8_out, l2_out], dim=1)  # [128, 450, 800]
        l9_out = self.t3_reduce_l8_l9(l9_concat)  # [64, 450, 800]
        l10_out = self.t2(l9_out)  # [64, 450, 800]
        l11_concat = torch.cat([l10_out, l1_out], dim=1)  # [96, 450, 800]
        l11_out = self.t3_reduce_l11(l11_concat)  # [32, 450, 800]
        l12_out = self.t1(l11_out)  # [32, 450, 800]
        final = self.final_conv(l12_out)  # [3, 450, 800]
        output = self.tanh(final) + x  # Residual connection
        return output

class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 1, kernel_size=4, stride=1, padding=0),
            nn.Sigmoid()  # PatchGAN: her patch için 0-1 tahmini
        )
    def forward(self, x):
        return self.model(x)


In [6]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, Subset
import numpy as np
from tqdm import tqdm
from torchmetrics.image import StructuralSimilarityIndexMeasure
from torchvision import transforms
import os

def calculate_psnr(img1, img2, max_val=1.0):
    """Calculate PSNR (Peak Signal-to-Noise Ratio) between two images."""
    mse = torch.mean((img1 - img2) ** 2)
    if mse == 0:
        return float('inf')
    return 20 * torch.log10(max_val / torch.sqrt(mse)).item()

def train_model_progressive(generator, discriminator, X_train, Y_train, batch_size=8, epochs=50, lr=5e-3, lambda_mse=1.0, lambda_ssim=0.3, lambda_adv=0.01, device=None, save_path="best_model.pth"):
    # Cihaz ayarı
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    generator = generator.to(device)
    discriminator = discriminator.to(device)

    # Optimizörler
    optimizer_g = torch.optim.Adam(generator.parameters(), lr=lr, weight_decay=1e-4)
    optimizer_d = torch.optim.Adam(discriminator.parameters(), lr=lr, weight_decay=1e-4)

    # Öğrenme oranı scheduler
    scheduler_g = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer_g, mode='min', factor=0.5, patience=5)
    scheduler_d = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer_d, mode='min', factor=0.5, patience=5)

    # Kayıplar
    mse_loss = nn.MSELoss()
    ssim = StructuralSimilarityIndexMeasure(data_range=1.0).to(device)
    adversarial_loss = nn.BCELoss()

    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
    ])

    # X_train ve Y_train için aynı dönüşümleri uygula
    X_train_aug = []
    Y_train_aug = []
    for x, y in zip(X_train, Y_train):
        # Aynı rastgele dönüşümü hem x hem y için uygula
        seed = torch.randint(0, 2**32, (1,)).item()  # Rastgele tohum
        torch.manual_seed(seed)  # x için tohumu ayarla
        x_aug = transform(x.clone())
        torch.manual_seed(seed)  # y için aynı tohumu kullan
        y_aug = transform(y.clone())
        X_train_aug.append(x_aug)
        Y_train_aug.append(y_aug)

    # Tüm veri setini kullan
    full_dataset = TensorDataset(torch.stack(X_train_aug), torch.stack(Y_train_aug))

    # Eğitim ve doğrulama için bölme (%80 eğitim, %20 doğrulama)
    total_size = len(full_dataset)
    train_size = int(0.8 * total_size)
    val_size = total_size - train_size
    indices = np.random.permutation(total_size)
    train_indices = indices[:train_size]
    val_indices = indices[train_size:]

    train_dataset = Subset(full_dataset, train_indices)
    val_dataset = Subset(full_dataset, val_indices)

    # DataLoader'lar
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Kayıp izleme
    train_g_losses, val_g_losses = [], []
    train_d_losses = []
    train_ssim, val_ssim = [], []
    train_psnr, val_psnr = [], []

    best_val_loss = float('inf')
    patience = 10
    patience_counter = 0
    for epoch in range(epochs):
        # 30 epoch’ta tam veri setine geçiş
        if (epoch + 1) % 30 == 0:
            optimizer_g = torch.optim.Adam(generator.parameters(), lr=lr/10, weight_decay=1e-4)
            optimizer_d = torch.optim.Adam(discriminator.parameters(), lr=lr/10, weight_decay=1e-4)

        # Eğitim aşaması
        generator.train()
        discriminator.train()
        train_g_loss = 0.0
        train_d_loss = 0.0
        train_ssim_sum = 0.0
        train_psnr_sum = 0.0
        train_count = 0

        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            inputs, targets = batch
            inputs, targets = inputs.to(device), targets.to(device)
            batch_size = inputs.size(0)

            # Diskriminatör Eğitimi
            optimizer_d.zero_grad()
            real_labels = torch.ones(batch_size, 1).to(device)
            fake_labels = torch.zeros(batch_size, 1).to(device)

            real_output = discriminator(targets).mean(dim=[2, 3])
            d_loss_real = adversarial_loss(real_output, real_labels)

            fake_images = generator(inputs)
            fake_output = discriminator(fake_images.detach()).mean(dim=[2, 3])
            d_loss_fake = adversarial_loss(fake_output, fake_labels)

            d_loss = (d_loss_real + d_loss_fake) / 2
            d_loss.backward()
            optimizer_d.step()

            # Jeneratör Eğitimi
            optimizer_g.zero_grad()
            fake_output = discriminator(fake_images).mean(dim=[2, 3])
            g_loss_adv = adversarial_loss(fake_output, real_labels)

            mse = mse_loss(fake_images, targets)
            ssim_val = ssim(fake_images, targets)
            g_loss = lambda_mse * mse + lambda_ssim * (1 - ssim_val) + lambda_adv * g_loss_adv

            g_loss.backward()
            optimizer_g.step()

            train_g_loss += g_loss.item() * batch_size
            train_d_loss += d_loss.item() * batch_size
            train_ssim_sum += ssim_val.item() * batch_size
            train_psnr_sum += calculate_psnr(fake_images, targets) * batch_size
            train_count += batch_size

        avg_train_g_loss = train_g_loss / train_count
        avg_train_d_loss = train_d_loss / train_count
        avg_train_ssim = train_ssim_sum / train_count
        avg_train_psnr = train_psnr_sum / train_count
        train_g_losses.append(avg_train_g_loss)
        train_d_losses.append(avg_train_d_loss)
        train_ssim.append(avg_train_ssim)
        train_psnr.append(avg_train_psnr)

        # Doğrulama aşaması
        generator.eval()
        discriminator.eval()
        val_g_loss = 0.0
        val_ssim_sum = 0.0
        val_psnr_sum = 0.0
        val_count = 0

        with torch.no_grad():
            for batch in val_loader:
                inputs, targets = batch
                inputs, targets = inputs.to(device), targets.to(device)
                batch_size = inputs.size(0)

                fake_images = generator(inputs)

                mse = mse_loss(fake_images, targets)
                ssim_val = ssim(fake_images, targets)
                g_loss = lambda_mse * mse + lambda_ssim * (1 - ssim_val)

                val_g_loss += g_loss.item() * batch_size
                val_ssim_sum += ssim_val.item() * batch_size
                val_psnr_sum += calculate_psnr(fake_images, targets) * batch_size
                val_count += batch_size

        avg_val_g_loss = val_g_loss / val_count
        avg_val_ssim = val_ssim_sum / val_count
        avg_val_psnr = val_psnr_sum / val_count
        val_g_losses.append(avg_val_g_loss)
        val_ssim.append(avg_val_ssim)
        val_psnr.append(avg_val_psnr)

        # Scheduler adımı
        scheduler_g.step(avg_val_g_loss)
        scheduler_d.step(avg_val_g_loss)

        # İlerleme yazdırma
        current_lr = optimizer_g.param_groups[0]['lr']
        print(f"Epoch {epoch+1}/{epochs}, LR: {current_lr:.6f}")
        print(f"Train G Loss: {avg_train_g_loss:.6f}, Train D Loss: {avg_train_d_loss:.6f}, Train SSIM: {avg_train_ssim:.4f}, Train PSNR: {avg_train_psnr:.2f} dB")
        print(f"Val G Loss: {avg_val_g_loss:.6f}, Val SSIM: {avg_val_ssim:.4f}, Val PSNR: {avg_val_psnr:.2f} dB")

        # Checkpointing
        if avg_val_g_loss < best_val_loss:
            best_val_loss = avg_val_g_loss
            torch.save({
                'generator_state_dict': generator.state_dict(),
                'discriminator_state_dict': discriminator.state_dict(),
            }, save_path)
            print(f"Best model saved at epoch {epoch+1} with Val G Loss: {best_val_loss:.6f}")
            patience_counter = 0
        else:
            patience_counter += 1

        # Erken durdurma
        if patience_counter >= patience:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break


    return generator, discriminator, train_g_losses, val_g_losses, train_ssim, val_ssim, train_psnr, val_psnr

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# Modelleri başlatma
generator = NeuralNetwork().to(device)
discriminator = Discriminator().to(device)

# Eğitimi başlatma
trained_generator, trained_discriminator, train_g_losses, val_g_losses, train_ssim, val_ssim , train_psnr , val_psnr = train_model_progressive(
    generator=generator,
    discriminator=discriminator,
    X_train=X_train,
    Y_train=Y_train,
    batch_size=4,
    epochs=100,
    lr=5e-4,
    lambda_mse=1.0,
    lambda_ssim=0.3,
    lambda_adv=0.01,
    device=device,
    save_path="/content/best_model.pth"  # Colab için yol
)

# Eğitim sonuçlarını yazdırma
print("Eğitim tamamlandı.")
print(f"Son Train G Loss: {train_g_losses[-1]:.6f}, Son Val G Loss: {val_g_losses[-1]:.6f}")
print(f"Son Train SSIM: {train_ssim[-1]:.4f}, Son Val SSIM: {val_ssim[-1]:.4f}")


In [None]:
import matplotlib.pyplot as plt

def visualize_test_results(model_path, X_test, Y_test, num_samples=3, device=None):
    """
    Load the best GAN generator model and visualize predictions on test data.

    Args:
        model_path: Path to the saved model checkpoint (e.g., '/content/best_gan_model.pth')
        X_test: List of test input tensors [3, 450, 800]
        Y_test: List of test target tensors [3, 450, 800]
        num_samples: Number of samples to visualize
        device: Device to run model ('cuda' or 'cpu')
    """
    # Set device
    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


    # Load generator model
    model = NeuralNetwork()
    try:
        checkpoint = torch.load(model_path, map_location=device)
        model.load_state_dict(checkpoint['generator_state_dict'])
    except KeyError as e:
        raise KeyError(f"Checkpoint at {model_path} does not contain 'generator_state_dict'. Ensure the model was saved correctly.")
    model = model.to(device)
    model.eval()

    # Initialize SSIM metric
    ssim = StructuralSimilarityIndexMeasure(data_range=1.0).to(device)

    # Randomly select samples
    indices = np.random.permutation(len(X_test))[:num_samples]
    X_samples = [X_test[i] for i in indices]
    Y_samples = [Y_test[i] for i in indices]

    # Plotting
    fig, axes = plt.subplots(num_samples, 3, figsize=(15, 5 * num_samples))
    if num_samples == 1:
        axes = [axes]  # Ensure axes is iterable for single sample

    with torch.no_grad():
        for i in range(num_samples):
            # Prepare input
            input_img = X_samples[i].unsqueeze(0).to(device)  # [1, 3, 450, 800]
            target_img = Y_samples[i].to(device)  # [3, 450, 800]

            # Get prediction
            output_img = model(input_img).squeeze(0)  # [3, 450, 800]

            # Calculate metrics
            input_ssim = ssim(input_img, target_img.unsqueeze(0)).item()
            input_psnr = calculate_psnr(input_img, target_img.unsqueeze(0))
            pred_ssim = ssim(output_img.unsqueeze(0), target_img.unsqueeze(0)).item()
            pred_psnr = calculate_psnr(output_img.unsqueeze(0), target_img.unsqueeze(0))

            # Convert tensors to numpy for plotting
            input_np = input_img.squeeze().cpu().numpy().transpose(1, 2, 0)  # [450, 800, 3]
            target_np = target_img.cpu().numpy().transpose(1, 2, 0)
            output_np = output_img.cpu().numpy().transpose(1, 2, 0)

            # Clip to [0, 1] for visualization
            input_np = np.clip(input_np, 0, 1)
            target_np = np.clip(target_np, 0, 1)
            output_np = np.clip(output_np, 0, 1)

            # Plot
            axes[i][0].imshow(input_np)
            axes[i][0].set_title(f"Input (Blur)\nSSIM: {input_ssim:.4f}, PSNR: {input_psnr:.2f} dB")
            axes[i][0].axis('off')

            axes[i][1].imshow(target_np)
            axes[i][1].set_title("Target (Sharp)")
            axes[i][1].axis('off')

            axes[i][2].imshow(output_np)
            axes[i][2].set_title(f"Predicted\nSSIM: {pred_ssim:.4f}, PSNR: {pred_psnr:.2f} dB")
            axes[i][2].axis('off')

    plt.tight_layout()
    plt.show()

# Example usage with placeholder data
if __name__ == "__main__":

    # Visualize
    visualize_test_results(
        model_path="/content/best_model.pth",
        X_test=X_test,
        Y_test=Y_test,
        num_samples=50,
        device=None
    )