In [44]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Input, Dense, Lambda
import glob
from tensorflow.image import psnr, ssim
import numpy as np
import matplotlib.pyplot as plt
import os



# Define your MTANN model as a Keras model
# Define your MTANN model as a Keras model

def custom_activation(x):
    return x + custom_constant

def create_mtann_model(patch_size, input_shape):
    # Define the model
    model = keras.Sequential()

    # Input layer (specify the input shape)
    model.add(keras.layers.Input(shape=input_shape))

    # Hidden layers with sigmoid activation
    model.add(keras.layers.Dense(64, activation='sigmoid'))
    model.add(keras.layers.Dense(32, activation='sigmoid'))
    model.add(keras.layers.Dense(16, activation='sigmoid'))

    # Output layer with custom activation function
    model.add(keras.layers.Dense(1, activation=custom_activation))

    return model






    
##################################################


def reconstruct_image(input_image, patch_size, mtann_model):
    output_image = np.zeros(input_image)
    
    for y in range(0, input_image.shape[0] - patch_size[0] + 1, 1):
        for x in range(0, input_image.shape[1] - patch_size[1] + 1, 1):
            input_patch = input_image[y:y+patch_size[0], x:x+patch_size[1]]
            
            # Pass the input patch through your model to get the single-pixel output
            single_pixel_output = mtann_model(input_patch)
            
            # Place the single-pixel output at the corresponding location in the output image
            output_image[y:y+1, x:x+1] = single_pixel_output

    return output_image




def normalize_images(input_images, target_images):
    normalized_input_images = []
    normalized_target_images = []

    for input_image, target_image in zip(input_images, target_images):
        # Calculate the minimum and maximum values in the input and target images
        min_value_input = tf.reduce_min(input_image)
        max_value_input = tf.reduce_max(input_image)
        min_value_target = tf.reduce_min(target_image)
        max_value_target = tf.reduce_max(target_image)

        # Normalize input image to [0, 1]
        normalized_input_image = (input_image - min_value_input) / (max_value_input - min_value_input)

        # Normalize target image to [0, 1]
        normalized_target_image = (target_image - min_value_target) / (max_value_target - min_value_target)

        normalized_input_images.append(normalized_input_image)
        normalized_target_images.append(normalized_target_image)

    return normalized_input_images, normalized_target_images



BATCH_SIZE = 1
IMG_WIDTH = 512
IMG_HEIGHT = 512



# RESISE FOR DATA AUGMENTATION

