In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/task2subtask1/Test/Reference/4_img_.png
/kaggle/input/task2subtask1/Test/Reference/537_img_.png
/kaggle/input/task2subtask1/Test/Reference/722_img_.png
/kaggle/input/task2subtask1/Test/Reference/366_img_.png
/kaggle/input/task2subtask1/Test/Reference/432_img_.png
/kaggle/input/task2subtask1/Test/Reference/381_img_.png
/kaggle/input/task2subtask1/Test/Reference/207_img_.png
/kaggle/input/task2subtask1/Test/Reference/90_img_.png
/kaggle/input/task2subtask1/Test/Reference/98_img_.png
/kaggle/input/task2subtask1/Test/Reference/270_img_.png
/kaggle/input/task2subtask1/Test/Reference/589_img_.png
/kaggle/input/task2subtask1/Test/Reference/292_img_.png
/kaggle/input/task2subtask1/Test/Reference/18_img_.png
/kaggle/input/task2subtask1/Test/Reference/373_img_.png
/kaggle/input/task2subtask1/Test/Reference/887_img_.png
/kaggle/input/task2subtask1/Test/Reference/3650.png
/kaggle/input/task2subtask1/Test/Reference/651.png
/kaggle/input/task2subtask1/Test/Reference/360_img_.png
/kaggl

In [2]:
#Importing All Libraries
from IPython import display
import os

import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import PIL
import tensorflow as tf
import cv2

import tensorflow_probability as tfp
import time
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from skimage.metrics import structural_similarity as ssim
import matplotlib.pyplot as plt


