In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras import layers, Model
from sklearn.model_selection import train_test_split
import numpy as np
from keras import Model
from keras.layers import Conv2D, PReLU,BatchNormalization, Flatten
from keras.layers import UpSampling2D, LeakyReLU, Dense, Input, add
from tqdm import tqdm
from keras.applications import VGG19
import random
from keras.models import load_model
from numpy.random import randint
from keras.layers import Lambda, concatenate
import tensorflow as tf
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm
import pandas as pd

In [None]:
# ----- GPU Setup (for T4x2 or similar) -----
# Ask TensorFlow to grow GPU memory on demand instead of reserving all of it;
# report when no GPU is visible.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("No GPU found, running on CPU.")
else:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Show the configuration error instead of aborting the notebook.
        print(e)
    else:
        print("GPU is available and memory growth is set.")

In [None]:
#RRDB (Residual-in-Residual Dense Block):
def rrdb_block(input_tensor, filters=64, gc=32):
    """Residual dense block used as the generator's repeated building unit.

    Three 3x3 convolutions with ``gc`` filters each are densely connected:
    every convolution receives the concatenation of the block input and all
    activations produced so far. A final 3x3 convolution projects back to
    ``filters`` channels, is scaled by 0.2 and added to the block input.

    Args:
        input_tensor: Keras tensor entering the block. Its channel count must
            equal ``filters`` for the closing residual ``add`` to be valid.
        filters: Output channels of the projection convolution.
        gc: Filters of each densely connected convolution.

    Returns:
        Keras tensor with the same shape as ``input_tensor``.
    """
    dense = input_tensor
    for _ in range(3):
        grown = Conv2D(gc, (3, 3), padding='same')(dense)
        # Slope passed positionally (as create_disc does): the keyword is
        # `alpha` in Keras 2 but `negative_slope` in Keras 3.
        grown = LeakyReLU(0.2)(grown)
        dense = layers.concatenate([dense, grown])

    residual = Conv2D(filters, (3, 3), padding='same')(dense)
    residual = Lambda(lambda x: x * 0.2)(residual)  # Residual scaling
    return add([input_tensor, residual])

def upscale_block(ip):
    """Double the spatial resolution of ``ip``.

    Applies a 3x3 convolution with 256 filters, a 2x ``UpSampling2D`` and a
    LeakyReLU activation with slope 0.2.

    Args:
        ip: Keras tensor to upscale.

    Returns:
        Keras tensor with 256 channels and twice the height and width of ``ip``.
    """
    x = Conv2D(256, (3, 3), padding='same')(ip)
    x = UpSampling2D(size=2)(x)
    # Positional slope works with both the Keras 2 (`alpha`) and Keras 3
    # (`negative_slope`) spellings of the keyword.
    x = LeakyReLU(0.2)(x)
    return x

In [None]:
# Generator with RRDB blocks
def create_gen(gen_ip, num_res_block=16):
    """Build the super-resolution generator.

    Structure: a 64-filter stem convolution, ``num_res_block`` calls to
    ``rrdb_block``, a 64-filter convolution whose output is added back to the
    stem (long skip connection), two ``upscale_block`` stages (2x each, so 4x
    overall) and a 3-channel sigmoid output convolution.

    Args:
        gen_ip: Keras ``Input`` tensor for the low-resolution image.
        num_res_block: Number of residual dense blocks in the trunk.

    Returns:
        Keras ``Model`` mapping ``gen_ip`` to a 3-channel image in [0, 1]
        with four times the input height and width.
    """
    stem = Conv2D(64, (3, 3), padding='same')(gen_ip)
    # Positional slope: keyword is `alpha` in Keras 2, `negative_slope` in Keras 3.
    stem = LeakyReLU(0.2)(stem)

    x = stem
    for _ in range(num_res_block):
        x = rrdb_block(x)

    x = Conv2D(64, (3, 3), padding='same')(x)
    x = add([x, stem])  # long skip connection around the whole trunk

    x = upscale_block(x)
    x = upscale_block(x)

    # Sigmoid keeps the output in [0, 1], the range the images are scaled to.
    x = Conv2D(3, (3, 3), padding='same', activation='sigmoid')(x)
    return Model(gen_ip, x)

In [None]:
# Relativistic Discriminator
def create_disc(disc_ip):
    """Build the discriminator used with the relativistic loss.

    Eight 3x3 convolutions (each followed by LeakyReLU(0.2)) alternate between
    stride 1 and stride 2 while the filter count grows 64 -> 128 -> 256 -> 512.
    The result is flattened and passed through Dense(1024), LeakyReLU(0.2)
    and a final Dense(1).

    Args:
        disc_ip: Keras ``Input`` tensor for the image to judge.

    Returns:
        Keras ``Model`` mapping ``disc_ip`` to one raw score (logit) per image.
    """
    df = 64
    # (filters, stride) for every convolution, in network order.
    conv_plan = [
        (df, 1), (df, 2),
        (df * 2, 1), (df * 2, 2),
        (df * 4, 1), (df * 4, 2),
        (df * 8, 1), (df * 8, 2),
    ]

    x = disc_ip
    for n_filters, stride in conv_plan:
        x = Conv2D(n_filters, (3, 3), strides=stride, padding='same')(x)
        x = LeakyReLU(0.2)(x)

    x = Flatten()(x)
    x = Dense(df * 16)(x)
    x = LeakyReLU(0.2)(x)
    logits = Dense(1)(x)  # No sigmoid for relativistic loss

    return Model(disc_ip, logits)

In [None]:
# VGG19 for feature extraction
from keras.applications import VGG19
def build_vgg(hr_shape, feature_layer=10):
    """Build a VGG19 feature extractor for the perceptual (content) loss.

    Args:
        hr_shape: Input shape ``(height, width, channels)`` of the images fed
            to the extractor.
        feature_layer: Index into ``vgg.layers`` of the layer whose output is
            returned. The default 10 keeps the original behaviour
            (presumably `block3_conv4` in the stock Keras VGG19 - verify with
            ``vgg.summary()``).

    Returns:
        Keras ``Model`` mapping an image batch to the activations of the
        selected VGG19 layer. ImageNet weights are loaded; the caller is
        responsible for freezing the model.
    """
    vgg = VGG19(weights="imagenet", include_top=False, input_shape=hr_shape)
    return Model(inputs=vgg.inputs, outputs=vgg.layers[feature_layer].output)

