In [None]:
"""
@authors: faurand, chardes, ehagensieker
"""
import tensorflow as tf
import tensorflow_datasets as tfds
import datetime

%load_ext tensorboard

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [None]:
# Fetch the MNIST 'train' and 'test' splits; as_supervised=True makes every
# element an (image, label) pair.
mnist = tfds.load('mnist', split = ['train', 'test'], as_supervised=True)

# First split is used for training, second split for validation.
train_ds, val_ds = mnist[0], mnist[1]

In [None]:
def prepare_mnist_data_problem1(mnist, batch_size):
  """
  Build the input pipeline for problem 1 (do two digits add up to 5 or more?)

  Args:
    mnist: data set yielding (image, label) pairs
    batch_size(int): number of image pairs per batch
  Returns:
    batched and prefetched data set of (image1, image2, target) elements;
    target is 1 (int32) if label1 + label2 >= 5, otherwise 0
  """
  def _as_float(image, label):
    # cast the pixel values to float32
    return tf.cast(image, tf.float32), label

  def _flatten(image, label):
    # collapse the image into a single vector
    return tf.reshape(image, (-1,)), label

  def _rescale(image, label):
    # sloppy normalization: divide by 128 and shift down by 1
    return (image / 128.) - 1., label

  def _pair_up(first, second):
    # keep both images, replace the two labels by the boolean "sum >= 5"
    return first[0], second[0], first[1] + second[1] >= 5

  def _flag_to_int(image1, image2, flag):
    # boolean target -> int32 target
    return image1, image2, tf.cast(flag, tf.int32)

  single_digits = mnist.map(_as_float).map(_flatten).map(_rescale)

  # two separately shuffled copies of the same data, zipped element by element
  pairs = tf.data.Dataset.zip((single_digits.shuffle(1000),
                               single_digits.shuffle(1000)))
  pairs = pairs.map(_pair_up).map(_flag_to_int)

  # batch first, then prefetch
  return pairs.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Problem 1 pipelines (batch size 32): training split first, validation split second.
train_ds_problem1, val_ds_problem1 = (
    prepare_mnist_data_problem1(split, 32) for split in (train_ds, val_ds)
)

In [None]:
def prepare_mnist_data_problem2(mnist, batch_size, y_value):
  """
  Preparation of the data set for problem 2 (label difference of two digits)

  Args:
    mnist: data set yielding (image, label) pairs
    batch_size(int): size of the batches used for the training
    y_value(int): the target is 1 when (label of the first image) minus
      (label of the second image) equals this value, otherwise 0
  Returns:
    batched and prefetched data set of (image1, image2, target) elements,
    with target as int32
  """
  #cast the images to float32
  mnist = mnist.map(lambda img, target: (tf.cast(img, tf.float32), target))
  #flatten the images into vectors
  mnist = mnist.map(lambda img, target: (tf.reshape(img, (-1,)), target))
  #sloppy input normalization: divide by 128 and subtract 1
  mnist = mnist.map(lambda img, target: ((img/128.)-1., target))

  #create tuple with two data points and one target:
  #zip two separately shuffled copies of the data set element by element
  zipped = tf.data.Dataset.zip((mnist.shuffle(1000),
                                mnist.shuffle(1000)))
  #the order matters here: first label minus second label
  zipped = zipped.map(lambda d1, d2: (d1[0], d2[0], d1[1] - d2[1] == y_value))
  #boolean target -> int32 target
  zipped = zipped.map(lambda d1,d2,t: (d1,d2, tf.cast(t, tf.int32)))

  data = zipped.batch(batch_size)
  # prefetch
  data = data.prefetch(tf.data.AUTOTUNE)
  return data


# Problem 2 pipelines (batch size 32, label difference 5): training split first,
# validation split second.
train_ds_problem2, val_ds_problem2 = (
    prepare_mnist_data_problem2(split, 32, 5) for split in (train_ds, val_ds)
)

