In [2]:
import tensorflow as tf
from tensorflow.keras import layers, Model
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array

In [3]:
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import VGG19

# Define U-Net Generator with Additional Skip Connections
def unet_generator(output_channels):
    inputs = layers.Input(shape=[256, 256, 3])

    # Downsampling layers (Encoder)
    down_stack = [
        layers.Conv2D(64, 4, strides=2, padding='same', activation='relu'),
        layers.Conv2D(128, 4, strides=2, padding='same', activation='relu'),
        layers.Conv2D(256, 4, strides=2, padding='same', activation='relu'),
        layers.Conv2D(512, 4, strides=2, padding='same', activation='relu'),
        layers.Conv2D(512, 4, strides=2, padding='same', activation='relu')
    ]

    # Upsampling layers (Decoder)
    up_stack = [
        layers.Conv2DTranspose(512, 4, strides=2, padding='same', activation='relu'),
        layers.Conv2DTranspose(256, 4, strides=2, padding='same', activation='relu'),
        layers.Conv2DTranspose(128, 4, strides=2, padding='same', activation='relu'),
        layers.Conv2DTranspose(64, 4, strides=2, padding='same', activation='relu'),
    ]

    # Connecting layers with skip connections
    x = inputs
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)

    skips = reversed(skips[:-1])
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = layers.Concatenate()([x, skip])

    # Final output layer
    outputs = layers.Conv2DTranspose(output_channels, 4, strides=2, padding='same', activation='tanh')(x)
    return Model(inputs=inputs, outputs=outputs)

generator = unet_generator(3)
generator.summary()

In [4]:
# Define PatchGAN Discriminator
def patchgan_discriminator():
    model = tf.keras.Sequential()
    model.add(layers.InputLayer(input_shape=[256, 256, 3]))  # Adjust input shape
    model.add(layers.Conv2D(64, 4, strides=2, padding='same', activation='relu'))
    model.add(layers.Conv2D(128, 4, strides=2, padding='same', activation='relu'))
    model.add(layers.Conv2D(256, 4, strides=2, padding='same', activation='relu'))
    model.add(layers.Conv2D(1, 4, strides=1, padding='same', activation='sigmoid'))  # Binary classification (real or fake)
    return model

# Initialize discriminator
discriminator = patchgan_discriminator()
discriminator.summary()



In [5]:
# Define VGG-based Perceptual Loss
class VGGPerceptualLoss(tf.keras.Model):
    def __init__(self):
        super(VGGPerceptualLoss, self).__init__()
        vgg = VGG19(include_top=False, weights='imagenet')
        vgg.trainable = False
        self.model = Model(inputs=vgg.input, outputs=vgg.get_layer('block4_conv1').output)

    def call(self, y_true, y_pred):
        y_true_vgg = self.model(y_true)
        y_pred_vgg = self.model(y_pred)
        return tf.reduce_mean(tf.abs(y_true_vgg - y_pred_vgg))

# Loss functions
def generator_loss(disc_generated_output, gen_output, target, perceptual_loss_model):
    # L1 loss
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

    # Adversarial loss
    gan_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        logits=disc_generated_output, labels=tf.ones_like(disc_generated_output)))

    # Perceptual loss
    perceptual_loss = perceptual_loss_model(target, gen_output)

    # Weighted loss combination
    return gan_loss + 50 * l1_loss + 10 * perceptual_loss

def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        logits=disc_real_output, labels=tf.ones_like(disc_real_output)))
    generated_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        logits=disc_generated_output, labels=tf.zeros_like(disc_generated_output)))
    return real_loss + generated_loss

In [5]:
### load the dataset