In [None]:
#Combined model
def create_comb(gen_model, disc_model, vgg, lr_ip, hr_ip):
    """Chain generator, discriminator and VGG into one model.

    Args:
        gen_model: Generator model applied to ``lr_ip``.
        disc_model: Discriminator model; its ``trainable`` flag is set to
            ``False`` as a side effect.
        vgg: Feature extractor applied to the generated image.
        lr_ip: Keras ``Input`` for the low-resolution image.
        hr_ip: Keras ``Input`` for the high-resolution image. It is listed as
            a model input but is not connected to either output.

    Returns:
        Keras ``Model`` with inputs ``[lr_ip, hr_ip]`` and outputs
        ``[discriminator score, VGG features]`` of the generated image.
    """
    # Freeze the discriminator inside the combined graph.
    disc_model.trainable = False

    sr_img = gen_model(lr_ip)
    sr_features = vgg(sr_img)
    validity = disc_model(sr_img)

    return Model(inputs=[lr_ip, hr_ip], outputs=[validity, sr_features])

In [None]:
# Data loading with your specified paths
n = 5000  # Number of images to use
lr_path = "/kaggle/input/sr-data/DATASET/AUG/LR_Aug"
hr_path = "/kaggle/input/sr-data/DATASET/AUG/HR_Aug"


def _read_rgb_image(path):
    """Read the image at ``path`` with OpenCV and return it in RGB order.

    Raises:
        ValueError: If OpenCV cannot decode the file (``cv2.imread`` returns
            ``None`` in that case instead of raising).
    """
    bgr = cv2.imread(path)
    if bgr is None:
        raise ValueError(f"Could not read image file: {path}")
    return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)


# os.listdir returns names in arbitrary order, so both listings are sorted
# before truncation to keep LR image i paired with HR image i.
# NOTE(review): assumes matching LR/HR files sort into the same position - confirm.
lr_list = sorted(os.listdir(lr_path))[:n]
hr_list = sorted(os.listdir(hr_path))[:n]
if len(lr_list) != len(hr_list):
    raise ValueError(
        f"LR/HR image count mismatch: {len(lr_list)} LR vs {len(hr_list)} HR"
    )

# Load LR images (float32 halves the memory of the default float64) and
# scale to [0, 1].
lr_images = np.array(
    [_read_rgb_image(os.path.join(lr_path, img)) for img in lr_list],
    dtype=np.float32,
) / 255.0

# Load HR images
hr_images = np.array(
    [_read_rgb_image(os.path.join(hr_path, img)) for img in hr_list],
    dtype=np.float32,
) / 255.0


In [None]:
#plotting random images for example
# Show one random LR image next to its HR counterpart. The reshape doubles as
# a check that the images have the expected 64x64 / 256x256 RGB geometry.
image_number = random.randrange(len(lr_images))
plt.figure(figsize=(12, 8))
for _position, _images, _shape in (
    (221, lr_images, (64, 64, 3)),
    (222, hr_images, (256, 256, 3)),
):
    plt.subplot(_position)
    plt.imshow(np.reshape(_images[image_number], _shape))
plt.show()

In [None]:
# `lr_images` / `hr_images` are already numpy arrays scaled to [0, 1] by the
# data-loading cell. Dividing by 255 again here would squash every pixel into
# [0, 1/255] and silently corrupt both training and the PSNR/SSIM evaluation
# (which assume data_range=1.0), so no further scaling is done.

# Train-test split (fixed seed so the held-out set is reproducible)
lr_train, lr_test, hr_train, hr_test = train_test_split(
    lr_images, hr_images, test_size=0.33, random_state=42
)

In [None]:
# Image geometry: the generator upsamples 64x64 RGB inputs by 4x to 256x256.
lr_shape = (64, 64, 3)
hr_shape = (256, 256, 3)

# Initialize models
generator = create_gen(Input(shape=lr_shape))
generator.summary()

In [None]:
discriminator = create_disc(Input(hr_shape))
# create_disc ends in Dense(1) without a sigmoid, so the model emits logits.
# The compiled loss/metric must therefore work on logits: BCE with
# from_logits=True and an accuracy threshold of 0 (sigmoid(0) == 0.5).
# The custom training loop uses its own optimizer and loss; this configuration
# only affects `discriminator.fit` / `discriminator.evaluate`.
discriminator.compile(
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
    optimizer="adam",
    metrics=[tf.keras.metrics.BinaryAccuracy(threshold=0.0, name='accuracy')],
)
discriminator.summary()

In [None]:
vgg = build_vgg(hr_shape)
# Freeze before printing so the summary reports the weights as non-trainable.
vgg.trainable = False
# summary() prints itself and returns None; wrapping it in print() would add
# a stray "None" line to the output.
vgg.summary()

In [None]:
# Optimizers
# Separate Adam instances so generator and discriminator keep independent
# moment estimates; both use the same learning rate.
gen_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
disc_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

In [None]:
# Training parameters
batch_size = 1
epochs = 25
# Prepare dataset
# shuffle() reshuffles at the start of every epoch by default, so the model
# does not see the samples in the same fixed order 25 times. The buffer is
# kept small because every buffered element is a full HR image.
# prefetch() overlaps input preparation with the training step.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((lr_train, hr_train))
    .shuffle(buffer_size=min(len(lr_train), 256))
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Initialize loss tracking (before training loop)
# One entry per epoch (the epoch's mean loss) is appended by the training
# loop; the lists are plotted afterwards. Re-running this cell resets them.
gen_losses = []
disc_losses = []

In [None]:
# ----- Training Loop -----
# Built once and reused: the loss object does not change between steps.
content_mse = tf.keras.losses.MeanSquaredError()


def _relativistic_bce(logits, reference_logits, target_is_real):
    """Mean sigmoid cross-entropy of ``logits`` relative to the other class.

    The mean of ``reference_logits`` is subtracted from ``logits`` before the
    cross-entropy is computed against all-ones (``target_is_real=True``) or
    all-zeros labels.
    """
    relative_logits = logits - tf.reduce_mean(reference_logits)
    labels = tf.ones_like(logits) if target_is_real else tf.zeros_like(logits)
    return tf.reduce_mean(
        tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=relative_logits)
    )


def _train_discriminator(lr_batch, hr_batch):
    """Run one discriminator update and return its loss tensor."""
    # No gradient flows to the generator in this step, so its forward pass
    # does not need to be recorded on the tape.
    fake_hr = generator(lr_batch, training=True)
    with tf.GradientTape() as disc_tape:
        real_output = discriminator(hr_batch, training=True)
        fake_output = discriminator(fake_hr, training=True)

        # Relativistic discriminator loss
        real_loss = _relativistic_bce(real_output, fake_output, target_is_real=True)
        fake_loss = _relativistic_bce(fake_output, real_output, target_is_real=False)
        disc_loss = (real_loss + fake_loss) * 0.5

    disc_grads = disc_tape.gradient(disc_loss, discriminator.trainable_weights)
    disc_optimizer.apply_gradients(zip(disc_grads, discriminator.trainable_weights))
    return disc_loss


