# Image-to-Image Translation for Subsurface Reservoir Characterization

This project implements a Conditional Generative Adversarial Network (cGAN) to perform image-to-image translation for subsurface reservoir characterization. The model predicts properties such as porosity, permeability, and water saturation from input facies images, adapting the architecture and principles of the original Pix2Pix cGAN.

Pix2Pix is a versatile GAN model designed for general-purpose image-to-image translation tasks. It was introduced by Phillip Isola et al. in their 2016 paper, ["Image-to-Image Translation with Conditional Adversarial Networks"](https://arxiv.org/abs/1611.07004), which was later presented at CVPR in 2017. You can find more details about the original approach on the [Pix2Pix project page](https://phillipi.github.io/pix2pix/).

While the original Pix2Pix was demonstrated on tasks such as generating building facades, colorizing images, and transforming sketches into photos, this project tailors it specifically for geological datasets, enhancing the model's applicability to subsurface reservoir characterization. 

### Network Architecture:
- **Generator**: A [U-Net](https://arxiv.org/abs/1505.04597)-based architecture to produce property maps (e.g., porosity, permeability) from facies images.
- **Discriminator**: A convolutional PatchGAN classifier to determine whether the generated property maps are realistic.

### Examples of Results:
Below are some examples of the outputs generated by the model:
![sample output_1](Examples of Results/Mask facies.png)
![sample output_2](Examples of Results/Mask Sw.png)
![sample output_3](Examples of Results/Fc to Pors .png)
![sample output_4](Examples of Results/Pors to FC.png)
![sample output_5](Examples of Results/FC to PERM .png)


## Import TensorFlow and other libraries

In [None]:
import tensorflow as tf

import os
import pathlib
import time
import datetime

from matplotlib import pyplot as plt
from IPython import display

## Load the dataset

Download the CMP Facade Database data (30MB). Additional datasets are available in the same format [here](http://efrosgans.eecs.berkeley.edu/pix2pix/datasets/). In Colab you can select other datasets from the drop-down menu. Note that some of the other datasets are significantly larger (`edges2handbags` is 8GB in size). 

In [None]:
import zipfile
import pathlib

# Define the dataset name
dataset_name = 'Sw vs Fc_dataset_split'  # or any other name you want to use

# Path to the local zip file
local_zip_path = '/home/g202103050/Documents/A.Rahman/2-3D GAN part/2D/2D_New Eara/Facies to por or perm or Sw/Sw vs Fc_dataset_split.zip'

# Extract the zip file
with zipfile.ZipFile(local_zip_path, 'r') as zip_ref:
    zip_ref.extractall(path=pathlib.Path('//home/g202103050/Documents/A.Rahman/2-3D GAN part/2D/2D_New Eara/Facies to por or perm or Sw/'))

# Define the path to the extracted dataset
PATH = pathlib.Path('/home/g202103050/Documents/A.Rahman/2-3D GAN part/2D/2D_New Eara/Facies to por or perm or Sw//') / dataset_name
print(f"Extracted dataset to: {PATH}")


In [None]:
list(PATH.parent.iterdir())

Each original image is of size `256 x 512` containing two `256 x 256` images:

In [None]:
sample_image = tf.io.read_file(str(PATH / 'train/1.png'))
sample_image = tf.io.decode_jpeg(sample_image)
print(sample_image.shape)

In [None]:
plt.figure()
plt.imshow(sample_image)

You need to separate real building facade images from the architecture label images—all of which will be of size `256 x 256`.

Define a function that loads image files and outputs two image tensors:

In [None]:
def load(image_file):
  # Read and decode an image file to a uint8 tensor
  image = tf.io.read_file(image_file)
  image = tf.io.decode_jpeg(image)

  # Split each image tensor into two tensors:
  # - one with a real building facade image
  # - one with an architecture label image 
  w = tf.shape(image)[1]
  w = w // 2
  input_image = image[:, w:, :]
  real_image = image[:, :w, :]

  # Convert both images to float32 tensors
  input_image = tf.cast(input_image, tf.float32)
  real_image = tf.cast(real_image, tf.float32)

  return input_image, real_image

Plot a sample of the input (architecture label image) and real (building facade photo) images:

In [None]:
inp, re = load(str(PATH / 'train/100.png'))
# Casting to int for matplotlib to display the images
plt.figure()
plt.imshow(inp / 255.0)
plt.figure()
plt.imshow(re / 255.0)

As described in the [pix2pix paper](https://arxiv.org/abs/1611.07004), you need to apply random jittering and mirroring to preprocess the training set.

Define several functions that:

1. Resize each `256 x 256` image to a larger height and width—`286 x 286`.
2. Randomly crop it back to `256 x 256`.
3. Randomly flip the image horizontally i.e., left to right (random mirroring).
4. Normalize the images to the `[-1, 1]` range.

In [None]:
# The facade training set consist of 400 images
BUFFER_SIZE = 400
# The batch size of 1 produced better results for the U-NSet in the original pix2pix experiment
BATCH_SIZE = 1
# Each image is 256x256 in size
IMG_WIDTH = 256
IMG_HEIGHT = 256

In [None]:
def resize(input_image, real_image, height, width):
  input_image = tf.image.resize(input_image, [height, width],
                                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
  real_image = tf.image.resize(real_image, [height, width],
                               method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

  return input_image, real_image

In [None]:
def random_crop(input_image, real_image):
  stacked_image = tf.stack([input_image, real_image], axis=0)
  cropped_image = tf.image.random_crop(
      stacked_image, size=[2, IMG_HEIGHT, IMG_WIDTH, 3])

  return cropped_image[0], cropped_image[1]

In [None]:
# Normalizing the images to [-1, 1]
def normalize(input_image, real_image):
  input_image = (input_image / 127.5) - 1
  real_image = (real_image / 127.5) - 1

  return input_image, real_image

In [None]:
@tf.function()
def random_jitter(input_image, real_image):
  # Resizing to 286x286
  input_image, real_image = resize(input_image, real_image, 286, 286)

  # Random cropping back to 256x256
  input_image, real_image = random_crop(input_image, real_image)

  if tf.random.uniform(()) > 0.5:
    # Random mirroring
    input_image = tf.image.flip_left_right(input_image)
    real_image = tf.image.flip_left_right(real_image)

  return input_image, real_image

You can inspect some of the preprocessed output:

In [None]:
plt.figure(figsize=(6, 6))
for i in range(4):
  rj_inp, rj_re = random_jitter(inp, re)
  plt.subplot(2, 2, i + 1)
  plt.imshow(rj_inp / 255.0)
  plt.axis('off')
plt.show()

Having checked that the loading and preprocessing works, let's define a couple of helper functions that load and preprocess the training and test sets:

In [None]:
def load_image_train(image_file):
  input_image, real_image = load(image_file)
  input_image, real_image = random_jitter(input_image, real_image)
  input_image, real_image = normalize(input_image, real_image)

  return input_image, real_image

In [None]:
def load_image_test(image_file):
  input_image, real_image = load(image_file)
  input_image, real_image = resize(input_image, real_image,
                                   IMG_HEIGHT, IMG_WIDTH)
  input_image, real_image = normalize(input_image, real_image)

  return input_image, real_image

## Build an input pipeline with `tf.data`

In [None]:
train_dataset = tf.data.Dataset.list_files(str(PATH / 'train/*.png'))
train_dataset = train_dataset.map(load_image_train,
                                  num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(BUFFER_SIZE)
train_dataset = train_dataset.batch(BATCH_SIZE)

In [None]:
try:
  test_dataset = tf.data.Dataset.list_files(str(PATH / 'test/*.png'))
except tf.errors.InvalidArgumentError:
  test_dataset = tf.data.Dataset.list_files(str(PATH / 'val/*.png'))
test_dataset = test_dataset.map(load_image_test)
test_dataset = test_dataset.batch(BATCH_SIZE)

## Build the generator

The generator of your pix2pix cGAN is a _modified_ [U-Net](https://arxiv.org/abs/1505.04597). A U-Net consists of an encoder (downsampler) and decoder (upsampler). (You can find out more about it in the [Image segmentation](../images/segmentation.ipynb) tutorial and on the [U-Net project website](https://lmb.informatik.uni-freiburg.de/people/ronneber/u-net/).)

- Each block in the encoder is: Convolution -> Batch normalization -> Leaky ReLU
- Each block in the decoder is: Transposed convolution -> Batch normalization -> Dropout (applied to the first 3 blocks) -> ReLU
- There are skip connections between the encoder and decoder (as in the U-Net).

Define the downsampler (encoder):

In [None]:
OUTPUT_CHANNELS = 3

In [None]:
def downsample(filters, size, apply_batchnorm=True):
  initializer = tf.random_normal_initializer(0., 0.02)

  result = tf.keras.Sequential()
  result.add(
      tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                             kernel_initializer=initializer, use_bias=False))

  if apply_batchnorm:
    result.add(tf.keras.layers.BatchNormalization())

  result.add(tf.keras.layers.LeakyReLU())

  return result

In [None]:
down_model = downsample(3, 4)
down_result = down_model(tf.expand_dims(inp, 0))
print (down_result.shape)

Define the upsampler (decoder):

In [None]:
def upsample(filters, size, apply_dropout=False):
  initializer = tf.random_normal_initializer(0., 0.02)

  result = tf.keras.Sequential()
  result.add(
    tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                    padding='same',
                                    kernel_initializer=initializer,
                                    use_bias=False))

  result.add(tf.keras.layers.BatchNormalization())

  if apply_dropout:
      result.add(tf.keras.layers.Dropout(0.5))

  result.add(tf.keras.layers.ReLU())

  return result

In [None]:
up_model = upsample(3, 4)
up_result = up_model(down_result)
print (up_result.shape)

Define the generator with the downsampler and the upsampler:

In [None]:
def Generator():
  inputs = tf.keras.layers.Input(shape=[256, 256, 3])

  down_stack = [
    downsample(64, 4, apply_batchnorm=False),  # (batch_size, 128, 128, 64)
    downsample(128, 4),  # (batch_size, 64, 64, 128)
    downsample(256, 4),  # (batch_size, 32, 32, 256)
    downsample(512, 4),  # (batch_size, 16, 16, 512)
    downsample(512, 4),  # (batch_size, 8, 8, 512)
    downsample(512, 4),  # (batch_size, 4, 4, 512)
    downsample(512, 4),  # (batch_size, 2, 2, 512)
    downsample(512, 4),  # (batch_size, 1, 1, 512)
  ]

  up_stack = [
    upsample(512, 4, apply_dropout=True),  # (batch_size, 2, 2, 1024)
    upsample(512, 4, apply_dropout=True),  # (batch_size, 4, 4, 1024)
    upsample(512, 4, apply_dropout=True),  # (batch_size, 8, 8, 1024)
    upsample(512, 4),  # (batch_size, 16, 16, 1024)
    upsample(256, 4),  # (batch_size, 32, 32, 512)
    upsample(128, 4),  # (batch_size, 64, 64, 256)
    upsample(64, 4),  # (batch_size, 128, 128, 128)
  ]

  initializer = tf.random_normal_initializer(0., 0.02)
  last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=initializer,
                                         activation='tanh')  # (batch_size, 256, 256, 3)

  x = inputs

  # Downsampling through the model
  skips = []
  for down in down_stack:
    x = down(x)
    skips.append(x)

  skips = reversed(skips[:-1])

  # Upsampling and establishing the skip connections
  for up, skip in zip(up_stack, skips):
    x = up(x)
    x = tf.keras.layers.Concatenate()([x, skip])

  x = last(x)

  return tf.keras.Model(inputs=inputs, outputs=x)

Visualize the generator model architecture:

In [None]:
generator = Generator()
tf.keras.utils.plot_model(generator, show_shapes=True, dpi=300)

Test the generator:

In [None]:
#gen_output = generator(inp[tf.newaxis, ...], training=False)
#plt.imshow(gen_output[0, ...])

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt

# Assuming 'inp' is your original input image tensor with shape (2092, 2520, 3)

# Resize the input image to (256, 256)
inp_resized = tf.image.resize(inp, [256, 256])

# Add a batch dimension
inp_resized = inp_resized[tf.newaxis, ...]

# Test the generator with the resized input
gen_output = generator(inp_resized, training=False)

# Visualize the output
plt.imshow(gen_output[0, ...])
plt.axis('off')  # Hide axes
plt.show()

### Define the generator loss

GANs learn a loss that adapts to the data, while cGANs learn a structured loss that penalizes a possible structure that differs from the network output and the target image, as described in the [pix2pix paper](https://arxiv.org/abs/1611.07004).

- The generator loss is a sigmoid cross-entropy loss of the generated images and an **array of ones**.
- The pix2pix paper also mentions the L1 loss, which is a MAE (mean absolute error) between the generated image and the target image.
- This allows the generated image to become structurally similar to the target image.
- The formula to calculate the total generator loss is `gan_loss + LAMBDA * l1_loss`, where `LAMBDA = 100`. This value was decided by the authors of the paper.

In [None]:
LAMBDA = 100

In [None]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
def generator_loss(disc_generated_output, gen_output, target):
  gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

  # Mean absolute error
  l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

  total_gen_loss = gan_loss + (LAMBDA * l1_loss)

  return total_gen_loss, gan_loss, l1_loss

The training procedure for the generator is as follows:

![Generator Update Image](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/generative/images/gen.png?raw=1)


## Build the discriminator

The discriminator in the pix2pix cGAN is a convolutional PatchGAN classifier—it tries to classify if each image _patch_ is real or not real, as described in the [pix2pix paper](https://arxiv.org/abs/1611.07004).

- Each block in the discriminator is: Convolution -> Batch normalization -> Leaky ReLU.
- The shape of the output after the last layer is `(batch_size, 30, 30, 1)`.
- Each `30 x 30` image patch of the output classifies a `70 x 70` portion of the input image.
- The discriminator receives 2 inputs: 
    - The input image and the target image, which it should classify as real.
    - The input image and the generated image (the output of the generator), which it should classify as fake.
    - Use `tf.concat([inp, tar], axis=-1)` to concatenate these 2 inputs together.

Let's define the discriminator:

In [None]:
def Discriminator():
  initializer = tf.random_normal_initializer(0., 0.02)

  inp = tf.keras.layers.Input(shape=[256, 256, 3], name='input_image')
  tar = tf.keras.layers.Input(shape=[256, 256, 3], name='target_image')

  x = tf.keras.layers.concatenate([inp, tar])  # (batch_size, 256, 256, channels*2)

  down1 = downsample(64, 4, False)(x)  # (batch_size, 128, 128, 64)
  down2 = downsample(128, 4)(down1)  # (batch_size, 64, 64, 128)
  down3 = downsample(256, 4)(down2)  # (batch_size, 32, 32, 256)

  zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)  # (batch_size, 34, 34, 256)
  conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1)  # (batch_size, 31, 31, 512)

  batchnorm1 = tf.keras.layers.BatchNormalization()(conv)

  leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

  zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)  # (batch_size, 33, 33, 512)

  last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2)  # (batch_size, 30, 30, 1)

  return tf.keras.Model(inputs=[inp, tar], outputs=last)

Visualize the discriminator model architecture:

In [None]:
discriminator = Discriminator()
tf.keras.utils.plot_model(discriminator, show_shapes=True, dpi=300)

Test the discriminator:

In [None]:
# Ensure inp and gen_output are of shape (256, 256, 3) before adding a batch dimension
inp_resized = tf.image.resize(inp, [256, 256])
gen_output_resized = tf.image.resize(gen_output, [256, 256])

# If necessary, add a single batch dimension if the tensors are not already batched
if len(inp_resized.shape) == 3:
    inp_resized = inp_resized[tf.newaxis, ...]
if len(gen_output_resized.shape) == 3:
    gen_output_resized = gen_output_resized[tf.newaxis, ...]

# Confirm shapes to avoid further issues
print("Input shape:", inp_resized.shape)  # Should be (1, 256, 256, 3)
print("Generated output shape:", gen_output_resized.shape)  # Should be (1, 256, 256, 3)

# Test the discriminator with the corrected inputs
disc_out = discriminator([inp_resized, gen_output_resized], training=False)

# Visualize the discriminator output
plt.imshow(disc_out[0, ..., -1], vmin=-20, vmax=20, cmap='RdBu_r')
plt.colorbar()
plt.title("Discriminator Output Heatmap")
plt.show()


### Define the discriminator loss

- The `discriminator_loss` function takes 2 inputs: **real images** and **generated images**.
- `real_loss` is a sigmoid cross-entropy loss of the **real images** and an **array of ones(since these are the real images)**.
- `generated_loss` is a sigmoid cross-entropy loss of the **generated images** and an **array of zeros (since these are the fake images)**.
- The `total_loss` is the sum of `real_loss` and `generated_loss`.

In [None]:
def discriminator_loss(disc_real_output, disc_generated_output):
  real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

  generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

  total_disc_loss = real_loss + generated_loss

  return total_disc_loss

The training procedure for the discriminator is shown below.

To learn more about the architecture and the hyperparameters you can refer to the [pix2pix paper](https://arxiv.org/abs/1611.07004).

![Discriminator Update Image](https://github.com/tensorflow/docs/blob/master/site/en/tutorials/generative/images/dis.png?raw=1)


## Define the optimizers and a checkpoint-saver


In [None]:
generator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

## Generate images

Write a function to plot some images during training.

- Pass images from the test set to the generator.
- The generator will then translate the input image into the output.
- The last step is to plot the predictions and _voila_!

Note: The `training=True` is intentional here since you want the batch statistics, while running the model on the test dataset. If you use `training=False`, you get the accumulated statistics learned from the training dataset (which you don't want).

In [None]:
def generate_images(model, test_input, tar):
  prediction = model(test_input, training=True)
  plt.figure(figsize=(15, 15))

  display_list = [test_input[0], tar[0], prediction[0]]
  title = ['Input Image (Facies)', 'Ground Truth (Sw)', 'Predicted Image(Sw)']

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # Getting the pixel values in the [0, 1] range to plot.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()

Test the function:

In [None]:
for example_input, example_target in test_dataset.take(1):
  generate_images(generator, example_input, example_target)

## Training

- For each example input generates an output.
- The discriminator receives the `input_image` and the generated image as the first input. The second input is the `input_image` and the `target_image`.
- Next, calculate the generator and the discriminator loss.
- Then, calculate the gradients of loss with respect to both the generator and the discriminator variables(inputs) and apply those to the optimizer.
- Finally, log the losses to TensorBoard.

In [None]:
log_dir="logs/"

summary_writer = tf.summary.create_file_writer(
  log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

In [None]:
@tf.function
def train_step(input_image, target, step):
  with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
    gen_output = generator(input_image, training=True)

    disc_real_output = discriminator([input_image, target], training=True)
    disc_generated_output = discriminator([input_image, gen_output], training=True)

    gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
    disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

  generator_gradients = gen_tape.gradient(gen_total_loss,
                                          generator.trainable_variables)
  discriminator_gradients = disc_tape.gradient(disc_loss,
                                               discriminator.trainable_variables)

  generator_optimizer.apply_gradients(zip(generator_gradients,
                                          generator.trainable_variables))
  discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                              discriminator.trainable_variables))

  with summary_writer.as_default():
    tf.summary.scalar('gen_total_loss', gen_total_loss, step=step//1000)
    tf.summary.scalar('gen_gan_loss', gen_gan_loss, step=step//1000)
    tf.summary.scalar('gen_l1_loss', gen_l1_loss, step=step//1000)
    tf.summary.scalar('disc_loss', disc_loss, step=step//1000)

The actual training loop. Since this tutorial can run of more than one dataset, and the datasets vary greatly in size the training loop is setup to work in steps instead of epochs.

- Iterates over the number of steps.
- Every 10 steps print a dot (`.`).
- Every 1k steps: clear the display and run `generate_images` to show the progress.
- Every 5k steps: save a checkpoint.

In [None]:
def fit(train_ds, test_ds, steps):
  example_input, example_target = next(iter(test_ds.take(1)))
  start = time.time()

  for step, (input_image, target) in train_ds.repeat().take(steps).enumerate():
    if (step) % 1000 == 0:
      display.clear_output(wait=True)

      if step != 0:
        print(f'Time taken for 1000 steps: {time.time()-start:.2f} sec\n')

      start = time.time()

      generate_images(generator, example_input, example_target)
      print(f"Step: {step//1000}k")

    train_step(input_image, target, step)

    # Training step
    if (step+1) % 10 == 0:
      print('.', end='', flush=True)


    # Save (checkpoint) the model every 5k steps
    if (step + 1) % 5000 == 0:
      checkpoint.save(file_prefix=checkpoint_prefix)

This training loop saves logs that you can view in TensorBoard to monitor the training progress.

If you work on a local machine, you would launch a separate TensorBoard process. When working in a notebook, launch the viewer before starting the training to monitor with TensorBoard.

Launch the TensorBoard viewer (Sorry, this doesn't
display on tensorflow.org):

In [None]:
%load_ext tensorboard
%tensorboard --logdir {log_dir}

You can view the [results of a previous run](https://tensorboard.dev/experiment/lZ0C6FONROaUMfjYkVyJqw) of this notebook on [TensorBoard.dev](https://tensorboard.dev/).

Finally, run the training loop:

In [None]:
fit(train_dataset, test_dataset, steps=100000)

Interpreting the logs is more subtle when training a GAN (or a cGAN like pix2pix) compared to a simple classification or regression model. Things to look for:

- Check that neither the generator nor the discriminator model has "won". If either the `gen_gan_loss` or the `disc_loss` gets very low, it's an indicator that this model is dominating the other, and you are not successfully training the combined model.
- The value `log(2) = 0.69` is a good reference point for these losses, as it indicates a perplexity of 2 - the discriminator is, on average, equally uncertain about the two options.
- For the `disc_loss`, a value below `0.69` means the discriminator is doing better than random on the combined set of real and generated images.
- For the `gen_gan_loss`, a value below `0.69` means the generator is doing better than random at fooling the discriminator.
- As training progresses, the `gen_l1_loss` should go down.

## Restore the latest checkpoint and test the network

In [None]:
!ls {checkpoint_dir}

In [None]:
# Restoring the latest checkpoint in checkpoint_dir
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

## Generate some images using the test set

In [None]:


# Run the trained model on a few examples from the test set
for inp, tar in test_dataset.take(363):
  generate_images(generator, inp, tar)
     


In [None]:
import os
import tensorflow as tf
from PIL import Image
import matplotlib.pyplot as plt

# Define the save directory path
save_path = '/home/g202103050/Documents/A.Rahman/2-3D GAN part/2D/2D_New Eara/Facies to por or perm or Sw/results/100kSw_images'
os.makedirs(save_path, exist_ok=True)  # Create the directory if it doesn't exist

# Modified generate_images function
def generate_and_save_images(generator, inp, tar, idx):
    prediction = generator(inp, training=True)
    
    # Convert TensorFlow tensors to NumPy arrays
    inp_img = inp[0].numpy() * 0.5 + 0.5  # Denormalize to [0,1]
    tar_img = tar[0].numpy() * 0.5 + 0.5  # Denormalize to [0,1]
    pred_img = prediction[0].numpy() * 0.5 + 0.5  # Denormalize to [0,1]

    # Set up matplotlib figure with larger dimensions for higher resolution
    plt.figure(figsize=(20, 8))  # Adjust size for better resolution
    
    display_list = [inp_img, tar_img, pred_img]
    title = ['Input Image (Facies)', 'Ground Truth (Sw)', 'Predicted Image(Sw)']
    
    for i in range(3):
        plt.subplot(1, 3, i+1)
        plt.title(title[i])
        plt.imshow(display_list[i])
        plt.axis('off')
    
    # Save the figure with high DPI and tight bounding box
    plt.savefig(os.path.join(save_path, f"comparison_image_{idx+1}.png"), dpi=900, bbox_inches='tight', format='png')
    plt.close()

# Run the trained model on a few examples from the test set
for idx, (inp, tar) in enumerate(test_dataset.take(363)):
    generate_and_save_images(generator, inp, tar, idx)
    print(f"Saved high-resolution comparison image {idx+1} at {os.path.join(save_path, f'comparison_image_{idx+1}.png')}")


In [None]:
def calculate_fid(real_images, generated_images):
    # Your FID calculation logic here
    fid_score = 0  # Replace this with the actual FID computation
    return fid_score


In [None]:
def calculate_amt(true_images, generated_images):
    # Calculate the absolute difference
    diff = tf.abs(generated_images - true_images)
    
    # Replace any non-finite values with zero to avoid NaNs
    diff = tf.where(tf.math.is_finite(diff), diff, tf.zeros_like(diff))
    
    # Calculate mean absolute difference
    amt_score = tf.reduce_mean(diff)
    
    # Optional: print intermediate values for debugging
    print("Mean Absolute Difference:", amt_score)
    
    return amt_score

In [None]:
import torch
import torch.nn.functional as F
from torchvision.models.segmentation import fcn_resnet50

def calculate_fcn_score(real_labels, generated_images):
    model = fcn_resnet50(pretrained=True).eval()
    
    # Convert TensorFlow tensors to NumPy, then to PyTorch tensors
    generated_images = torch.from_numpy(generated_images.numpy()).permute(0, 3, 1, 2).float()
    
    # Ensure the images are the correct size
    generated_images = F.interpolate(generated_images, size=(224, 224), mode='bilinear', align_corners=False)
    
    with torch.no_grad():
        output = model(generated_images)['out']
        # Further processing of the output as needed, e.g., calculating an FCN score
        fcn_score = output.mean().item()  # Example calculation
    
    return fcn_score

In [None]:

def train_step(input_image, target, step):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        gen_output = generator(input_image, training=True)

        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)

        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    generator_gradients = gen_tape.gradient(gen_total_loss, generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients, discriminator.trainable_variables))

    # Calculate metrics
    fid_score = calculate_fid(target, gen_output)
    amt_score = calculate_amt(target, gen_output)  # Only two arguments
    fcn_score = calculate_fcn_score(target, gen_output)

    # Log the losses and metrics
    with summary_writer.as_default():
        tf.summary.scalar('gen_total_loss', gen_total_loss, step=step // 1000)
        tf.summary.scalar('gen_gan_loss', gen_gan_loss, step=step // 1000)
        tf.summary.scalar('gen_l1_loss', gen_l1_loss, step=step // 1000)
        tf.summary.scalar('disc_loss', disc_loss, step=step // 1000)
        tf.summary.scalar('FID', fid_score, step=step // 1000)
        tf.summary.scalar('AMT', amt_score, step=step // 1000)
        tf.summary.scalar('FCN Score', fcn_score, step=step // 1000)


In [None]:
%tensorboard --logdir logs/


In [None]:
import torch
import torch.nn.functional as F
from torchvision.models.segmentation import fcn_resnet50

def calculate_fcn_score(real_labels, generated_images):
    model = fcn_resnet50(pretrained=True).eval()
    
    # Convert TensorFlow tensors to NumPy, then to PyTorch tensors
    generated_images = torch.from_numpy(generated_images.numpy()).permute(0, 3, 1, 2).float()
    
    # Ensure the images are the correct size
    generated_images = F.interpolate(generated_images, size=(224, 224), mode='bilinear', align_corners=False)
    
    with torch.no_grad():
        output = model(generated_images)['out']
        # Further processing of the output as needed, e.g., calculating an FCN score
        fcn_score = output.mean().item()  # Example calculation
    
    return fcn_score


In [None]:
def calculate_fid(real_images, generated_images):
    # Your FID calculation logic here
    fid_score = 0  # Replace this with the actual FID computation
    return fid_score


In [None]:
import torch
import torch.nn.functional as F
from torchvision.models.segmentation import fcn_resnet50

def calculate_fcn_score(real_labels, generated_images):
    model = fcn_resnet50(pretrained=True).eval()

    # Convert TensorFlow tensors to NumPy, then to PyTorch tensors
    generated_images = torch.from_numpy(generated_images.numpy()).permute(0, 3, 1, 2).float()

    # Replace any nan values in generated_images with zero
    generated_images = torch.nan_to_num(generated_images, nan=0.0, posinf=0.0, neginf=0.0)

    # Ensure the images are the correct size
    generated_images = F.interpolate(generated_images, size=(224, 224), mode='bilinear', align_corners=False)

    with torch.no_grad():
        output = model(generated_images)['out']
        fcn_score = output.mean().item()

    return fcn_score


In [None]:
model = fcn_resnet50(weights='FCN_ResNet50_Weights.DEFAULT').eval()


In [None]:
def calculate_amt(true_images, generated_images):
    # Calculate the absolute difference
    diff = tf.abs(generated_images - true_images)
    
    # Replace any non-finite values with zero to avoid NaNs
    diff = tf.where(tf.math.is_finite(diff), diff, tf.zeros_like(diff))
    
    # Calculate mean absolute difference
    amt_score = tf.reduce_mean(diff)
    
    # Optional: print intermediate values for debugging
    print("Mean Absolute Difference:", amt_score)
    
    return amt_score


In [None]:
test_input_image, test_target_image = next(iter(train_dataset))
test_gen_output = generator(test_input_image, training=False)

fid_test = calculate_fid(test_target_image, test_gen_output)
amt_test = calculate_amt(test_target_image, test_gen_output)  # Only two arguments now
fcn_test = calculate_fcn_score(test_target_image, test_gen_output)

print("FID Test:", fid_test)
print("AMT Test:", amt_test)
print("FCN Score Test:", fcn_test)


In [None]:
print("Test Input Image:", test_input_image)
print("Test Target Image:", test_target_image)
print("Generated Output Image:", test_gen_output)


In [None]:
for step, (input_image, target) in train_dataset.take(1).enumerate():
    print(f'Running Step: {step}')
    train_step(input_image, target, step)
    break  # Only one iteration to test


In [None]:
with summary_writer.as_default():
    tf.summary.scalar('test_metric', 0.5, step=1)


In [None]:
for example_input, example_target in test_dataset.take(1):
    generate_images(generator, example_input, example_target)


In [None]:
print("Devices:", tf.config.list_physical_devices('GPU'))


In [None]:
print(f"Generated Image Range: {tf.reduce_min(generated_scaled).numpy()}, {tf.reduce_max(generated_scaled).numpy()}")
print(f"Target Image Range: {tf.reduce_min(target_scaled).numpy()}, {tf.reduce_max(target_scaled).numpy()}")


In [None]:
if tf.reduce_any(tf.math.is_nan(generated_image)):
    print("NaN values detected in generated image output!")
    generated_image = tf.where(tf.math.is_nan(generated_image), tf.zeros_like(generated_image), generated_image)


In [None]:
# Check if NaNs are in the input and target tensors
for inp, tar in test_dataset.take(1):
    if tf.reduce_any(tf.math.is_nan(inp)) or tf.reduce_any(tf.math.is_nan(tar)):
        print("NaN values detected in input or target data!")


In [None]:
# Forward pass through each layer to detect where NaNs appear
for layer in generator.layers:
    inp = layer(inp)
    if tf.reduce_any(tf.math.is_nan(inp)):
        print(f"NaN detected after layer: {layer.name}")
        break


In [None]:
for inp, tar in test_dataset.take(1):
    generated_image = generator(inp, training=False)

    # Replace NaNs with zeros in generated output
    if tf.reduce_any(tf.math.is_nan(generated_image)):
        print("NaN values in generated_image")
        generated_image = tf.where(tf.math.is_nan(generated_image), tf.zeros_like(generated_image), generated_image)

    # Scale generated and target images
    target_scaled = (tar + 1) / 2
    generated_scaled = (generated_image + 1) / 2

    print(f"Generated Image Range: {tf.reduce_min(generated_scaled).numpy()}, {tf.reduce_max(generated_scaled).numpy()}")
    print(f"Target Image Range: {tf.reduce_min(target_scaled).numpy()}, {tf.reduce_max(target_scaled).numpy()}")

    # Calculate IoU
    iou = calculate_iou(generated_scaled.numpy(), target_scaled.numpy())
    print(f"IoU: {iou}")

    # Calculate PSNR
    psnr = calculate_psnr(target_scaled, generated_scaled)
    print(f"PSNR: {psnr} dB")

    # Calculate FID
    try:
        fid_score = calculate_fid_torch(target_scaled.numpy(), generated_scaled.numpy())
        print(f"FID: {fid_score}")
    except Exception as e:
        print(f"Error calculating FID: {e}")