In [6]:
# Modify to load SAR and RGB images from separate directories
def load_images_from_directory(sar_dir, rgb_dir, image_size=(256, 256)):
    sar_images = []
    rgb_images = []

    # List all files in the SAR and RGB directories
    sar_files = os.listdir(sar_dir)
    rgb_files = os.listdir(rgb_dir)

    # Sort the files so they match correctly
    sar_files.sort()
    rgb_files.sort()

    for sar_file, rgb_file in zip(sar_files, rgb_files):
        if sar_file.endswith(".png") and rgb_file.endswith(".png"):
            # Load and preprocess SAR image (grayscale)
            sar_image = load_img(os.path.join(sar_dir, sar_file), target_size=image_size, color_mode='grayscale')
            sar_image = img_to_array(sar_image)
            sar_image = np.repeat(sar_image, 3, axis=-1)  # Convert grayscale to RGB by duplicating the channel
            sar_image = (sar_image - 127.5) / 127.5  # Normalize to [-1, 1]
            sar_images.append(sar_image)

            # Load and preprocess RGB image (color)
            rgb_image = load_img(os.path.join(rgb_dir, rgb_file), target_size=image_size)
            rgb_image = img_to_array(rgb_image)
            rgb_image = (rgb_image - 127.5) / 127.5  # Normalize to [-1, 1]
            rgb_images.append(rgb_image)

    return np.array(sar_images), np.array(rgb_images)

# Replace with the correct paths to your directories
sar_dir = "/kaggle/input/sentinel12-image-pairs-segregated-by-terrain/v_2/agri/s1"
rgb_dir = "/kaggle/input/sentinel12-image-pairs-segregated-by-terrain/v_2/agri/s2"

# Load the images
sar_images, rgb_images = load_images_from_directory(sar_dir, rgb_dir, image_size=(256, 256))

# Check the shape of loaded images
print(sar_images.shape, rgb_images.shape)


(4000, 256, 256, 3) (4000, 256, 256, 3)


In [7]:
from sklearn.model_selection import train_test_split

# Assuming sar_images and rgb_images are loaded NumPy arrays
sar_train, sar_temp, rgb_train, rgb_temp = train_test_split(sar_images, rgb_images, test_size=0.3, random_state=42)
sar_val, sar_test, rgb_val, rgb_test = train_test_split(sar_temp, rgb_temp, test_size=0.5, random_state=42)

# Create tf.data.Datasets
def create_dataset(sar_images, rgb_images, batch_size=16):
    dataset = tf.data.Dataset.from_tensor_slices((sar_images, rgb_images))
    dataset = dataset.shuffle(buffer_size=len(sar_images))
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
    return dataset

train_dataset = create_dataset(sar_train, rgb_train, batch_size=8)
val_dataset = create_dataset(sar_val, rgb_val, batch_size=8)
test_dataset = create_dataset(sar_test, rgb_test, batch_size=16)


In [8]:
@tf.function
def train_step(input_image, target, perceptual_loss_model, generator_optimizer, discriminator_optimizer):
    with tf.GradientTape(persistent=True) as tape:
        gen_output = generator(input_image, training=True)
        disc_real_output = discriminator(target, training=True)
        disc_generated_output = discriminator(gen_output, training=True)

        gen_loss = generator_loss(disc_generated_output, gen_output, target, perceptual_loss_model)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    gradients_of_generator = tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = tape.gradient(disc_loss, discriminator.trainable_variables)

    # Apply gradients
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    del tape  # Free memory for the tape

    return gen_loss, disc_loss


In [None]:
# Define Adam optimizers
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)

def train(train_dataset, val_dataset, epochs):
    # Instantiate perceptual loss model
    perceptual_loss_model = VGGPerceptualLoss()

    # Define Adam optimizers
    generator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
    discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)

    for epoch in range(epochs):
        print(f"Starting Epoch {epoch + 1}/{epochs}")
        for input_image, target in train_dataset:
            gen_loss, disc_loss = train_step(input_image, target, perceptual_loss_model, generator_optimizer, discriminator_optimizer)

        print(f"Epoch {epoch + 1}: Gen Loss: {gen_loss}, Disc Loss: {disc_loss}")

        # Evaluate on validation data
        val_gen_loss = 0
        val_disc_loss = 0
        val_batches = 0
        for val_input_image, val_target in val_dataset:
            gen_output = generator(val_input_image, training=False)
            disc_real_output = discriminator(val_target, training=False)
            disc_generated_output = discriminator(gen_output, training=False)

            val_gen_loss += generator_loss(disc_generated_output, gen_output, val_target, perceptual_loss_model)
            val_disc_loss += discriminator_loss(disc_real_output, disc_generated_output)
            val_batches += 1

        val_gen_loss /= val_batches
        val_disc_loss /= val_batches

        print(f"Validation: Gen Loss: {val_gen_loss}, Disc Loss: {val_disc_loss}")

        