def _train_generator(lr_batch, hr_batch):
    """Run one generator update and return its total loss tensor."""
    with tf.GradientTape() as gen_tape:
        fake_hr = generator(lr_batch, training=True)

        fake_output = discriminator(fake_hr, training=False)
        real_output = discriminator(hr_batch, training=False)
        # NOTE(review): only the "fake should look more real than the average
        # real" term is used here; the mirrored term on the real samples is
        # not part of this loss - confirm this is intended.
        gen_adv_loss = _relativistic_bce(fake_output, real_output, target_is_real=True)

        # Feature (content) loss using VGG19
        gen_features = vgg(fake_hr)
        real_features = vgg(hr_batch)
        gen_content_loss = content_mse(real_features, gen_features)

        # Total generator loss (adjust weight of adversarial loss as needed)
        gen_total_loss = gen_content_loss + 1e-3 * gen_adv_loss

    gen_grads = gen_tape.gradient(gen_total_loss, generator.trainable_weights)
    gen_optimizer.apply_gradients(zip(gen_grads, generator.trainable_weights))
    return gen_total_loss


for epoch in range(epochs):
    print(f"\nEpoch {epoch+1}/{epochs}")
    epoch_gen_loss = []
    epoch_disc_loss = []

    for lr_batch, hr_batch in tqdm(train_dataset):
        # The discriminator is updated first; the generator step then sees
        # the freshly updated discriminator.
        epoch_disc_loss.append(_train_discriminator(lr_batch, hr_batch).numpy())
        epoch_gen_loss.append(_train_generator(lr_batch, hr_batch).numpy())

    avg_gen_loss = np.mean(epoch_gen_loss)
    avg_disc_loss = np.mean(epoch_disc_loss)
    gen_losses.append(avg_gen_loss)
    disc_losses.append(avg_disc_loss)
    print(f"Epoch {epoch+1}: Generator Loss = {avg_gen_loss:.4f}, Discriminator Loss = {avg_disc_loss:.4f}")

    # Save model periodically (here every epoch, adjust as needed)
    generator.save(f"esrgan_gen_epoch{epoch+1}.h5")

generator.save("esrgan_generator_final.h5")

In [None]:
# Writes the current generator to `esrgan_generator_final.h5` in the working
# directory (the same file name the training cell writes when it finishes).
generator.save("esrgan_generator_final.h5")

In [None]:
#----Visualisations and Graphs----#

In [None]:
# Evaluation Metrics
def evaluate_gan(generator, lr_test, hr_test, batch_size=8):
    """Super-resolve ``lr_test`` in batches and score every image.

    Args:
        generator: Model exposing ``predict(batch, verbose=0)``.
        lr_test: Indexable batch of low-resolution images.
        hr_test: Ground-truth images aligned with ``lr_test``.
        batch_size: Number of images per ``predict`` call.

    Returns:
        Dict with per-image lists ``'psnr'``, ``'ssim'`` and ``'mse'`` and the
        arrays ``'lr_samples'``, ``'hr_samples'`` and ``'sr_samples'``
        (generator output clipped to [0, 1]).
    """
    psnr_scores, ssim_scores, mse_scores = [], [], []
    lr_kept, hr_kept, sr_kept = [], [], []

    # Process in batches
    batch_starts = range(0, len(lr_test), batch_size)
    for start in tqdm(batch_starts, desc="Generating SR images"):
        stop = start + batch_size
        lr_batch = lr_test[start:stop]
        hr_batch = hr_test[start:stop]

        # Clip outputs to [0, 1]
        sr_batch = np.clip(generator.predict(lr_batch, verbose=0), 0.0, 1.0)

        for j, hr_img in enumerate(hr_batch):
            sr_img = sr_batch[j]
            psnr_scores.append(psnr(hr_img, sr_img, data_range=1.0))
            ssim_scores.append(ssim(hr_img, sr_img, data_range=1.0, channel_axis=-1))
            mse_scores.append(np.mean((hr_img - sr_img) ** 2))

        lr_kept.extend(lr_batch)
        hr_kept.extend(hr_batch)
        sr_kept.extend(sr_batch)

    results = {'psnr': psnr_scores, 'ssim': ssim_scores, 'mse': mse_scores}
    results['lr_samples'] = np.array(lr_kept)
    results['hr_samples'] = np.array(hr_kept)
    results['sr_samples'] = np.array(sr_kept)
    return results
# Score the generator on the held-out split; the result feeds the plots below.
gan_metrics = evaluate_gan(generator, lr_test, hr_test)

In [None]:
# Loss Visualization
# gen_losses holds the per-epoch mean of the total generator loss (content +
# weighted adversarial) and disc_losses the per-epoch mean discriminator loss.
# Both are training losses; neither is a validation curve or a plain MSE.
plt.figure(figsize=(10, 6))
plt.plot(gen_losses, label='Generator Loss')
plt.plot(disc_losses, label='Discriminator Loss')
plt.title('ESRGAN Training Progress')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Metric Distributions
def plot_gan_metrics(gan_metrics):
    """Plot histograms of the per-image PSNR, SSIM and MSE values.

    Args:
        gan_metrics: Dict with sequences under the keys ``'psnr'``,
            ``'ssim'`` and ``'mse'``.

    Each histogram gets a dashed vertical line at the mean of the metric.
    """
    fig, axs = plt.subplots(1, 3, figsize=(18, 5))

    panels = (
        ('psnr', 'PSNR Distribution', 'purple'),
        ('ssim', 'SSIM Distribution', 'green'),
        ('mse', 'MSE Distribution', 'red'),
    )

    for ax, (metric, title, color) in zip(axs, panels):
        values = gan_metrics[metric]
        ax.hist(values, bins=30, alpha=0.7, color=color)
        ax.set_title(title)
        ax.set_xlabel(metric.upper())
        ax.set_ylabel('Frequency')

        # Add vertical lines for mean values. Four significant digits are
        # used because a fixed two decimals renders small MSE means as "0.00".
        mean_val = np.mean(values)
        ax.axvline(mean_val, color='black', linestyle='dashed', linewidth=2,
                   label=f'Mean: {mean_val:.4g}')
        ax.legend()

    plt.tight_layout()
    plt.show()

# Draw the metric histograms for the held-out split results.
plot_gan_metrics(gan_metrics)

