## Imports

In [1]:
import numpy as np 
import pandas as pd 
import os
import re
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, Dropout, Conv2DTranspose, UpSampling2D, add, Flatten, Lambda, Reshape, BatchNormalization, concatenate, AveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.python.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard
import matplotlib.pyplot as plt
import scipy
from PIL import Image
import random
import cv2
from tensorflow.keras.metrics import MeanAbsoluteError, MeanSquaredError, RootMeanSquaredError, Mean, binary_crossentropy, binary_accuracy
from sklearn.model_selection import train_test_split

## GPU Mounting

In [2]:
# try:
#     import google.colab
#     in_colab = True
# except ImportError:
#     in_colab = False

# if in_colab:
#     try:
#         tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
#         print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
#         tf.config.experimental_connect_to_cluster(tpu)
#         tf.tpu.experimental.initialize_tpu_system(tpu)
#         tpu_strategy = tf.distribute.TPUStrategy(tpu)
#     except ValueError:
#         if 'GPU' in [gpu[-1] for gpu in tf.config.list_physical_devices('GPU')]:
#             print('Running on Google Colab GPU')
#         else:
#             print('Not connected to a TPU or GPU runtime')

# else:
#     physical_devices = tf.config.list_physical_devices('GPU')
#     print(physical_devices)
#     try:
#         if len(physical_devices):
#             for gpu in physical_devices:
#                 tf.config.experimental.set_memory_growth(gpu, enable = True)
#                 print(f"Using local GPU: {gpu}")
#             sess = tf.compat.v1.Session(config = tf.compat.v1.ConfigProto(log_device_placement = True))
#         else:
#             print("No local GPU found")
#     except:
#         print("Error Mounting a GPU")
tf.config.set_visible_devices([], 'GPU')
# tf.debugging.set_log_device_placement(True)

# Create some tensors and perform an operation
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
c = tf.matmul(a, b)

print(c)

tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)


## Test GAN Model

In [3]:
def make_generator_model(input_shape=(800, 1200, 3)):
    # Input layer
    input_img = Input(shape=input_shape)

    # Convolutional layers
    conv1 = Conv2D(8, (3, 3), padding='same', activation='elu')(input_img)
    conv2 = Conv2D(16, (3, 3), padding='same', activation='elu')(conv1)
    conv3 = Conv2D(32, (3, 3), padding='same', activation='elu')(conv2)
    conv4 = Conv2D(64, (3, 3), padding='same', activation='elu')(conv3)
    concat1 = concatenate([conv2, conv4], axis=-1)
    conv5 = Conv2D(64, (3, 3), padding='same', activation='elu')(concat1)
    output_img = Conv2D(3, (3, 3), padding='same', activation='sigmoid')(conv5)

    # Create the model
    generator_model = Model(inputs=input_img, outputs=output_img)
    return generator_model