def resize(input_image, real_image, height, width):
    
  input_image = tf.image.resize(input_image, [height, width],
                                method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
  real_image = tf.image.resize(real_image, [height, width],
                               method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

  return input_image, real_image


def random_crop(input_image, real_image):
  stacked_image = tf.stack([input_image, real_image], axis=0)
  cropped_image = tf.image.random_crop(stacked_image, size=[2, IMG_HEIGHT, IMG_WIDTH, 1])

  return cropped_image[0], cropped_image[1]




@tf.function
def random_jitter(input_image, real_image):
  # Resizing to 286x286
  input_image, real_image = resize(input_image, real_image, 542, 542)

  # Random cropping back to 256x256
  input_image, real_image = random_crop(input_image, real_image)

  def apply_random_flip():
        return tf.image.flip_left_right(input_image), tf.image.flip_left_right(real_image)

  input_image, real_image = tf.cond(tf.random.uniform(()) > 0.5, apply_random_flip, lambda: (input_image, real_image))

  return input_image, real_image





PATH = "/home/rafael/Área de Trabalho/Dataset-PNG"

####################################################


# Define a custom constant and a custom loss that includes the constant
custom_constant = tf.Variable(initial_value=1.0, trainable=True, name='custom_constant')
custom_constant_initializer = keras.initializers.Constant(value=1.0)


# mtann_model.load_weights('mtann_weights.h5')






##########################################


# Split the input and ground truth images into overlapping patches
patch_size = (16, 16)
stride = (8, 8)
input_patches = []
gt_patches = []

shape=(16, 16, 1)

def reconstruct_image_from_patches(patches, image_shape, patch_size, stride):
    reconstructed_image = np.zeros(image_shape, dtype=np.float32)
    count_map = np.zeros(image_shape, dtype=np.float32)
    
    for y in range(0, image_shape[0] - patch_size[0] + 1, stride[0]):
        for x in range(0, image_shape[1] - patch_size[1] + 1, stride[1]):
            patch = patches.pop(0)  # Take the first patch from the list
            reconstructed_image[y:y+patch_size[0], x:x+patch_size[1]] += patch
            count_map[y:y+patch_size[0], x:x+patch_size[1]] += 1

    # Avoid division by zero and compute the average
    count_map = np.maximum(count_map, 1)
    reconstructed_image /= count_map

    return reconstructed_image

    
def generate_images(input_patches, gt_patches, image_shape, patch_size, stride):
  reconstructed_input = reconstruct_image(input_patches)
  reconstructed_gt = reconstruct_image(gt_patches)


  prediction = np.zeros(reconstructed_input)
    
  for y in range(0, input_image.shape[0] - patch_size[0] + 1, stride_2[0]):
      for x in range(0, input_image.shape[1] - patch_size[1] + 1, stride_2[1]):
          input_patch = input_image[y:y+patch_size[0], x:x+patch_size[1]]
            
          # Pass the input patch through your model to get the single-pixel output
          single_pixel_output = mtann_model(input_patch)
            
          # Place the single-pixel output at the corresponding location in the output image
          prediction[y:y+1, x:x+1] = single_pixel_output
    
    
  plt.figure(figsize=(15, 15))

  display_list = [reconstructed_input[0], reconstructed_gt[0], prediction[0]]
  title = ['Input Image', 'Ground Truth', 'Predicted Image']

  for i in range(3):
    plt.subplot(1, 3, i+1)
    plt.title(title[i])
    # Getting the pixel values in the [0, 1] range to plot.
    plt.imshow(display_list[i] * 0.5 + 0.5)
    plt.axis('off')
  plt.show()


def load2(image_file, patch_size, stride):
    image_ts = tf.io.read_file(image_file)
    image_ts = tf.image.decode_png(image_ts)

    # Calculate the center of the image
    w_center = 358

    # Split the image into two parts
    input_image = image_ts[:, w_center:, :]
    real_image = image_ts[:, :w_center, :]

    input_image = tf.cast(input_image, tf.float32)
    real_image = tf.cast(real_image, tf.float32)
    
    input_image, real_image = resize(input_image, real_image, IMG_HEIGHT, IMG_WIDTH)
    
    input_patches = []
    gt_patches = []

    for y in range(0, input_image.shape[0] - patch_size[0] + 1, stride[0]):
        for x in range(0, input_image.shape[1] - patch_size[1] + 1, stride[1]):
            input_patch = input_image[y:y+patch_size[0], x:x+patch_size[1]]
            gt_patch = real_image[y:y+patch_size[0], x:x+patch_size[1]]
            input_patches.append(input_patch)
            gt_patches.append(gt_patch)

    return input_patches, gt_patches, image_file
    
########## LOAD DATASETS
 
# LOAD TRAIN

def load_image_train(image_file):
  input_image, real_image, image_file = load2(image_file, patch_size, stride)
# input_image, real_image = random_jitter(input_image, real_image)
  input_image, real_image = normalize_images(input_image, real_image)
  
  return input_image, real_image, image_file

# LOAD VALIDATION

def load_image_val(image_file):
  input_image, real_image, image_file = load2(image_file, patch_size, stride)
# input_image, real_image = random_jitter(input_image, real_image)
  input_image, real_image = normalize_images(input_image, real_image)

  return input_image, real_image, image_file


# LOAD TEST

def load_image_test(image_file):
  input_image, real_image, image_file = load2(image_file, patch_size, stride)
  input_image, real_image = normalize_images(input_image, real_image)

  return input_image, real_image, image_file




############# COMPILE THE MODEL

# Define the custom loss function
def custom_loss(y_true, y_pred):
    # Apply the custom activation within the loss function
    y_pred_with_activation = y_pred + custom_constant
    # Calculate your custom loss here
    # For example, you can use mean squared error (MSE) as a simple loss function
    loss = tf.keras.losses.mean_squared_error(y_true, y_pred_with_activation)
    return loss


# Create your MTANN model
# Define the input shape

input_shape = (16, 16, 1)

# Create your MTANN model
mtann_model = create_mtann_model(patch_size, input_shape)

# Compile the model with your custom loss function
mtann_model.compile(optimizer='adam', loss=custom_loss)


mtann_model.summary()

tf.keras.utils.plot_model(mtann_model, show_shapes=True, dpi=64)



####### CREATE DATASETS

input_directory_train = "/home/rafael/Área de Trabalho/Dataset-PNG"
input_directory_test = "/home/rafael/Área de Trabalho/Dataset-PNG/Test"
input_directory_val = "/home/rafael/Área de Trabalho/Dataset-PNG/Val"


# Use glob to get a list of image file paths in the input directory
image_files      = glob.glob(input_directory_train + "/*.png")  # Change the file extension as needed
image_files_test = glob.glob(input_directory_test + "/*.png")  # Change the file extension as needed
image_files_val  = glob.glob(input_directory_val + "/*.png")  # Change the file extension as needed


# Create a dataset from the list of image files
dataset = tf.data.Dataset.from_tensor_slices(image_files)
dataset = dataset.map(load_image_train)
# Define batch size and other data pipeline operations (e.g., shuffle, repeat, etc.) batch_size = 32
dataset = dataset.shuffle(buffer_size=1000)  # Shuffle the data
dataset = dataset.batch(batch_size)          # Batch the data
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)  # Prefetch for better performanc

# Skip the first N elements and take the next M elements
N = 0
M = 200
dataset = dataset.skip(N).take(M)

##
# Create a dataset from the list of image files
test_dataset = tf.data.Dataset.from_tensor_slices(input_directory_val)
test_dataset = dataset.map(load_image_train)
# Define batch size and other data pipeline operations (e.g., shuffle, repeat, etc.) batch_size = 32
test_dataset = dataset.shuffle(buffer_size=1000)  # Shuffle the data
test_dataset = dataset.batch(batch_size)          # Batch the data
test_dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)  # Prefetch for better performanc