In [None]:
# Sample Comparisions
def plot_gan_samples(gan_metrics, num_samples=10):
    """Show random LR / HR / SR triplets with a per-pixel error heatmap.

    Args:
        gan_metrics: Dict with aligned sequences ``'lr_samples'``,
            ``'hr_samples'``, ``'sr_samples'``, ``'psnr'``, ``'ssim'`` and
            ``'mse'``.
        num_samples: Maximum number of samples to display. Fewer are shown
            when fewer are available.

    One four-panel figure is drawn per selected sample.
    """
    total = len(gan_metrics['lr_samples'])
    # Draw distinct indices so the same image is never shown twice, and cap
    # the request at the number of available samples.
    indices = np.random.choice(total, min(num_samples, total), replace=False)

    for idx in indices:
        hr_img = gan_metrics['hr_samples'][idx]
        sr_img = gan_metrics['sr_samples'][idx]

        plt.figure(figsize=(18, 4))

        # LR Input
        plt.subplot(141)
        plt.imshow(gan_metrics['lr_samples'][idx])
        plt.title('Medium-Resolution Input')
        plt.axis('off')

        # HR Ground Truth
        plt.subplot(142)
        plt.imshow(hr_img)
        plt.title('High-Resolution Target')
        plt.axis('off')

        # SR Output
        plt.subplot(143)
        plt.imshow(sr_img)
        metrics_text = f"PSNR: {gan_metrics['psnr'][idx]:.2f}\n" \
                       f"SSIM: {gan_metrics['ssim'][idx]:.4f}\n" \
                       f"MSE: {gan_metrics['mse'][idx]:.5f}"
        plt.title('ESRGAN Output\n' + metrics_text)
        plt.axis('off')

        # Error Map: absolute error averaged over the colour channels. The
        # colour scale is fixed to [0, 0.07] so heatmaps are comparable
        # between samples.
        plt.subplot(144)
        error = np.abs(hr_img - sr_img)
        plt.imshow(np.mean(error, axis=-1), cmap='inferno', vmin=0, vmax=0.07)
        plt.title('Pixel Error Heatmap')
        plt.colorbar()
        plt.axis('off')

        plt.tight_layout()
        plt.show()

# Display randomly chosen test samples with their error heatmaps.
plot_gan_samples(gan_metrics)

In [None]:
# Error Analysis
def plot_gan_error_analysis(gan_metrics):
    """Plot the absolute pixel-error density and a PSNR-vs-SSIM scatter.

    Args:
        gan_metrics: Dict with aligned ``'hr_samples'`` / ``'sr_samples'``
            image sequences and per-image ``'psnr'`` / ``'ssim'`` values.
    """
    fig, (ax_err, ax_corr) = plt.subplots(1, 2, figsize=(12, 5))

    # Pixel Error Distribution: absolute error of every pixel of every image,
    # flattened into one long vector.
    abs_errors = np.concatenate([
        np.abs(hr - sr).ravel()
        for hr, sr in zip(gan_metrics['hr_samples'], gan_metrics['sr_samples'])
    ])
    ax_err.hist(abs_errors, bins=50, color='darkorange', density=True)
    ax_err.set_title('Pixel Error Distribution')
    ax_err.set_xlabel('Absolute Error')
    ax_err.set_ylabel('Density')

    # Metric Correlation
    ax_corr.scatter(gan_metrics['psnr'], gan_metrics['ssim'], alpha=0.5)
    ax_corr.set_title('PSNR vs SSIM Correlation')
    ax_corr.set_xlabel('PSNR')
    ax_corr.set_ylabel('SSIM')

    plt.tight_layout()
    plt.show()

# Characterise the reconstruction error over the held-out split.
plot_gan_error_analysis(gan_metrics)

In [None]:
#----Calculate the average metrics------#
#PSNR: Higher is better
#SSIM: Closer to 1 is better
#MSE: Lower is better

def evaluate_esrgan(generator, lr_test, hr_test):
    """Score the generator image by image and print the average metrics.

    Args:
        generator: Model exposing ``predict``.
        lr_test: Indexable batch of low-resolution images.
        hr_test: Ground-truth images aligned with ``lr_test``.

    Returns:
        ``pandas.DataFrame`` with one row per image and the columns
        ``'PSNR'``, ``'SSIM'`` and ``'MSE'``. The column means are printed.
    """
    psnr_values = []
    ssim_values = []
    mse_values = []

    for i in tqdm(range(len(lr_test))):
        # Slicing (instead of indexing) keeps the leading batch axis.
        lr = lr_test[i:i+1]
        hr = hr_test[i:i+1]

        # verbose=0: predict is called once per image, so Keras' own progress
        # bar would flood the output next to tqdm's.
        gen_img = generator.predict(lr, verbose=0)
        gen_img = np.clip(gen_img, 0, 1)

        # Calculate metrics. `channel_axis` alone marks the colour axis; the
        # deprecated `multichannel` flag must not be passed together with it.
        psnr_val = psnr(hr[0], gen_img[0], data_range=1.0)
        ssim_val = ssim(hr[0], gen_img[0], data_range=1.0, channel_axis=-1)
        mse_val = np.mean((hr[0] - gen_img[0]) ** 2)

        psnr_values.append(psnr_val)
        ssim_values.append(ssim_val)
        mse_values.append(mse_val)

    metrics_df = pd.DataFrame({
        'PSNR': psnr_values,
        'SSIM': ssim_values,
        'MSE': mse_values
    })

    print("\nAverage Metrics:")
    print(metrics_df.mean())

    return metrics_df

# Per-image metric table for the held-out split (also prints the averages).
esrgan_metrics = evaluate_esrgan(generator, lr_test, hr_test)

In [None]:
import os
import numpy as np
import cv2
import tensorflow as tf
import matplotlib.pyplot as plt
from tqdm import tqdm
import pandas as pd

# ======================== Load Dataset ========================
def load_new_srgan_esrgan_dataset(new_lr_path, new_hr_path, batch_size=4, n=None):
    """Load an LR/HR image folder pair and wrap it in a batched tf.data dataset.

    Args:
        new_lr_path: Directory with the low-resolution images.
        new_hr_path: Directory with the high-resolution images.
        batch_size: Batch size of the returned dataset.
        n: Optional cap on the number of images taken from each directory.

    Returns:
        Tuple ``(dataset, lr_array, hr_array)``: a ``tf.data.Dataset`` of
        ``(lr, hr)`` batches and the two image arrays scaled to [0, 1].

    Raises:
        ValueError: If the directories hold different numbers of images or a
            file cannot be decoded.
    """
    def _load_folder(folder, names, desc):
        """Read ``names`` from ``folder`` as RGB images."""
        images = []
        for name in tqdm(names, desc=desc):
            full_path = os.path.join(folder, name)
            bgr = cv2.imread(full_path)
            if bgr is None:
                # cv2.imread returns None (no exception) for unreadable files.
                raise ValueError(f"Could not read image file: {full_path}")
            images.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
        return images

    # os.listdir order is arbitrary; sort so LR image i pairs with HR image i.
    # NOTE(review): assumes matching LR/HR files sort into the same position.
    new_lr_list = sorted(os.listdir(new_lr_path))
    new_hr_list = sorted(os.listdir(new_hr_path))

    if n is not None:
        new_lr_list = new_lr_list[:n]
        new_hr_list = new_hr_list[:n]

    if len(new_lr_list) != len(new_hr_list):
        raise ValueError(
            f"LR/HR image count mismatch: {len(new_lr_list)} LR vs {len(new_hr_list)} HR"
        )

    # Convert to numpy arrays and normalize
    new_lr_images = np.array(_load_folder(new_lr_path, new_lr_list, "Loading New LR Images")) / 255.0
    new_hr_images = np.array(_load_folder(new_hr_path, new_hr_list, "Loading New HR Images")) / 255.0

    # Create TensorFlow dataset
    new_dataset = tf.data.Dataset.from_tensor_slices((new_lr_images, new_hr_images))
    new_dataset = new_dataset.batch(batch_size).prefetch(1)

    return new_dataset, new_lr_images, new_hr_images

