In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

#  Import Libraries

In [1]:
import tensorflow as tf
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, Dense, Dropout, Add, 
                                     MultiHeadAttention, LayerNormalization, GlobalAveragePooling2D)
from tensorflow.keras import Model
import numpy as np
from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim


# Image Preprocessing

In [2]:
def load_images_from_folder(folder, size=(128, 128)):
    images = []
    for filename in glob.glob(os.path.join(folder, '*.png')):
        img = imread(filename)
        img = img_as_float(img)
        img_resized = tf.image.resize(img, size).numpy()
        images.append(img_resized)
    return np.array(images)

def load_and_preprocess_datasets(base_folder):
    rainy_images = []
    clear_images = []
    for i in range(6):  # Assuming 6 datasets
        dataset_folder = os.path.join(base_folder, f'dataset_{i}')
        rainy_folder = os.path.join(dataset_folder, 'rainy')
        clear_folder = os.path.join(dataset_folder, 'clear')
        
        rainy_images.extend(load_images_from_folder(rainy_folder))
        clear_images.extend(load_images_from_folder(clear_folder))
        
    return np.array(rainy_images), np.array(clear_images)

base_folder = 'path_to_your_dataset_folder'
rainy_images, clear_images = load_and_preprocess_datasets(base_folder)


FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/rainydata/RainStreak/rain_streak'

# Advanced Dynamic Pyramid Model

In [4]:
# Notebook 2: Advanced Dynamic Pyramid Model

import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, Add