# Skip the first N elements and take the next M elements
N = 0
M = 100
test_dataset = dataset.skip(N).take(M)

#
# Create a dataset from the list of image files
val_dataset = tf.data.Dataset.from_tensor_slices(image_files_test)
val_dataset = dataset.map(load_image_train)
# Define batch size and other data pipeline operations (e.g., shuffle, repeat, etc.) batch_size = 32
val_dataset = dataset.shuffle(buffer_size=1000)  # Shuffle the data
val_dataset = dataset.batch(batch_size)          # Batch the data
val_dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)  # Prefetch for better performanc

# Skip the first N elements and take the next M elements
N = 0
M = 100
test_dataset = dataset.skip(N).take(M)



# Define the checkpoint directory
checkpoint_dir = '/home/rafael/Área de Trabalho/MTANN Checkpoints/1'
os.makedirs(checkpoint_dir, exist_ok=True)

# Create a callback to save model weights
checkpoint_callback = ModelCheckpoint(
    os.path.join(checkpoint_dir, 'weights.{epoch:02d}-{val_loss:.2f}.h5'),
    save_best_only=False,
    save_weights_only=True,
    monitor='val_loss',  # Monitor validation loss
    mode='min',  # Minimize the monitored quantity (in this case, validation loss)
    verbose=1  # Verbosity mode: 1 shows saving messages
)


losses = []  # To store loss values

# Create a callback to save training history (loss) in CSV format
csv_logger = CSVLogger(os.path.join(checkpoint_dir, 'training_history.csv'), append=True)


# Perform the training loop


num_epochs = 10  # Set the number of training epochs