# ======================== Model Evaluation =======================
def evaluate_srgan_or_esrgan(generator, discriminator, dataset):
    """Evaluate a generator/discriminator pair on batches of (LR, HR) images.

    Args:
        generator: Model exposing ``predict``; maps LR batches to SR batches.
        discriminator: Model called on HR-sized images. It is assumed to emit
            logits (no final sigmoid), as ``create_disc`` does - confirm when
            evaluating a different discriminator.
        dataset: Iterable of ``(lr_batch, hr_batch)`` pairs, e.g. a batched
            ``tf.data.Dataset``.

    Returns:
        Dict with per-image float lists ``'psnr_values'``, ``'ssim_values'``,
        ``'mse_values'`` and ``'gan_loss_values'`` (the generator's
        relativistic adversarial loss), plus the arrays ``'lr_samples'``,
        ``'hr_samples'`` and ``'sr_samples'`` used for visual comparison.
    """
    psnr_values, ssim_values, mse_values, gan_loss_values = [], [], [], []
    lr_samples, hr_samples, sr_samples = [], [], []

    for lr_image, hr_image in dataset:
        # One dtype for both operands: the dataset may carry float64 while
        # the generator returns float32.
        hr_batch = tf.cast(hr_image, tf.float32)
        sr_batch = tf.convert_to_tensor(
            np.clip(generator.predict(lr_image, verbose=0), 0.0, 1.0),
            dtype=tf.float32,
        )

        # Calculate traditional metrics, one value per image.
        psnr_values.extend(np.asarray(calculate_psnr(hr_batch, sr_batch)).ravel().tolist())
        ssim_values.extend(np.asarray(calculate_ssim(hr_batch, sr_batch)).ravel().tolist())
        per_image_mse = tf.reduce_mean(tf.square(hr_batch - sr_batch), axis=[1, 2, 3])
        mse_values.extend(per_image_mse.numpy().tolist())

        # Adversarial loss of the generator: how far the discriminator is
        # from rating the SR images as more realistic than the average HR
        # image. (Model.evaluate cannot be used here: it expects inputs and
        # labels, not an LR/SR image pair.)
        fake_logits = discriminator(sr_batch, training=False)
        real_logits = discriminator(hr_batch, training=False)
        adv_loss = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=tf.ones_like(fake_logits),
            logits=fake_logits - tf.reduce_mean(real_logits),
        )
        gan_loss_values.extend(tf.reshape(adv_loss, [-1]).numpy().tolist())

        lr_samples.extend(np.asarray(lr_image))
        hr_samples.extend(hr_batch.numpy())
        sr_samples.extend(sr_batch.numpy())

    metrics = {
        'psnr_values': psnr_values,
        'ssim_values': ssim_values,
        'mse_values': mse_values,
        'gan_loss_values': gan_loss_values,
        'lr_samples': np.array(lr_samples),
        'hr_samples': np.array(hr_samples),
        'sr_samples': np.array(sr_samples),
    }
    return metrics

# ======================== Metric Calculation =======================
def calculate_psnr(hr_image, sr_image):
    """Return ``tf.image.psnr`` of ``sr_image`` against ``hr_image`` for pixel values in [0, 1]."""
    peak_value = 1.0
    return tf.image.psnr(hr_image, sr_image, max_val=peak_value)

def calculate_ssim(hr_image, sr_image):
    """Return ``tf.image.ssim`` of ``sr_image`` against ``hr_image`` for pixel values in [0, 1]."""
    peak_value = 1.0
    return tf.image.ssim(hr_image, sr_image, max_val=peak_value)

def calculate_mse(hr_image, sr_image):
    """Calculate MSE between high-resolution and generated images.

    Both inputs are cast to float32 first: the datasets in this notebook may
    be float64 while generator output is float32, and TensorFlow refuses to
    subtract tensors of different dtypes.

    Returns:
        Scalar float32 tensor: the mean squared difference over all elements.
    """
    hr = tf.cast(hr_image, tf.float32)
    sr = tf.cast(sr_image, tf.float32)
    return tf.reduce_mean(tf.square(hr - sr))

# ======================== Comparing Datasets ========================
def compare_esrgan_datasets(original_metrics, new_metrics):
    """Print and plot the mean of each metric for two evaluation runs.

    Args:
        original_metrics: Dict with ``'psnr_values'``, ``'ssim_values'``,
            ``'mse_values'`` and ``'gan_loss_values'`` for the original set.
        new_metrics: Dict with the same keys for the new dataset.

    A table of the means is printed and one bar chart per metric is shown.
    """
    # Display label -> key inside the metric dicts, in plotting order.
    metric_keys = {
        'PSNR': 'psnr_values',
        'SSIM': 'ssim_values',
        'MSE': 'mse_values',
        'GAN Loss': 'gan_loss_values',
    }
    comparison = {
        label: {
            'Original': np.mean(original_metrics[key]),
            'New Dataset': np.mean(new_metrics[key]),
        }
        for label, key in metric_keys.items()
    }

    df = pd.DataFrame(comparison).T
    print("\nESRGAN Performance Comparison:")
    print(df)

    # Visualization
    plt.figure(figsize=(12, 6))
    for position, (label, row) in enumerate(comparison.items(), start=1):
        plt.subplot(1, 4, position)
        plt.bar(list(row), list(row.values()), color=['blue', 'orange'])
        plt.title(f'{label} Comparison')
        plt.ylabel(label)

    plt.tight_layout()
    plt.show()

# ======================== Visual Comparison =======================
def plot_esrgan_side_by_side(original_metrics, new_metrics, num_samples=3):
    """Show random LR/HR/SR triplets of two evaluation runs in a 2x3 grid.

    Args:
        original_metrics: Dict with ``'psnr_values'``, ``'lr_samples'``,
            ``'hr_samples'`` and ``'sr_samples'`` for the original test set.
        new_metrics: Dict with the same keys for the new dataset.
        num_samples: Number of figures to draw; indices are drawn with
            ``np.random.choice`` independently for each run.
    """
    orig_indices = np.random.choice(len(original_metrics['psnr_values']), num_samples)
    new_indices = np.random.choice(len(new_metrics['psnr_values']), num_samples)

    for orig_idx, new_idx in zip(orig_indices, new_indices):
        # (subplot position, source dict, sample key, index, title).
        # A title of None marks the panel whose title carries the PSNR value.
        layout = (
            # Original Test Set Results
            (231, original_metrics, 'lr_samples', orig_idx, 'Medium-Resolution Input'),
            (232, original_metrics, 'hr_samples', orig_idx, 'High-resolution Target'),
            (233, original_metrics, 'sr_samples', orig_idx, None),
            # New Dataset Results
            (234, new_metrics, 'lr_samples', new_idx, 'Test MR'),
            (235, new_metrics, 'hr_samples', new_idx, 'Test HR'),
            (236, new_metrics, 'sr_samples', new_idx, "ESRGAN Output"),
        )

        plt.figure(figsize=(18, 8))
        for position, source, key, idx, title in layout:
            plt.subplot(position)
            plt.imshow(source[key][idx])
            if title is None:
                title = f'ESRGAN Output \nPSNR: {original_metrics["psnr_values"][orig_idx]:.2f}'
            plt.title(title)
            plt.axis('off')

        plt.tight_layout()
        plt.show()