train(train_dataset, val_dataset, epochs=20)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg19/vgg19_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m80134624/80134624[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Starting Epoch 1/20
Epoch 1: Gen Loss: 98.44144439697266, Disc Loss: 1.1684757471084595
Validation: Gen Loss: 93.24346923828125, Disc Loss: 1.1858350038528442
Starting Epoch 2/20
Epoch 2: Gen Loss: 88.62085723876953, Disc Loss: 1.009641170501709
Validation: Gen Loss: 88.06364440917969, Disc Loss: 1.0201491117477417
Starting Epoch 3/20
Epoch 3: Gen Loss: 91.28565979003906, Disc Loss: 1.00897216796875
Validation: Gen Loss: 85.98218536376953, Disc Loss: 1.0085806846618652
Starting Epoch 4/20
Epoch 4: Gen Loss: 85.98098754882812, Disc Loss: 1.0071009397506714
Validation: Gen Loss: 82.6901626586914, Disc Loss: 1.0084325075149536
Starting Epoch 5/20
Epoch 5: Gen Loss: 80.69319152832031, Disc Loss: 1.0073978900909424
Validation: Gen Loss: 80.296142578125, Disc Loss: 1.0125

In [24]:
generator.save("pix2pix_generator_final.h5")
discriminator.save("pix2pix_discriminator_final.h5")

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import random
from skimage.metrics import structural_similarity as ssim
import tensorflow as tf

# Visualize generated images for random test samples and calculate SSIM, PSNR
def visualize_results(test_input, test_target, gen_output, epoch=None):
    # Rescale back to [0, 1] for visualization
    test_input = (test_input[0] * 0.5 + 0.5).numpy()
    test_target = (test_target[0] * 0.5 + 0.5).numpy()
    gen_output = (gen_output[0] * 0.5 + 0.5).numpy()
    
    # Calculate SSIM and PSNR
    ssim_value = ssim(test_target, gen_output, multichannel=True)
    psnr_value = tf.image.psnr(test_target, gen_output, max_val=1.0).numpy()

    # Print SSIM and PSNR
    print(f"SSIM: {ssim_value:.4f}, PSNR: {psnr_value:.4f}")
    
    # Plotting images
    plt.figure(figsize=(12, 12))

    # SAR image
    plt.subplot(1, 3, 1)
    plt.imshow(test_input)
    plt.title("Input (SAR)")
    plt.axis("off")

    # True RGB image
    plt.subplot(1, 3, 2)
    plt.imshow(test_target)
    plt.title("True RGB")
    plt.axis("off")

    # Generated RGB image
    plt.subplot(1, 3, 3)
    plt.imshow(gen_output)
    plt.title("Generated Image")
    plt.axis("off")

    if epoch is not None:
        plt.suptitle(f"Epoch {epoch + 1}")
    plt.show()

# Function to test and visualize random samples from the test dataset
def test_on_random_samples(test_dataset, generator, num_samples=5):
    # Select `num_samples` random batches from the test dataset
    random_indices = random.sample(range(len(test_dataset)), num_samples)
    
    for idx in random_indices:
        test_input, test_target = list(test_dataset.skip(idx).take(1))[0]
        
        # Generate the image
        gen_output = generator(test_input, training=False)
        
        # Visualize Results with SSIM and PSNR
        visualize_results(test_input, test_target, gen_output)

# After training completes, visualize results for 5 random test samples
test_on_random_samples(test_dataset, generator, num_samples=5)