In [None]:
# VAE class definition
class VAE(Model):
    def __init__(self, image_channels):
        super(VAE, self).__init__()
        self.encoder = tf.keras.Sequential([
            layers.Conv2D(8, kernel_size=3, strides=2, padding='same', activation='relu', input_shape=(256, 384, image_channels)),
            layers.Conv2D(16, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Flatten()
        ])
        
        self.mu = layers.Dense(128)  # Adjusted latent dimension
        self.logvar = layers.Dense(128)  # Adjusted latent dimension
        
        self.decoder = tf.keras.Sequential([
            layers.Dense(16 * 64 * 96, activation='relu'),  # Adjusted for reshaping
            layers.Reshape((64, 96, 16)),  # Adjusted dimensions
            layers.Conv2DTranspose(16, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv2DTranspose(8, kernel_size=3, strides=2, padding='same', activation='relu'),
            layers.Conv2DTranspose(image_channels, kernel_size=3, strides=1, padding='same', activation='sigmoid')
        ])
        
    def reparam(self, mu, logvar):
        std = tf.exp(0.5 * logvar)
        eps = tf.random.normal(shape=tf.shape(std))
        return mu + eps * std
    
    def call(self, x):
        x = self.encoder(x)
        
        mu = self.mu(x)
        logvar = self.logvar(x)
        
        z = self.reparam(mu, logvar)
        
        out_image = self.decoder(z)
        return out_image, mu, logvar

In [None]:
# Define loss function
def loss_f(out, inp, mu, logvar):
    mse = MeanSquaredError()(inp, out)
    kl_div = -0.5 * tf.reduce_mean(1 + logvar - tf.square(mu) - tf.exp(logvar))
    return mse + kl_div

In [None]:
# Function to load images from a directory
def load_images_from_directory(directory, target_size):
    image_paths = [os.path.join(directory, fname) for fname in os.listdir(directory)]
    images = []
    for image_path in image_paths:
        img = load_img(image_path, target_size=target_size)
        img = img_to_array(img) / 255.0  # Normalize
        images.append(img)
    return np.array(images)

# Load raw and reference images for training
train_raw_path = "/kaggle/input/task2subtask1/Train/Raw"
train_ref_path = "/kaggle/input/task2subtask1/Train/Reference"

train_raw_images = load_images_from_directory(train_raw_path, target_size=(256, 384))
train_ref_images = load_images_from_directory(train_ref_path, target_size=(256, 384))

# Load raw and reference images for testing
test_raw_path = "/kaggle/input/task2subtask1/Test/Raw"
test_ref_path = "/kaggle/input/task2subtask1/Test/Reference"

test_raw_images = load_images_from_directory(test_raw_path, target_size=(256, 384))
test_ref_images = load_images_from_directory(test_ref_path, target_size=(256, 384))

In [None]:
# Set hyperparameters
image_channels = 3
learning_rate = 1e-3
epochs = 10
batch_size = 1

# Create the model and optimizer
model = VAE(image_channels)
optimizer = Adam(learning_rate)

In [6]:
for epoch in range(epochs):
    total_loss = 0
    for i in range(len(train_raw_images)):
        with tf.GradientTape() as tape:
            output_img, mu, logvar = model(tf.expand_dims(train_raw_images[i], axis=0))
            loss = loss_f(output_img, tf.expand_dims(train_ref_images[i], axis=0), mu, logvar)
        
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        total_loss += loss.numpy()
    
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss / len(train_raw_images):.4f}')

Epoch [1/10], Loss: 0.0707
Epoch [2/10], Loss: 0.0624
Epoch [3/10], Loss: 0.0611
Epoch [4/10], Loss: 0.0604
Epoch [5/10], Loss: 0.0604
Epoch [6/10], Loss: 0.0603
Epoch [7/10], Loss: 0.0600
Epoch [8/10], Loss: 0.0600
Epoch [9/10], Loss: 0.0598
Epoch [10/10], Loss: 0.0601


In [7]:
import numpy as np
import tensorflow as tf
from skimage.metrics import structural_similarity as ssim
from sklearn.metrics import mean_squared_error


# Assume model, test_raw_images, and test_ref_images are already defined

# Save the model weights
model.save_weights("vae1_tf.weights.h5")

# Load the model weights
model.load_weights("vae1_tf.weights.h5")

# Function to calculate SSIM with adaptive window size
def calculate_ssim(img1, img2):
    min_dim = min(img1.shape[0], img1.shape[1])
    
    # Set win_size to the largest possible odd number <= min_dim
    win_size = min_dim if min_dim % 2 != 0 else min_dim - 1
    
    # If win_size is 1, we can't calculate SSIM meaningfully
    if win_size < 3:
        return np.nan
    
    return ssim(img1, img2, win_size=win_size, multichannel=True, data_range=1, channel_axis=-1)

# Function to calculate PSNR
def calculate_psnr(img1, img2):
    mse = mean_squared_error(img1.flatten(), img2.flatten())
    if mse == 0:
        return float('inf')  # Infinite PSNR for identical images
    max_pixel = 1.0  # Assuming the images are normalized between 0 and 1
    return 20 * np.log10(max_pixel / np.sqrt(mse))

# Evaluate on test data
ssim_scores = []
mse_scores = []
psnr_scores = []

for i in range(len(test_raw_images)):
    test_img = tf.expand_dims(test_raw_images[i], axis=0)
    ref_img = test_ref_images[i]
    
    # Run through the model
    reconstructed_img, _, _ = model(test_img)
    
    # Reshape output image
    reconstructed_img = tf.squeeze(reconstructed_img, axis=0).numpy()
    
    # Calculate SSIM with adaptive window size
    ssim_value = calculate_ssim(reconstructed_img, ref_img)
    if not np.isnan(ssim_value):
        ssim_scores.append(ssim_value)
    
    # Calculate MSE
    mse_value = mean_squared_error(ref_img.flatten(), reconstructed_img.flatten())
    mse_scores.append(mse_value)
    
    # Calculate PSNR
    psnr_value = calculate_psnr(ref_img, reconstructed_img)
    psnr_scores.append(psnr_value)

# Calculate average scores across all valid test images
if ssim_scores:
    average_ssim = np.mean(ssim_scores)
    print(f"Average SSIM: {average_ssim:.4f}")
else:
    print("No valid SSIM scores were calculated.")

if mse_scores:
    average_mse = np.mean(mse_scores)
    print(f"Average MSE: {average_mse:.4f}")
else:
    print("No valid MSE scores were calculated.")

if psnr_scores:
    average_psnr = np.mean(psnr_scores)
    print(f"Average PSNR: {average_psnr:.4f} dB")
else:
    print("No valid PSNR scores were calculated.")

# Summary of valid scores
print(f"Number of images used for SSIM calculation: {len(ssim_scores)}")
print(f"Number of images used for MSE calculation: {len(mse_scores)}")
print(f"Number of images used for PSNR calculation: {len(psnr_scores)}")


Average SSIM: 0.1058
Average MSE: 0.0605
Average PSNR: 12.4251 dB
Number of images used for SSIM calculation: 190
Number of images used for MSE calculation: 190
Number of images used for PSNR calculation: 190