# ======================== USAGE EXAMPLE ========================

# Load new dataset (replace paths with your actual paths)
new_lr_path = "/kaggle/input/test-sr/testing dataset/LR"
new_hr_path = "/kaggle/input/test-sr/testing dataset/HR"
new_test_dataset, new_lr_array, new_hr_array = load_new_srgan_esrgan_dataset(new_lr_path, new_hr_path)

# The `generator` and `discriminator` built and trained earlier in this
# notebook are used as they are. To evaluate a generator stored on disk,
# load it here instead, e.g.:
#   generator = load_model('esrgan_generator_final.h5', compile=False)

# The held-out split of the training data is the "original" reference set.
original_test_dataset = (
    tf.data.Dataset.from_tensor_slices((lr_test, hr_test)).batch(4).prefetch(1)
)

# Evaluate on both datasets with the same function so the numbers compare.
original_esrgan_metrics = evaluate_srgan_or_esrgan(generator, discriminator, original_test_dataset)
new_esrgan_metrics = evaluate_srgan_or_esrgan(generator, discriminator, new_test_dataset)

# The side-by-side plot needs the images themselves; attach them when the
# metric dicts do not carry them yet.
for _metrics, _lr, _hr in (
    (original_esrgan_metrics, lr_test, hr_test),
    (new_esrgan_metrics, new_lr_array, new_hr_array),
):
    _metrics.setdefault('lr_samples', _lr)
    _metrics.setdefault('hr_samples', _hr)
    if 'sr_samples' not in _metrics:
        _metrics['sr_samples'] = np.clip(
            generator.predict(_lr, batch_size=4, verbose=0), 0.0, 1.0
        )

# Compare with original test results
compare_esrgan_datasets(original_esrgan_metrics, new_esrgan_metrics)

# Visual comparison between test sets
plot_esrgan_side_by_side(original_esrgan_metrics, new_esrgan_metrics)


def _metrics_frame(metrics, dataset_label):
    """Return the scalar metrics of one run as a frame tagged with ``dataset_label``."""
    columns = {
        'PSNR': 'psnr_values',
        'SSIM': 'ssim_values',
        'MSE': 'mse_values',
        'GAN Loss': 'gan_loss_values',
    }
    # pd.Series pads with NaN should the metric lists differ in length.
    frame = pd.DataFrame({
        name: pd.Series(np.ravel(metrics[key]).astype(float))
        for name, key in columns.items()
    })
    frame.insert(0, 'Dataset', dataset_label)
    return frame


# Save comparison results to CSV: every row is labelled with the dataset it
# came from, original rows first.
comparison_df = pd.concat(
    [
        _metrics_frame(original_esrgan_metrics, 'Original'),
        _metrics_frame(new_esrgan_metrics, 'New'),
    ],
    ignore_index=True,
)
comparison_df.to_csv('esrgan_dataset_comparison.csv', index=False)



In [None]:
#----Test Code for SR predictions----#
import matplotlib.pyplot as plt
import numpy as np
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim
from keras.models import load_model
from numpy.random import randint

# Load generator model
# compile=False: only inference is needed, so no optimizer/loss is restored.
# NOTE(review): the generator contains Lambda layers; depending on the Keras
# version, loading them from disk may need extra settings - confirm.
generator = load_model('esrgan_generator_final.h5', compile=False)

# Get test data
[X1, X2] = [lr_test, hr_test]

# Select random example. randint(..., 1) returns a one-element index array,
# so the selections below keep a leading batch axis of size 1.
ix = randint(0, len(X1), 1)
src_image, tar_image = X1[ix], X2[ix]

# Generate super-resolved image
gen_image = generator.predict(src_image)


# Clip values to valid image range
src_image = np.clip(src_image, 0, 1)
gen_image = np.clip(gen_image, 0, 1)
tar_image = np.clip(tar_image, 0, 1)

# Calculate metrics. `channel_axis` alone marks the colour axis; the
# deprecated `multichannel` flag must not be passed together with it.
psnr_val = psnr(tar_image[0], gen_image[0], data_range=1.0)
ssim_val = ssim(tar_image[0], gen_image[0], data_range=1.0, channel_axis=-1)
mse = np.mean((tar_image[0] - gen_image[0]) ** 2)

# Create figure
plt.figure(figsize=(12, 8))

# Plot LR image
plt.subplot(231)
plt.imshow(src_image[0])
plt.title('Low-Resolution Input')
plt.axis('off')

# Plot Super-Resolved image
plt.subplot(232)
plt.imshow(gen_image[0])
plt.title(f'ESRGAN Output\nPSNR: {psnr_val:.2f} dB, SSIM: {ssim_val:.4f}\nMSE: {mse:.4f}')
plt.axis('off')

# Plot HR Ground Truth
plt.subplot(233)
plt.imshow(tar_image[0])
plt.title('High-Resolution Ground Truth')
plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
#----Testing with New Dataset---#

In [None]:
# ====================== IMPORTS & SETUP ======================
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from tqdm import tqdm
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

# ====================== CONFIGURATION ======================
# Update these paths with your actual dataset locations
# Directories of the evaluation images, keyed by dataset name and then by
# resolution ('lr' / 'hr'). The execution flow of this cell reads the
# 'new_dataset' entry.
DATASET_PATHS = {
    'new_dataset': {
        'lr': "/kaggle/input/test-sr/testing dataset/LR",
        'hr': "/kaggle/input/test-sr/testing dataset/HR"
    }
}