In [None]:
"""
@authors: faurand, chardes, ehagensieker
"""
class MyModel(tf.keras.Model):
  """
  The model we are using for predicting the outcome

  Both images of a pair are passed through the same two dense layers, the two
  results are concatenated and mapped to a single sigmoid output.
  """
  def __init__(self):
    """
    Initializing the model: metrics, optimizer, loss function and layers
    """
    #inherit from parent class
    super().__init__()

    #metrics we want to display
    #train_step and test_step rely on this order: [0] accuracy, [1] mean loss
    self.metrics_list = [tf.keras.metrics.BinaryAccuracy(),
                         tf.keras.metrics.Mean(name = 'loss')]

    self.optimizer = tf.keras.optimizers.Adam()
    self.loss_function = tf.keras.losses.BinaryCrossentropy()

    #these two layers are applied to both images of a pair
    self.dense1 = tf.keras.layers.Dense(32, activation=tf.nn.relu)
    self.dense2 = tf.keras.layers.Dense(32, activation=tf.nn.relu)

    self.out_layer = tf.keras.layers.Dense(1,activation=tf.nn.sigmoid)

  @tf.function
  def call(self, images, training = False):
    """
    how to forward the images through the layer

    Args:
      images: tuple (img1, img2) with one batch per image position
      training(bool): accepted because the steps call the model with
        training=...; it is not used in the forward pass itself
    Returns:
      output of the sigmoid layer for every image pair
    """
    img1, img2 = images

    img1_x = self.dense1(img1)
    img1_x = self.dense2(img1_x)

    img2_x = self.dense1(img2)
    img2_x = self.dense2(img2_x)

    #join both representations along axis 1 before the output layer
    combined = tf.concat([img1_x, img2_x], axis = 1)
    out = self.out_layer(combined)

    return out

  @property
  def metrics(self):
    """
    The metrics that are updated by train_step and test_step
    """
    return self.metrics_list


  def reset_metric(self):
    """
    Reset all metrics; kept under this name for existing callers and
    delegating to the reset_metrics method inherited from tf.keras.Model
    """
    self.reset_metrics()

  @tf.function
  def train_step(self, data):
    """
    One optimization step on one batch

    Args:
      data: tuple (img1, img2, t) of two image batches and the targets
    Returns:
      dictionary mapping each metric name to its current result
    """
    img1, img2, t = data

    with tf.GradientTape() as tape:
      output = self((img1, img2), training = True)
      loss = self.loss_function(t, output)

    gradients = tape.gradient(loss, self.trainable_variables)
    self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

    #accuracy compares targets with predictions, the mean tracks the loss
    self.metrics[0].update_state(t, output)
    self.metrics[1].update_state(loss)

    return {metric.name: metric.result() for metric in self.metrics}

  @tf.function
  def test_step(self, data):
    """
    Evaluate one batch without updating the weights

    Args:
      data: tuple (img1, img2, t) as yielded by the data sets, or a pair
        ((img1, img2), t)
    Returns:
      dictionary mapping each metric name to its current result
    """
    if len(data) == 3:
      img1, img2, t = data
      x = (img1, img2)
    else:
      x, t = data

    prediction = self(x, training = False)
    loss = self.loss_function(t, prediction)

    self.metrics[0].update_state(t, prediction)
    self.metrics[1].update_state(loss)

    return {metric.name: metric.result() for metric in self.metrics}
    



    


In [None]:
import tqdm
import pprint 

def training_loop(model, train_ds, val_ds, epochs, train_summary_writer, val_summary_writer): 
  for epoch in range(epoch): 
    print(f"Epoch {epoch}: ")

    for data in tqdm.tqdm(train_ds, position = 0, leave = True):

      metrics = model.train_step(data)

      with train_summary_writer.as_default(): 
        for metric in model.metrics: 
          tf.summary.scalar()

      
      model.reset_metrics()
  
    for data in val_ds:
      metrics = model.test_step(data)

      with test_summary_writer.as_default():
        for metric in model.metrics:
          tf.summary.scalar()


      model.reset_metric()
      

In [None]:
def training_function(problem, optimizer): 