In [4]:
generator = make_generator_model()
generator.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 800, 1200,   0           []                               
                                3)]                                                               
                                                                                                  
 conv2d (Conv2D)                (None, 800, 1200, 8  224         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 conv2d_1 (Conv2D)              (None, 800, 1200, 1  1168        ['conv2d[0][0]']                 
                                6)                                                            

In [5]:
def make_discriminator_model(input_shape=(800, 1200, 3)):
    # Input layer
    input_img = Input(shape=input_shape)

    # Convolutional layers
    conv1 = Conv2D(8, (7, 7), activation='relu')(input_img)
    conv2 = Conv2D(16, (7, 7), activation='relu')(conv1)
    conv3 = Conv2D(32, (7, 7), activation='relu')(conv2)
    conv4 = Conv2D(4, (7, 7), activation='relu')(conv3)
    maxpool1 = MaxPooling2D((7, 7))(conv4)
    flatten = Flatten()(maxpool1)
    output_img = Dense(1, activation='sigmoid')(flatten)

    # Create the model
    discriminator_model = Model(inputs=input_img, outputs=output_img)
    return discriminator_model

In [6]:
discriminator = make_discriminator_model()
discriminator.summary()

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_2 (InputLayer)        [(None, 800, 1200, 3)]    0         
                                                                 
 conv2d_6 (Conv2D)           (None, 794, 1194, 8)      1184      
                                                                 
 conv2d_7 (Conv2D)           (None, 788, 1188, 16)     6288      
                                                                 
 conv2d_8 (Conv2D)           (None, 782, 1182, 32)     25120     
                                                                 
 conv2d_9 (Conv2D)           (None, 776, 1176, 4)      6276      
                                                                 
 max_pooling2d (MaxPooling2D  (None, 110, 168, 4)      0         
 )                                                               
                                                           

In [7]:
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [8]:
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

In [9]:
def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

In [10]:
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

## Functions and callbacks

In [11]:
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
reduce_lr_plateau = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
tensorboard = TensorBoard(log_dir='logs')

In [12]:
def plot_history(history):
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [13]:
def plot_predictions(model, X, y, num_examples=5, seed = 42):
    np.random.seed(seed)
    indices = np.random.randint(0, len(X), num_examples)
    for i in indices:
        x_sample = y[i:i+1]
        y_sample = y[i:i+1]
        
        # Predict
        y_pred = model.predict(x_sample)

        # Plot
        plt.figure(figsize=(12, 4))
        plt.subplot(1, 3, 1)
        plt.imshow(x_sample[0], cmap='gray')
        plt.title('Original')
        
        plt.subplot(1, 3, 2)
        plt.imshow(y_sample[0], cmap='gray')
        plt.title('Expected')
        
        plt.subplot(1, 3, 3)
        plt.imshow(y_pred[0], cmap='gray')
        plt.title('Reconstructed by Model')
        
        plt.show()

In [14]:
BATCH_SIZE = 4
EPOCHS = 100
seed = 42

## Data preparation

In [15]:
csv_file = 'Data/image_data.csv'
df = pd.read_csv(csv_file)

low_res_folder = 'Data/low res/'
high_res_folder = 'Data/high res/'

df = df[df['low_res'].str.contains('_6')]

df = df.iloc[:100]

datagen = ImageDataGenerator(
    rescale=1./255,
    validation_split=0.2,
)

train_generator = datagen.flow_from_dataframe(
    dataframe=df,
    directory=low_res_folder,
    x_col='low_res',
    y_col='high_res',
    target_size=(800, 1200),
    seed=42,
    batch_size = BATCH_SIZE,
    class_mode='input',
    subset='training'
)

validation_generator = datagen.flow_from_dataframe(
    dataframe=df,
    directory=low_res_folder,
    x_col='low_res',
    y_col='high_res',
    batch_size = BATCH_SIZE,
    target_size=(800, 1200),
    seed=42,
    class_mode='input',
    subset='validation'
)

Found 80 validated image filenames.
Found 20 validated image filenames.


## GAN training

In [16]:
tf.config.run_functions_eagerly(True)

In [17]:
# X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.1, random_state=42)
# train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train)).shuffle(len(X_train)).batch(BATCH_SIZE)
# val_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val)).batch(BATCH_SIZE)

best_val_loss = float('inf')
early_stopping_patience = 10
early_stopping_counter = 0

train_gen_losses = []
train_disc_losses = []
avr_train_gen_losses = []
avr_train_disc_losses = []
val_gen_losses = []
val_disc_losses = []

mae_list = []
mse_list = []
accuracy_list = []

@tf.function
def train_step(X, y):
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        real_output = discriminator(y, training=True)
        generated_images = generator(X, training=True)
        fake_output = discriminator(generated_images, training=True)
        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))
    train_gen_losses.append(gen_loss)
    train_disc_losses.append(disc_loss)

    try:
        mae_list.append(tf.abs(generated_images - y))
        mse_list.append(tf.square(generated_images - y))
        accuracy_list.append(tf.keras.metrics.binary_accuracy(generated_images, y))
    except:
        pass