# ====================== CORE FUNCTIONS ======================
def load_dataset(lr_path, hr_path, n=None):
    """Load paired LR/HR images from two directories.

    File names are sorted in each directory and the first ``n`` are read, so
    LR image i is paired with HR image i.
    NOTE(review): assumes matching LR/HR files sort into the same position.

    Args:
        lr_path: Directory with the low-resolution images.
        hr_path: Directory with the high-resolution images.
        n: Optional cap on the number of images per directory.

    Returns:
        Tuple ``(lr_images, hr_images)`` of RGB arrays scaled to [0, 1].

    Raises:
        ValueError: If the directories yield different numbers of images or a
            file cannot be decoded.
    """
    lr_names = sorted(os.listdir(lr_path))[:n]
    hr_names = sorted(os.listdir(hr_path))[:n]
    # Checked before any image is read so a mismatch fails fast.
    if len(lr_names) != len(hr_names):
        raise ValueError(
            f"LR/HR image count mismatch: {len(lr_names)} LR vs {len(hr_names)} HR"
        )

    def _load_folder(folder, names, desc):
        """Read ``names`` from ``folder`` as RGB images."""
        images = []
        for name in tqdm(names, desc=desc):
            full_path = os.path.join(folder, name)
            img_array = cv2.imread(full_path)
            if img_array is None:
                # cv2.imread returns None (no exception) for unreadable files.
                raise ValueError(f"Could not read image file: {full_path}")
            images.append(cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB))
        return images

    lr_images = _load_folder(lr_path, lr_names, f"Loading LR from {lr_path}")
    hr_images = _load_folder(hr_path, hr_names, f"Loading HR from {hr_path}")

    return np.array(lr_images)/255.0, np.array(hr_images)/255.0

def evaluate_model(generator, lr_images, hr_images, batch_size=8):
    """Super-resolve ``lr_images`` in batches and score every image.

    Args:
        generator: Model exposing ``predict(batch, verbose=0)``.
        lr_images: Indexable batch of low-resolution images.
        hr_images: Ground-truth images aligned with ``lr_images``.
        batch_size: Number of images per ``predict`` call.

    Returns:
        Dict of lists: per-image ``'psnr'``, ``'ssim'`` and ``'mse'`` values
        and the ``'lr_samples'``, ``'hr_samples'`` and ``'sr_samples'`` images
        (generator output is stored unclipped).
    """
    psnr_scores, ssim_scores, mse_scores = [], [], []
    lr_kept, hr_kept, sr_kept = [], [], []

    for start in tqdm(range(0, len(lr_images), batch_size), desc="Generating SR images"):
        window = slice(start, start + batch_size)
        lr_batch, hr_batch = lr_images[window], hr_images[window]
        sr_batch = generator.predict(lr_batch, verbose=0)

        for j, hr_img in enumerate(hr_batch):
            sr_img = sr_batch[j]
            psnr_scores.append(psnr(hr_img, sr_img, data_range=1.0))
            ssim_scores.append(ssim(hr_img, sr_img,
                                    data_range=1.0, channel_axis=-1))
            mse_scores.append(np.mean((hr_img - sr_img) ** 2))

        lr_kept.extend(lr_batch)
        hr_kept.extend(hr_batch)
        sr_kept.extend(sr_batch)

    return {
        'psnr': psnr_scores, 'ssim': ssim_scores, 'mse': mse_scores,
        'lr_samples': lr_kept, 'hr_samples': hr_kept, 'sr_samples': sr_kept
    }

# ====================== ANALYSIS & VISUALIZATION ======================
def plot_esrgan_samples(esrgan_metrics, num_samples=10):
    """Show random LR / HR / SR triplets with a per-pixel error heatmap.

    Args:
        esrgan_metrics: Dict with aligned sequences ``'lr_samples'``,
            ``'hr_samples'``, ``'sr_samples'``, ``'psnr'``, ``'ssim'`` and
            ``'mse'``.
        num_samples: Maximum number of samples to display. Fewer are shown
            when fewer are available.

    One four-panel figure is drawn per selected sample.
    """
    total = len(esrgan_metrics['hr_samples'])
    # Draw distinct indices so the same image is never shown twice, and cap
    # the request at the number of available samples.
    indices = np.random.choice(total, min(num_samples, total), replace=False)

    for idx in indices:
        hr_img = esrgan_metrics['hr_samples'][idx]
        sr_img = esrgan_metrics['sr_samples'][idx]

        plt.figure(figsize=(18, 4))

        # LR Input
        plt.subplot(141)
        plt.imshow(esrgan_metrics['lr_samples'][idx])
        plt.title('Medium-Resolution Input')
        plt.axis('off')

        # HR Ground Truth
        plt.subplot(142)
        plt.imshow(hr_img)
        plt.title('High-Resolution Target')
        plt.axis('off')

        # SR Output
        plt.subplot(143)
        plt.imshow(sr_img)
        metrics_text = f"PSNR: {esrgan_metrics['psnr'][idx]:.2f}\n" \
                       f"SSIM: {esrgan_metrics['ssim'][idx]:.4f}\n" \
                       f"MSE: {esrgan_metrics['mse'][idx]:.5f}"
        plt.title('ESRGAN Output\n' + metrics_text)
        plt.axis('off')

        # Error Map: absolute error averaged over the colour channels.
        plt.subplot(144)
        error = np.abs(hr_img - sr_img)
        plt.imshow(np.mean(error, axis=-1), cmap='inferno')
        plt.title('Pixel Error Heatmap')
        plt.colorbar()
        plt.axis('off')

        plt.tight_layout()
        plt.show()

# ====================== EXECUTION FLOW ======================
if __name__ == "__main__":
    # 1. Load new evaluation dataset (corrected)
    print("Loading new evaluation dataset...")
    _paths = DATASET_PATHS['new_dataset']
    new_lr, new_hr = load_dataset(_paths['lr'], _paths['hr'])

    # 2. Evaluate ESRGAN model on dataset (uses the `generator` already in
    #    the notebook namespace)
    print("\nEvaluating on new dataset...")
    esrgan_metrics = evaluate_model(generator, new_lr, new_hr)

    # 3. Visual comparison with ESRGAN
    print("\nGenerating ESRGAN visual comparisons...")
    plot_esrgan_samples(esrgan_metrics)

    # 4. Save results: one row per image, one column per scalar metric
    esrgan_metrics_df = pd.DataFrame(
        {column: esrgan_metrics[column.lower()] for column in ('PSNR', 'SSIM', 'MSE')}
    )
    esrgan_metrics_df.to_csv('esrgan_comparison.csv', index=False)
    print("\nComparison saved to esrgan_comparison.csv")


In [None]:
# ====================== IMPORTS & SETUP ======================
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from tqdm import tqdm
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage.metrics import structural_similarity as ssim

# ====================== CONFIGURATION ======================
# Update these paths with your actual dataset locations
# Directories of the evaluation images, keyed by dataset name and then by
# resolution ('lr' / 'hr'). In this cell the 'new_dataset' entry points at the
# same AUG directories the training images were loaded from.
DATASET_PATHS = {
    'new_dataset': {
        'lr': "/kaggle/input/sr-data/DATASET/AUG/LR_Aug",
        'hr': "/kaggle/input/sr-data/DATASET/AUG/HR_Aug"
    }
}