for epoch in range(num_epochs):
    total_loss = 0.0  # Track the total loss for this epoch
    steps_per_epoch = len(input_patches) // batch_size

    # Iterate through the dataset (batches)
    for batch_input, batch_gt, batch_finenames in dataset:
       # Perform the forward pass to get predictions
       single_pixel_output = mtann_model(input_patch)

       # Get the center pixel of batch_gt
       center_x = batch_gt.shape[1] // 2
       center_y = batch_gt.shape[2] // 2
       center_pixel = batch_gt[:, center_x, center_y, :]

        
       # Calculate the mean squared error (MSE) loss
       loss = tf.keras.losses.mean_squared_error(center_pixel, single_pixel_output)

       # Calculate gradients
       grads = mtann_model.optimizer.get_gradients(loss, mtann_model.trainable_variables)

       # Apply gradients to update model weights
       mtann_model.optimizer.apply_gradients(zip(grads, mtann_model.trainable_variables))

       # Track the total loss for this epoch
       total_loss += loss

    input_patch, input_gt, example_filename = next(iter(test_dataset.take(1)))
    image_shape_t=(512,512)
    generate_images(input_patch, input_gt, image_shape_t, patch_size, stride)
    

    # Calculate and print the average loss for this epoch
    average_loss = total_loss / steps_per_epoch
    losses.append(average_loss.numpy())
    print(f"Epoch {epoch + 1}, Loss: {average_loss.numpy()}")

    checkpoint_callback.on_epoch_end(epoch, logs={'val_loss': average_loss})
    csv_logger.on_epoch_end(epoch, logs={'loss': average_loss})



with open(os.path.join(checkpoint_dir, 'loss_history.txt'), 'w') as f:
    for loss in losses:
        f.write(f"{loss}\n")


##################################


#Initialize lists to store PSNR and SSIM values
psnr_train_values = []
ssim_train_values = []
psnr_test_values = []
ssim_test_values = []
psnr_validation_values = []
ssim_validation_values = []




# Calculate PSNR and SSIM for the training dataset
for batch in train_dataset:
    input_image, target_image, filenames_b = batch
    
    # Generate an image using the trained model

    generated_image, target_image = resize(input_image, target_image,IMG_HEIGHT, IMG_WIDTH)
    
    output_image = np.zeros(input_image)
    
    for y in range(0, input_image.shape[0] - patch_size[0] + 1, stride_2[0]):
        for x in range(0, input_image.shape[1] - patch_size[1] + 1, stride_2[1]):
            input_patch = input_image[y:y+patch_size[0], x:x+patch_size[1]]
            
            # Pass the input patch through your model to get the single-pixel output
            single_pixel_output = mtann_model(input_patch)
            
            # Place the single-pixel output at the corresponding location in the output image
            output_image[y:y+1, x:x+1] = single_pixel_output


    # Calculate PSNR and SSIM for the generated image compared to the target image
    psnr_train = tf.image.psnr(generated_image, target_image, max_val=1.0)
    psnr_train = tf.where(tf.math.is_finite(psnr_train), psnr_train, 0.0)
    ssim_train = tf.image.ssim(generated_image, target_image, max_val=1.0)
    ssim_train = tf.where(tf.math.is_finite(ssim_train), ssim_train, 0.0)

    psnr_train_values.append(psnr_train)
    ssim_train_values.append(ssim_train)

    # Convert the TensorFlow tensor to a NumPy array for visualization


    generated_image = generated_image[0]  # Extract the image from the batch
    target_image = target_image[0]  # Extract the image from the batch

    plt.figure()
    plt.subplot(121)
    plt.imshow(generated_image, cmap='gray')
    plt.title('Normalized Generated Image')

    plt.figure()
    plt.subplot(122)
    plt.imshow(target_image, cmap='gray')
    plt.title('Target Image')

    plt.show()