for epoch in tqdm(range(EPOCHS), desc="Training Epochs", unit="epoch"):
    for batch_X, batch_y in train_generator:
        train_step(batch_X, batch_y)

    val_losses = []
    val_mae = []
    val_mse = []
    val_accuracy = []

    for val_batch_X, val_batch_y in validation_generator:
        val_real_output = discriminator(val_batch_y, training=False)
        val_generated_images = generator(val_batch_X, training=False)
        val_fake_output = discriminator(val_generated_images, training=False)
        val_gen_loss = generator_loss(val_fake_output)
        val_disc_loss = discriminator_loss(val_real_output, val_fake_output)
        val_losses.append((val_gen_loss, val_disc_loss))

    val_mae.append(np.abs(val_real_output.numpy() - val_fake_output.numpy()))
    val_mse.append(np.square(val_real_output.numpy() - val_fake_output.numpy()))
    val_accuracy.append(tf.keras.metrics.binary_accuracy(val_real_output.numpy(), val_fake_output.numpy()))

    avg_val_gen_loss = tf.reduce_mean([loss[0] for loss in val_losses])
    avg_val_disc_loss = tf.reduce_mean([loss[1] for loss in val_losses])
    avr_train_gen_losses.append(tf.reduce_mean(train_gen_losses))
    avr_train_disc_losses.append(tf.reduce_mean(train_disc_losses))

    mae = tf.reduce_mean(mae_list)
    mse = tf.reduce_mean(mse_list)
    accuracy = tf.reduce_mean(accuracy_list)
    val_gen_losses.append(avg_val_gen_loss)
    val_disc_losses.append(avg_val_disc_loss)

    val_mae = np.array(val_mae)

    print(f"Epoch {epoch + 1}, "
          f"Average Training Generator Loss: {avr_train_gen_losses[-1]}, "
          f"Average Training Discriminator Loss: {avr_train_disc_losses[-1]}, "
          f"Average Validation Generator Loss: {avg_val_gen_loss}, "
          f"Average Validation Discriminator Loss: {avg_val_disc_loss}, "
          f"Average Training MAE: {mae}, "
          f"Average Training MSE: {mse}, "
          f"Average Training Accuracy: {accuracy}"
          f"Average Validation MAE: {tf.reduce_mean(val_mae)}, "
          f"Average Validation MSE: {tf.reduce_mean(val_mse)}, "
          f"Average Validation Accuracy: {tf.reduce_mean(val_accuracy)}",
          )
    
    mae_list = []
    mse_list = []
    accuracy_list = []

    if avg_val_gen_loss < best_val_loss:
        best_val_loss = avg_val_gen_loss
        early_stopping_counter = 0
    else:
        early_stopping_counter += 1

    if early_stopping_counter == early_stopping_patience:
        print(f"Early stopping at epoch {epoch + 1} due to no improvement in validation loss.")
        generator.load_weights('best_generator_weights.h5')
        discriminator.load_weights('best_discriminator_weights.h5')
        break

    train_generator.reset()
    validation_generator.reset()

print("Finished!")

  output, from_logits = _get_logits(
Training Epochs:   0%|          | 0/100 [24:01<?, ?epoch/s]


KeyboardInterrupt: 

In [None]:
plt.figure(figsize=(12, 6))

plt.subplot(2, 1, 1)
plt.plot(avr_train_gen_losses, label='Training Generator Loss')
plt.plot(avr_train_disc_losses, label='Training Discriminator Loss')
plt.title('Training Losses')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(2, 1, 2)
plt.plot(val_gen_losses, label='Validation Generator Loss')
plt.plot(val_disc_losses, label='Validation Discriminator Loss')
plt.title('Validation Losses')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
plot_predictions(generator, X_train, y_train)

In [None]:
plot_predictions(generator, X_test, y_test)