# Task 02.01
Load the MNIST dataset.

In [1]:
import tensorflow_datasets as tfds
import tensorflow as tf

(train_ds, test_ds ), ds_info = tfds.load('mnist', split=['train', 'test'] , as_supervised = True , with_info = True)

Downloading and preparing dataset Unknown size (download: Unknown size, generated: Unknown size, total: Unknown size) to C:\Users\plo-k\tensorflow_datasets\mnist\3.0.1...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

Generating splits...:   0%|          | 0/2 [00:00<?, ? splits/s]

Generating train examples...: 0 examples [00:00, ? examples/s]

Shuffling C:\Users\plo-k\tensorflow_datasets\mnist\3.0.1.incompleteQASVW2\mnist-train.tfrecord*...:   0%|     …

Generating test examples...: 0 examples [00:00, ? examples/s]

Shuffling C:\Users\plo-k\tensorflow_datasets\mnist\3.0.1.incompleteQASVW2\mnist-test.tfrecord*...:   0%|      …

Dataset mnist downloaded and prepared to C:\Users\plo-k\tensorflow_datasets\mnist\3.0.1. Subsequent calls will reuse this data.


# Task 02.02
Prepare the datasets.

In [3]:
from typing import Callable


def prepare_dataset(data: tf.data.Dataset, target_fn: Callable[[float, float], float]) -> tf.data.Dataset:
    """
    Prepared the MNIST dataset as two images. The label gets determined via the given function.
    :param data: MNIST dataset
    :param target_fn: Function to calculate the label of an image pair
    :return: Prepared dataset
    """
    # Convert image pixels from uint8 to float32
    data = data.map(lambda image, label: (tf.cast(image, tf.float32), label))
    # Flatten the image to 1D
    data = data.map(lambda image, label: (tf.reshape(image, [-1]), label))
    # Map pixel values to number between -1 and 1
    data = data.map(lambda image, label: (image / 128. - 1, label))
    # Create data pairs of two random dataset elements
    zipped_ds = tf.data.Dataset.zip((data.shuffle(2000), data.shuffle(2000)))
    # Generate labels according to the given target function
    zipped_ds = zipped_ds.map(lambda first, second: (first[0], second[0], target_fn(first[1], second[1])))

    zipped_ds = zipped_ds.cache()
    zipped_ds = zipped_ds.shuffle(2000)
    zipped_ds = zipped_ds.batch(32)
    zipped_ds = zipped_ds.prefetch(tf.data.AUTOTUNE)

    return zipped_ds

# Task 3

Define universal calculator model

In [4]:
class MnistCalculatorModel(tf.keras.Model):
    def __init__(self, output_size: int, optimizer: tf.keras.optimizers.Optimizer, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.metrics_list = [tf.keras.metrics.BinaryAccuracy(),
                             tf.keras.metrics.Mean(name="loss")]
        self.optimizer = optimizer
        self.loss_function = tf.keras.losses.MeanSquaredError()

        self.number_detection_layer = tf.keras.layers.Dense(32, activation=tf.nn.relu)
        self.calculation_layer = tf.keras.layers.Dense(10)
        self.out_layer = tf.keras.layers.Dense(output_size, activation=tf.nn.sigmoid)


    def call(self, images, training=False):
        img1, img2 = images

        img1_x = self.number_detection_layer(img1)
        img2_x = self.number_detection_layer(img2)

        combined_x = tf.concat([img1_x, img2_x], axis=1)
        return self.out_layer(combined_x)

    @property
    def metrics(self):
        return self.metrics_list

Create training method

In [None]:
from enum import Enum

class Subtask(Enum):
    SUM_GREATER_5 = 0,
    SUBTRACTION_EQUALS = 1

def is_sum_greater_equals_x(x):
    return lambda a, b: a + b >= x

def subtract(a, b):
    return tf.one_hot(a - b, 10)

def train(subtask: Subtask, optimizer: tf.keras.optimizers.Optimizer):
    # Define variables depending on subtask
    target_fn = subtract if subtask == Subtask.SUBTRACTION_EQUALS else is_sum_greater_equals_x(5)
    output_size = 10 if subtask == Subtask.SUBTRACTION_EQUALS else 1

    train_data = train_ds.apply(lambda data: prepare_dataset(data, target_fn))
    test_data = test_ds.apply(lambda data: prepare_dataset(data, target_fn))

    model = MnistCalculatorModel(output_size, tf.keras.optimizers.Adam())

    epoch_loss_agg = []
    train_losses = []
    test_losses = []
    for epoch in range(1000):
        for (img1, img2, target) in train_data:
            with tf.GradientTape() as tape:
                prediction = model((img1, img2), training=True)
                loss = tf.losses.MeanSquaredError(target, prediction)
            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            epoch_loss_agg .append(loss)