for batch in test_dataset:
    input_image, target_image, filenames_b = batch
    
    # Generate an image using the trained model

    generated_image, target_image = resize(input_image, target_image, IMG_HEIGHT, IMG_WIDTH)
    
    output_image = np.zeros(input_image)
    
    for y in range(0, input_image.shape[0] - patch_size[0] + 1, stride_2[0]):
        for x in range(0, input_image.shape[1] - patch_size[1] + 1, stride_2[1]):
            input_patch = input_image[y:y+patch_size[0], x:x+patch_size[1]]
            
            # Pass the input patch through your model to get the single-pixel output
            single_pixel_output = mtann_model(input_patch)
            
            # Place the single-pixel output at the corresponding location in the output image
            output_image[y:y+1, x:x+1] = single_pixel_output


    # Calculate PSNR and SSIM for the generated image compared to the target image
    psnr_test = tf.image.psnr(generated_image, target_image, max_val=1.0)
    psnr_test = tf.where(tf.math.is_finite(psnr_test), psnr_test, 0.0)
    ssim_test = tf.image.ssim(generated_image, target_image, max_val=1.0)
    ssim_test = tf.where(tf.math.is_finite(ssim_test), ssim_test, 0.0)

    psnr_test_values.append(psnr_test)
    ssim_test_values.append(ssim_test)

    # Convert the TensorFlow tensor to a NumPy array for visualization

    generated_image = generated_image[0]  # Extract the image from the batch
    target_image = target_image[0]  # Extract the image from the batch


'''
for batch in val_dataset:
    input_image, target_image, filenames_b = batch
    
    # Generate an image using the trained model

    generated_image, target_image = resize(input_image, target_image, IMG_HEIGHT, IMG_WIDTH)
    
    output_image = np.zeros(input_image)
    
    for y in range(0, input_image.shape[0] - patch_size[0] + 1, stride_2[0]):
        for x in range(0, input_image.shape[1] - patch_size[1] + 1, stride_2[1]):
            input_patch = input_image[y:y+patch_size[0], x:x+patch_size[1]]
            
            # Pass the input patch through your model to get the single-pixel output
            single_pixel_output = mtann_model(input_patch)
            
            # Place the single-pixel output at the corresponding location in the output image
            output_image[y:y+1, x:x+1] = single_pixel_output


    # Calculate PSNR and SSIM for the generated image compared to the target image
    psnr_val = tf.image.psnr(generated_image, target_image, max_val=1.0)
    psnr_val = tf.where(tf.math.is_finite(psnr_val), psnr_val, 0.0)
    ssim_val = tf.image.ssim(generated_image, target_image, max_val=1.0)
    ssim_val = tf.where(tf.math.is_finite(ssim_val), ssim_val, 0.0)

    psnr_val_values.append(psnr_val)
    ssim_val_values.append(ssim_val)

    # Convert the TensorFlow tensor to a NumPy array for visualization

    generated_image = generated_image[0]  # Extract the image from the batch
    target_image = target_image[0]  # Extract the image from the batch

'''


average_psnr_val = tf.reduce_mean(psnr_val_values).numpy()
average_ssim_val = tf.reduce_mean(ssim_val_values).numpy()
average_psnr_train = tf.reduce_mean(psnr_train_values).numpy()
average_ssim_train = tf.reduce_mean(ssim_train_values).numpy()
#average_psnr_test = tf.reduce_mean(psnr_test_values).numpy()
#average_ssim_test = tf.reduce_mean(ssim_test_values).numpy()

print(f"Average PSNR for training set: {float(average_psnr_train)}")
print(f"Average SSIM for training set: {float(average_ssim_train)}")
print(f"Average PSNR for test set: {float(average_psnr_test)}")
print(f"Average SSIM for test set: {float(average_ssim_test)}")
#print(f"Average PSNR for validation set: {float(average_psnr_val)}")
#print(f"Average SSIM for validation set: {float(average_ssim_val)}")



Model: "sequential_10"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_28 (Dense)            (None, 16, 16, 64)        128       
                                                                 
 dense_29 (Dense)            (None, 16, 16, 32)        2080      
                                                                 
 dense_30 (Dense)            (None, 16, 16, 16)        528       
                                                                 
 dense_31 (Dense)            (None, 16, 16, 1)         17        
                                                                 
Total params: 2753 (10.75 KB)
Trainable params: 2753 (10.75 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


NameError: name 'batch_size' is not defined