# ====================== CORE FUNCTIONS ======================
def load_dataset(lr_path, hr_path, n=None):
    """Load paired LR/HR images from two directories.

    File names are sorted in each directory and the first ``n`` are read, so
    LR image i is paired with HR image i.
    NOTE(review): assumes matching LR/HR files sort into the same position.

    Args:
        lr_path: Directory with the low-resolution images.
        hr_path: Directory with the high-resolution images.
        n: Optional cap on the number of images per directory.

    Returns:
        Tuple ``(lr_images, hr_images)`` of RGB arrays scaled to [0, 1].

    Raises:
        ValueError: If the directories yield different numbers of images or a
            file cannot be decoded.
    """
    lr_names = sorted(os.listdir(lr_path))[:n]
    hr_names = sorted(os.listdir(hr_path))[:n]
    # Checked before any image is read so a mismatch fails fast.
    if len(lr_names) != len(hr_names):
        raise ValueError(
            f"LR/HR image count mismatch: {len(lr_names)} LR vs {len(hr_names)} HR"
        )

    def _load_folder(folder, names, desc):
        """Read ``names`` from ``folder`` as RGB images."""
        images = []
        for name in tqdm(names, desc=desc):
            full_path = os.path.join(folder, name)
            img_array = cv2.imread(full_path)
            if img_array is None:
                # cv2.imread returns None (no exception) for unreadable files.
                raise ValueError(f"Could not read image file: {full_path}")
            images.append(cv2.cvtColor(img_array, cv2.COLOR_BGR2RGB))
        return images

    lr_images = _load_folder(lr_path, lr_names, f"Loading LR from {lr_path}")
    hr_images = _load_folder(hr_path, hr_names, f"Loading HR from {hr_path}")

    return np.array(lr_images)/255.0, np.array(hr_images)/255.0

def evaluate_model(generator, lr_images, hr_images, batch_size=8):
    """Super-resolve ``lr_images`` in batches and score every image.

    Args:
        generator: Model exposing ``predict(batch, verbose=0)``.
        lr_images: Indexable batch of low-resolution images.
        hr_images: Ground-truth images aligned with ``lr_images``.
        batch_size: Number of images per ``predict`` call.

    Returns:
        Dict of lists: per-image ``'psnr'``, ``'ssim'`` and ``'mse'`` values
        and the ``'lr_samples'``, ``'hr_samples'`` and ``'sr_samples'`` images
        (generator output is stored unclipped).
    """
    psnr_scores, ssim_scores, mse_scores = [], [], []
    lr_kept, hr_kept, sr_kept = [], [], []

    for start in tqdm(range(0, len(lr_images), batch_size), desc="Generating SR images"):
        window = slice(start, start + batch_size)
        lr_batch, hr_batch = lr_images[window], hr_images[window]
        sr_batch = generator.predict(lr_batch, verbose=0)

        for j, hr_img in enumerate(hr_batch):
            sr_img = sr_batch[j]
            psnr_scores.append(psnr(hr_img, sr_img, data_range=1.0))
            ssim_scores.append(ssim(hr_img, sr_img,
                                    data_range=1.0, channel_axis=-1))
            mse_scores.append(np.mean((hr_img - sr_img) ** 2))

        lr_kept.extend(lr_batch)
        hr_kept.extend(hr_batch)
        sr_kept.extend(sr_batch)

    return {
        'psnr': psnr_scores, 'ssim': ssim_scores, 'mse': mse_scores,
        'lr_samples': lr_kept, 'hr_samples': hr_kept, 'sr_samples': sr_kept
    }

# ====================== ANALYSIS & VISUALIZATION ======================
def plot_esrgan_samples(esrgan_metrics, num_samples=10):
    """Show random LR / HR / SR triplets with a per-pixel error heatmap.

    Args:
        esrgan_metrics: Dict with aligned sequences ``'lr_samples'``,
            ``'hr_samples'``, ``'sr_samples'``, ``'psnr'``, ``'ssim'`` and
            ``'mse'``.
        num_samples: Maximum number of samples to display. Fewer are shown
            when fewer are available.

    One four-panel figure is drawn per selected sample.
    """
    total = len(esrgan_metrics['hr_samples'])
    # Draw distinct indices so the same image is never shown twice, and cap
    # the request at the number of available samples.
    indices = np.random.choice(total, min(num_samples, total), replace=False)

    for idx in indices:
        hr_img = esrgan_metrics['hr_samples'][idx]
        sr_img = esrgan_metrics['sr_samples'][idx]

        plt.figure(figsize=(18, 4))

        # LR Input
        plt.subplot(141)
        plt.imshow(esrgan_metrics['lr_samples'][idx])
        plt.title('Medium-Resolution Input')
        plt.axis('off')

        # HR Ground Truth
        plt.subplot(142)
        plt.imshow(hr_img)
        plt.title('High-Resolution Target')
        plt.axis('off')

        # SR Output
        plt.subplot(143)
        plt.imshow(sr_img)
        metrics_text = f"PSNR: {esrgan_metrics['psnr'][idx]:.2f}\n" \
                       f"SSIM: {esrgan_metrics['ssim'][idx]:.4f}\n" \
                       f"MSE: {esrgan_metrics['mse'][idx]:.5f}"
        plt.title('ESRGAN Output\n' + metrics_text)
        plt.axis('off')

        # Error Map: absolute error averaged over the colour channels.
        plt.subplot(144)
        error = np.abs(hr_img - sr_img)
        plt.imshow(np.mean(error, axis=-1), cmap='inferno')
        plt.title('Pixel Error Heatmap')
        plt.colorbar()
        plt.axis('off')

        plt.tight_layout()
        plt.show()

# ====================== EXECUTION FLOW ======================
if __name__ == "__main__":
    # 1. Load the dataset configured in this cell's DATASET_PATHS
    print("Loading new evaluation dataset...")
    new_lr, new_hr = load_dataset(DATASET_PATHS['new_dataset']['lr'],
                                   DATASET_PATHS['new_dataset']['hr'])

    # 2. Evaluate ESRGAN model on dataset (uses the `generator` already in
    #    the notebook namespace)
    print("\nEvaluating on new dataset...")
    esrgan_metrics = evaluate_model(generator, new_lr, new_hr)

    # 3. Visual comparison with ESRGAN
    print("\nGenerating ESRGAN visual comparisons...")
    plot_esrgan_samples(esrgan_metrics)

    # 4. Save results. This run uses its own file name: the previous
    #    evaluation cell writes 'esrgan_comparison.csv', and reusing that
    #    name here would silently overwrite its results.
    esrgan_metrics_df = pd.DataFrame({
        'PSNR': esrgan_metrics['psnr'],
        'SSIM': esrgan_metrics['ssim'],
        'MSE': esrgan_metrics['mse']
    })
    esrgan_metrics_df.to_csv('esrgan_comparison_aug.csv', index=False)
    print("\nComparison saved to esrgan_comparison_aug.csv")