def advanced_dynamic_pyramid_model(input_shape, num_scales=3):
    inputs = Input(shape=input_shape)
    pyramid_features = []
    
    for scale in range(num_scales):
        scale_factor = 2 ** scale
        x_scaled = tf.image.resize(inputs, [input_shape[0] // scale_factor, input_shape[1] // scale_factor])
        
        # Dynamic kernel size based on scale
        kernel_size = 3 + scale
        x_conv = Conv2D(64 * scale_factor, (kernel_size, kernel_size), activation='relu', padding='same')(x_scaled)
        x_conv = Add()([x_conv, x_scaled])  # Residual connection
        pyramid_features.append(x_conv)

    fused_features = tf.concat(pyramid_features, axis=-1)  # Concatenate along channel dimension
    return inputs, fused_features

# Test the dynamic pyramid model
input_shape = (256, 256, 3)
inputs, fused_features = advanced_dynamic_pyramid_model(input_shape)
print("Input shape:", inputs.shape)
print("Fused features shape:", fused_features.shape)


# Window Partition and Reverse Functions

In [None]:
# Notebook 3: Window Partition and Reverse Functions

import tensorflow as tf

def window_partition(x, window_size):
    patches = tf.image.extract_patches(images=x,
                                       sizes=[1, window_size, window_size, 1],
                                       strides=[1, window_size, window_size, 1],
                                       rates=[1, 1, 1, 1],
                                       padding='VALID')
    return patches

def window_reverse(patches, window_size, input_shape):
    return tf.reshape(patches, input_shape)

# Test window functions
input_shape = (256, 256, 3)
x = tf.random.uniform(input_shape)
window_size = 4
patches = window_partition(x, window_size)
x_reconstructed = window_reverse(patches, window_size, input_shape)
print("Original shape:", x.shape)
print("Reconstructed shape:", x_reconstructed.shape)


# Shifted Window Partition Function

In [None]:
# Notebook 4: Shifted Window Partition Function

import tensorflow as tf

def shifted_window_partition(x, window_size):
    x_shifted = tf.roll(x, shift=window_size // 2, axis=[1, 2])
    patches = window_partition(x_shifted, window_size)
    return patches

# Test shifted window partition function
input_shape = (256, 256, 3)
x = tf.random.uniform(input_shape)
window_size = 4
shifted_patches = shifted_window_partition(x, window_size)
print("Shifted patches shape:", shifted_patches.shape)


# Recursive Block with Hierarchical Patch Merging

In [None]:
# Notebook 5: Recursive Block with Hierarchical Patch Merging

import tensorflow as tf
from tensorflow.keras.layers import LayerNormalization, MultiHeadAttention, Dense, Add

def recursive_block(x, iteration, window_size, patch_size, num_heads, key_dim, num_iterations):
    if iteration == 0:
        return x

    windows = window_partition(x, window_size)
    window_shape = (windows.shape[-2], windows.shape[-1])
    windows_reshaped = tf.reshape(windows, (-1, window_shape[0] * window_shape[1], x.shape[-1]))

    x_norm = LayerNormalization()(windows_reshaped)
    attention = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)(x_norm, x_norm)

    attention_scores = tf.reduce_mean(tf.abs(attention), axis=-1, keepdims=True)
    threshold = 0.1
    attention = tf.where(attention_scores > threshold, attention, tf.zeros_like(attention))

    x_add = Add()([windows_reshaped, attention])
    x_reconstructed = window_reverse(x_add, window_size, x.shape)

    x_norm_ffn = LayerNormalization()(x_reconstructed)
    x_ffn = Dense(128, activation='relu')(x_norm_ffn)
    x_ffn_out = Dense(x.shape[-1])(x_ffn)

    x_out = Add()([x_reconstructed, x_ffn_out])

    if iteration > 1:
        x_out = hierarchical_patch_merging(x_out, window_size, patch_size)

    return recursive_block(x_out, iteration - 1, window_size, patch_size, num_heads, key_dim, num_iterations)

def hierarchical_patch_merging(x, window_size, patch_size):
    num_patches = patch_size // window_size
    new_window_size = window_size * 2
    new_patch_size = patch_size * 2

    x_reshaped = tf.reshape(x, (-1, x.shape[1] // num_patches, num_patches, x.shape[-1]))
    x_merged = tf.reduce_mean(x_reshaped, axis=2)
    x_merged = tf.reshape(x_merged, (-1, new_patch_size, new_patch_size, x.shape[-1]))
    
    return x_merged

# Test recursive block with hierarchical patch merging
input_shape = (256, 256, 64)
x = tf.random.uniform(input_shape)
window_size = 4
patch_size = 4
num_heads = 4
key_dim = 64
num_iterations = 3
output = recursive_block(x, num_iterations, window_size, patch_size, num_heads, key_dim, num_iterations)
print("Output shape:", output.shape)


# Modified Swin Transformer Block

In [5]:
# Notebook 6: Modified Swin Transformer Block

import tensorflow as tf
from tensorflow.keras.layers import LayerNormalization, MultiHeadAttention, Dense, Add, UpSampling2D

def modified_swin_transformer_block(fused_features, window_size=4, num_heads=4, key_dim=64, num_iterations=3):
    x_out = recursive_block(fused_features, num_iterations, window_size, window_size, num_heads, key_dim, num_iterations)
    
    shifted_windows = shifted_window_partition(x_out, window_size)
    shifted_windows_reshaped = tf.reshape(shifted_windows, (-1, window_size * window_size, x_out.shape[-1]))
    shifted_x_norm = LayerNormalization()(shifted_windows_reshaped)
    shifted_attention = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)(shifted_x_norm, shifted_x_norm)
    
    shifted_x_add = Add()([shifted_windows_reshaped, shifted_attention])
    shifted_x_reconstructed = window_reverse(shifted_x_add, window_size, x_out.shape)
    
    shifted_x_norm_ffn = LayerNormalization()(shifted_x_reconstructed)
    shifted_x_ffn = Dense(128, activation='relu')(shifted_x_norm_ffn)
    shifted_x_ffn_out = Dense(x_out.shape[-1])(shifted_x_ffn)
    
    x_final = Add()([shifted_x_reconstructed, shifted_x_ffn_out])

    return x_final

# Test modified Swin Transformer block
input_shape = (256, 256, 64)
x = tf.random.uniform(input_shape)
window_size = 4
num_heads = 4
key_dim = 64
num_iterations = 6
x_out = modified_swin_transformer_block(x, window_size, num_heads, key_dim, num_iterations)
print("Final output shape:", x_out.shape)


# Image Restoration and Enhancement

In [6]:
# Notebook 7: Image Restoration and Enhancement

import tensorflow as tf
from tensorflow.keras.layers import LayerNormalization, UpSampling2D, Conv2D, Dense

def image_restoration_and_enhancement(x_out, num_classes=3):
    x = LayerNormalization()(x_out)

    x = UpSampling2D()(x)
    x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = UpSampling2D()(x)
    x = Conv2D(num_classes, (3, 3), padding='same')(x)

    return x

# Test image restoration and enhancement
input_shape = (256, 256, 64)
x_out = tf.random.uniform(input_shape)
restored_image = image_restoration_and_enhancement(x_out)
print("Restored image shape:", restored_image.shape)


# Model Construction

In [7]:
# Notebook 8: Create and Summarize Model

import tensorflow as tf
from tensorflow.keras.layers import Input

def create_model(input_shape):
    inputs, fused_features = advanced_dynamic_pyramid_model(input_shape)
    x_out = modified_swin_transformer_block(fused_features)
    outputs = image_restoration_and_enhancement(x_out)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model

# Example usage
input_shape = (256, 256, 3)
model = create_model(input_shape)
model.summary()


ValueError: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```


# Compile the Model

In [None]:
# Cell 1: Model Compilation

import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError

# Compile the model
def compile_model(model):
    """
    Compile the model with an optimizer and loss function suitable for image restoration.
    """
    optimizer = Adam(learning_rate=0.0001)
    loss = MeanSquaredError()  # Use MSE for image restoration
    model.compile(optimizer=optimizer, loss=loss, metrics=['mae'])
    return model

# Assuming you have your model defined
model = compile_model(model)
model.summary()  # Optional: Show the model architecture summary


# Model Training

In [None]:
# Cell 2: Model Training

def train_model(model, train_data, val_data, batch_size=16, epochs=50):
    """
    Train the model on training data with validation data for evaluation.
    """
    history = model.fit(train_data, 
                        validation_data=val_data, 
                        epochs=epochs, 
                        batch_size=batch_size)
    return history

# Assuming train_data and val_data are prepared
history = train_model(model, train_data, val_data, batch_size=16, epochs=50)


# Model Evaluation

In [None]:
# Cell 3: Model Evaluation

def evaluate_model(model, test_data):
    """
    Evaluate the model on the test data.
    """
    results = model.evaluate(test_data)
    print(f"Test Loss: {results[0]}, Test MAE: {results[1]}")
    return results

# Assuming test_data is prepared
evaluate_model(model, test_data)


# PSNR and SSIM Calculation

In [None]:
# Cell 4: PSNR and SSIM Calculation

from skimage.metrics import peak_signal_noise_ratio as psnr, structural_similarity as ssim
import numpy as np

# PSNR and SSIM evaluation
def calculate_psnr_ssim(true_image, restored_image):
    """
    Calculate PSNR and SSIM between the ground truth and restored images.
    """
    psnr_value = psnr(true_image, restored_image, data_range=true_image.max() - true_image.min())
    ssim_value = ssim(true_image, restored_image, multichannel=True)
    return psnr_value, ssim_value


# Prediction and Evaluation on Test Images

In [None]:
# Cell 5: Model Prediction and Evaluation on Test Images

import matplotlib.pyplot as plt

def predict_and_evaluate(model, test_images, ground_truth_images):
    """
    Predict restored images using the model and evaluate PSNR and SSIM.
    """
    for i, test_image in enumerate(test_images):
        # Make predictions on the test image
        restored_image = model.predict(np.expand_dims(test_image, axis=0))[0]
        
        # Rescale the restored image from [-1, 1] to [0, 1] (if necessary)
        restored_image = (restored_image + 1) / 2
        
        # Calculate PSNR and SSIM
        psnr_value, ssim_value = calculate_psnr_ssim(ground_truth_images[i], restored_image)
        print(f"Image {i+1}: PSNR = {psnr_value:.2f}, SSIM = {ssim_value:.4f}")
        
        # Plot the ground truth and restored image for visual comparison
        plt.figure(figsize=(10, 5))
        plt.subplot(1, 2, 1)
        plt.title("Ground Truth")
        plt.imshow(ground_truth_images[i])
        plt.axis('off')
        
        plt.subplot(1, 2, 2)
        plt.title("Restored Image")
        plt.imshow(restored_image)
        plt.axis('off')
        
        plt.show()

# Assuming test_images and ground_truth_images are prepared
test_images = []  # Add test images here
ground_truth_images = []  # Add ground truth images here
predict_and_evaluate(model, test_images, ground_truth_images)


# PSNR and SSIM Evaluation Across Test Set

In [None]:
# Cell 6: Average PSNR and SSIM for the entire test set

def evaluate_psnr_ssim_on_testset(model, test_images, ground_truth_images):
    """
    Calculate average PSNR and SSIM for the entire test set.
    """
    psnr_values = []
    ssim_values = []

    for i, test_image in enumerate(test_images):
        restored_image = model.predict(np.expand_dims(test_image, axis=0))[0]
        restored_image = (restored_image + 1) / 2  # Rescale if necessary

        psnr_value, ssim_value = calculate_psnr_ssim(ground_truth_images[i], restored_image)
        psnr_values.append(psnr_value)
        ssim_values.append(ssim_value)

    avg_psnr = np.mean(psnr_values)
    avg_ssim = np.mean(ssim_values)

    print(f"Average PSNR on Test Set: {avg_psnr:.2f}")
    print(f"Average SSIM on Test Set: {avg_ssim:.4f}")

    return avg_psnr, avg_ssim

# Assuming test_images and ground_truth_images are prepared
avg_psnr, avg_ssim = evaluate_psnr_ssim_on_testset(model, test_images, ground_truth_images)


# Visualization of Training History

In [None]:
# Cell 7: Visualize the training and validation loss

def plot_training_history(history):
    """
    Plot the training and validation loss curves.
    """
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# Visualize the training history
plot_training_history(history)
