# Imports

In [None]:
import logging
import warnings

import matplotlib.animation as animation
import matplotlib.style as style
import numpy as np
import tensorflow as tf
from IPython.display import HTML
from matplotlib import pyplot as plt
from tensorflow.keras import layers

warnings.filterwarnings("ignore")
logging.getLogger("tensorflow").setLevel(logging.ERROR)

In [None]:
# Report the TensorFlow version this notebook is running against.
print("TF version:", tf.__version__)

# Generate random images

In [None]:
def generate_movies(n_samples=1200, n_frames=15):
    """Generate synthetic movies of squares drifting in one shared direction.

    Each sample contains 3 to 7 squares drawn on an 80x80 RGB canvas. All
    squares of a sample share the same direction along each axis, but every
    square gets its own speed of 1 or 2 pixels per frame. The canvas is
    cropped to its central 40x40 window before being returned.

    Args:
        n_samples: number of movies to generate.
        n_frames: number of frames per movie.

    Returns:
        A tuple ``(noisy_movies, shifted_movies, Y)``:

        * ``noisy_movies`` -- float64 array of shape
          ``(n_samples, n_frames, 40, 40, 3)`` with values clipped to
          ``[0, 1]``; the squares plus occasional +/-0.1 noise on a patch
          one pixel larger than the square on every side.
        * ``shifted_movies`` -- same shape, noise-free, with every square
          drawn at its position one frame ahead.
        * ``Y`` -- float64 array of shape ``(n_samples, 4)``. Columns flag,
          in order: negative / positive motion along the first image axis,
          then negative / positive motion along the second image axis.
          Targets are smoothed with a small epsilon and L2-normalised per
          row.
    """
    row = 80
    col = 80
    # ``np.float`` was removed in NumPy 1.24; float64 is the type it aliased.
    noisy_movies = np.zeros((n_samples, n_frames, row, col, 3), dtype=np.float64)
    shifted_movies = np.zeros((n_samples, n_frames, row, col, 3), dtype=np.float64)

    epsilon = np.random.random() / 1000

    Y = np.zeros((n_samples, 4), dtype=np.float64)

    def window(center, half, limit):
        # Clamp to the canvas so a square that drifted past the border is
        # truncated instead of wrapping around through negative indices.
        low = min(max(center - half, 0), limit)
        high = min(max(center + half, 0), limit)
        return slice(low, high)

    for i in range(n_samples):
        # Add 3 to 7 moving squares
        n = np.random.randint(3, 8)

        # Direction of motion (all squares are moving in the same direction)
        directionx = np.random.choice([-1, 0, 1])
        directiony = np.random.choice([-1, 0, 1])

        # Make the target smooth
        # Section 7 of https://arxiv.org/pdf/1512.00567.pdf
        Y[i, :] = epsilon
        if directionx < 0:
            Y[i, 0] = 1
        if directionx > 0:
            Y[i, 1] = 1
        if directiony < 0:
            Y[i, 2] = 1
        if directiony > 0:
            Y[i, 3] = 1
        Y[i] = Y[i] / np.linalg.norm(Y[i])

        for _ in range(n):
            # Initial position
            xstart = np.random.randint(20, 60)
            ystart = np.random.randint(20, 60)

            # Per-square speed of 1 or 2 pixels per frame. The shared
            # direction must not be modified in place, otherwise the speed
            # compounds from one square to the next.
            step_x = directionx * np.random.randint(1, 3)
            step_y = directiony * np.random.randint(1, 3)

            # Half-size of the square
            w = np.random.randint(2, 4)

            for t in range(n_frames):
                x_shift = xstart + step_x * t
                y_shift = ystart + step_y * t
                noisy_movies[
                    i, t, window(x_shift, w, row), window(y_shift, w, col), :
                ] += 1

                # Make it more robust by adding noise.
                # The idea is that if during inference the value of the pixel
                # is not exactly one, the network must still consider it as a
                # pixel belonging to a square.
                if np.random.randint(0, 2):
                    noise_f = (-1) ** np.random.randint(0, 2)
                    noisy_movies[
                        i,
                        t,
                        window(x_shift, w + 1, row),
                        window(y_shift, w + 1, col),
                        :,
                    ] += noise_f * 0.1

                # Shift the ground truth by one frame
                x_shift = xstart + step_x * (t + 1)
                y_shift = ystart + step_y * (t + 1)
                shifted_movies[
                    i, t, window(x_shift, w, row), window(y_shift, w, col), :
                ] += 1

    # Cut to a 40x40 window and keep pixel values in [0, 1]
    noisy_movies = np.clip(noisy_movies[:, :, 20:60, 20:60, :], 0.0, 1.0)
    shifted_movies = np.clip(shifted_movies[:, :, 20:60, 20:60, :], 0.0, 1.0)
    return noisy_movies, shifted_movies, Y

In [None]:
# Build the dataset: noisy input movies, their one-frame-ahead ground truth,
# and the per-movie direction targets.
noisy_movies, shifted_movies, Y = generate_movies(n_samples=1200)

In [None]:
# Sanity-check the shapes of the inputs and of the targets.
print(noisy_movies.shape, Y.shape)

In [None]:
%matplotlib inline


def get_label(y):
    """Turn a 4-element direction vector into a human-readable label.

    Entries 0..3 stand for "UP", "DOWN", "LEFT" and "RIGHT" respectively; a
    direction is reported when its entry is strictly greater than 0.6.

    Args:
        y: indexable with at least four numeric entries.

    Returns:
        The active direction names joined by single spaces, or "STILL" when
        no entry exceeds the threshold.
    """
    names = ("UP", "DOWN", "LEFT", "RIGHT")
    active = [name for idx, name in enumerate(names) if y[idx] > 0.6]
    return " ".join(active) if active else "STILL"


def show_prediction(X, y, model, ax):
    """Animate a single movie on ``ax``, titled "<truth> vs <prediction>".

    Args:
        X: one movie, indexed as ``X[frame]`` with each frame displayable by
            ``imshow``.
        y: target vector for the movie, as understood by ``get_label``.
        model: object whose ``predict`` accepts a batch holding the movie
            and returns one prediction vector per batch entry.
        ax: matplotlib Axes to draw on.

    Returns:
        The ``FuncAnimation`` driving the movie. The caller must keep a
        reference to it, otherwise the animation is garbage-collected and
        never plays.
    """
    movie = X

    # The model expects a batch, so add a leading axis of size 1.
    prediction = model.predict(np.expand_dims(movie, axis=0))

    title = get_label(y) + " vs " + get_label(prediction[0])

    # Display image with prediction
    style.use("default")
    ax.set_title(title, fontsize=9)

    im = ax.imshow(movie[0])

    def init():
        im.set_data(movie[0])
        return (im,)

    def animate(i):
        im.set_data(movie[i])
        return (im,)

    # FuncAnimation is attached to a Figure, not to an Axes.
    return animation.FuncAnimation(
        ax.figure, animate, init_func=init, frames=movie.shape[0], interval=50
    )

In [None]:
def display(nrows, ncols, X, Y, model=None):
    """Animate a grid of movies, one per subplot, titled with their labels.

    Switches matplotlib to xkcd mode and hides ticks and tick labels through
    ``plt.rcParams``; both are global settings that stay in effect after the
    call.

    Args:
        nrows: number of subplot rows.
        ncols: number of subplot columns.
        X: movies indexed as ``X[sample, frame]``; needs at least
            ``nrows * ncols`` samples.
        Y: target vectors, one per sample, as understood by ``get_label``.
        model: optional model; when given, ``model.predict(X)`` is called
            and each title also shows the predicted label and the four raw
            scores.

    Returns:
        The ``FuncAnimation`` cycling through the frames of every movie.
    """
    plt.xkcd()

    # See https://matplotlib.org/stable/tutorials/introductory/customizing.html
    plt.rcParams["xtick.bottom"] = False
    plt.rcParams["ytick.left"] = False
    plt.rcParams["xtick.labelbottom"] = False
    plt.rcParams["ytick.labelleft"] = False

    fig, axs = plt.subplots(nrows=nrows, ncols=ncols)

    # plt.subplots returns a bare Axes for a 1x1 grid; normalise to a flat array.
    axes = np.array(axs).reshape(-1)

    Y_hat = model.predict(X) if model is not None else None

    movies = []
    for i, ax in enumerate(axes):
        im = ax.imshow(X[i, 0])
        label = get_label(Y[i])
        if Y_hat is not None:
            label += (
                " vs "
                + get_label(Y_hat[i])
                + "\n[{:.2f},{:.2f},{:.2f},{:.2f}]".format(
                    Y_hat[i, 0], Y_hat[i, 1], Y_hat[i, 2], Y_hat[i, 3]
                )
            )
        ax.set_title(label, fontsize=9)
        movies.append(im)

    def animate(frame):
        for i, im in enumerate(movies):
            im.set_data(X[i, frame])
        return movies

    anim = animation.FuncAnimation(
        fig, animate, blit=True, frames=X.shape[1], repeat=True, interval=100
    )

    plt.tight_layout()

    return anim

# Create Model

In [None]:
seq_len = 15
img_height = 40
img_width = 40

classes = ["up", "down", "left", "right"]

# ConvLSTM encoder over the frame sequence followed by a dense head with one
# sigmoid unit per direction. The input shape is taken from the generated
# movies with the sample axis dropped.
model = tf.keras.Sequential(
    [
        layers.ConvLSTM2D(
            filters=64,
            kernel_size=(3, 3),
            return_sequences=False,
            data_format="channels_last",
            input_shape=noisy_movies.shape[1:],
        ),
        layers.Dropout(0.2),
        layers.Flatten(),
        layers.Dense(256, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(4, activation="sigmoid"),
    ]
)

In [None]:
# Print the layer-by-layer summary of the model.
model.summary()

In [None]:
# from tensorflow.keras.utils import plot_model
# plot_model(model)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the movies for testing; the fixed random_state keeps the
# shuffled split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    noisy_movies, Y, test_size=0.20, shuffle=True, random_state=0
)

In [None]:
# Preview a fixed (seeded) random selection of training movies in a 3x2 grid.
nrows, ncols = 3, 2

np.random.seed(42)
ind = np.random.randint(0, high=X_train.shape[0], size=nrows * ncols)

anim = display(nrows, ncols, X_train[ind], y_train[ind], model=None)

In [None]:
# Render the training-sample animation as an embedded HTML5 video.
HTML(anim.to_html5_video())

In [None]:
@tf.function
def macro_soft_f1(y, y_hat):
    """Macro soft-F1 cost: the mean over labels of ``1 - soft-F1``.

    The true-positive, false-positive and false-negative counts are built
    from the probabilities themselves rather than from thresholded
    predictions.

    Args:
        y (Tensor): targets of shape (BATCH_SIZE, N_LABELS); cast to float32.
        y_hat (Tensor): predicted probabilities of shape
            (BATCH_SIZE, N_LABELS); cast to float32.

    Returns:
        cost (scalar Tensor): value of the cost function for the batch
    """
    targets = tf.cast(y, tf.float32)
    probs = tf.cast(y_hat, tf.float32)

    # Soft confusion counts, accumulated over the batch axis for each label.
    soft_tp = tf.reduce_sum(probs * targets, axis=0)
    soft_fp = tf.reduce_sum(probs * (1 - targets), axis=0)
    soft_fn = tf.reduce_sum((1 - probs) * targets, axis=0)

    # The small constant keeps the ratio defined when all counts are zero.
    per_label_f1 = 2 * soft_tp / (2 * soft_tp + soft_fn + soft_fp + 1e-16)

    # Minimising 1 - soft-F1 maximises soft-F1; average over the labels.
    return tf.reduce_mean(1 - per_label_f1)

In [None]:
@tf.function
def macro_f1(y, y_hat, thresh=0.6):
    """Compute the macro F1-score on a batch of observations (average F1 across labels)

    Both the targets and the predictions are binarised with ``thresh``
    before counting. Binarising the targets matters when they are smoothed
    (neither exactly 0 nor exactly 1): counting non-zero products on raw
    smoothed targets would treat every predicted positive as a true
    positive. Targets that are already 0/1 are unaffected for any
    ``thresh`` in [0, 1).

    Args:
        y (Tensor): labels of shape (BATCH_SIZE, N_LABELS); any numeric dtype
        y_hat (Tensor): probability matrix from forward propagation of shape (BATCH_SIZE, N_LABELS)
        thresh: value above which a target or a prediction counts as positive

    Returns:
        macro_f1 (scalar Tensor): value of macro F1 for the batch
    """
    # Cast first so that int or float64 targets can be combined with float32 predictions.
    y_true = tf.cast(tf.greater(tf.cast(y, tf.float32), thresh), tf.float32)
    y_pred = tf.cast(tf.greater(tf.cast(y_hat, tf.float32), thresh), tf.float32)

    tp = tf.cast(tf.math.count_nonzero(y_pred * y_true, axis=0), tf.float32)
    fp = tf.cast(tf.math.count_nonzero(y_pred * (1 - y_true), axis=0), tf.float32)
    fn = tf.cast(tf.math.count_nonzero((1 - y_pred) * y_true, axis=0), tf.float32)

    # The small constant keeps the ratio defined for labels with no positives.
    f1 = 2 * tp / (2 * tp + fn + fp + 1e-16)
    return tf.reduce_mean(f1)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam

# `lr` is a deprecated alias that newer Keras releases reject; the supported
# keyword is `learning_rate`.
opt = Adam(learning_rate=0.001)

# Optimise the differentiable soft-F1 cost and report the thresholded macro F1.
model.compile(loss=macro_soft_f1, optimizer=opt, metrics=[macro_f1])

# Stop once the monitored quantity (Keras default: val_loss) has not
# improved for 7 consecutive epochs.
earlystop = EarlyStopping(patience=7)
callbacks = [earlystop]

# A further 20% of the training set is held out for validation.
history = model.fit(
    x=X_train,
    y=y_train,
    epochs=40,
    batch_size=8,
    shuffle=True,
    validation_split=0.2,
    callbacks=callbacks,
)

In [None]:
def learning_curves(history):
    """Plot training and validation curves for the loss and the macro F1-score.

    Draws two stacked panels (loss on top, macro F1 below) using the "bmh"
    style, and re-enables the tick marks and tick labels through
    ``plt.rcParams``.

    Args:
        history: object whose ``history`` mapping holds the per-epoch lists
            under the keys "loss", "val_loss", "macro_f1" and "val_macro_f1".

    Returns:
        The four per-epoch series as a tuple
        ``(loss, val_loss, macro_f1, val_macro_f1)``.
    """
    logs = history.history
    train_loss = logs["loss"]
    valid_loss = logs["val_loss"]
    train_f1 = logs["macro_f1"]
    valid_f1 = logs["val_macro_f1"]

    # Epochs are numbered from 1 on the x-axis.
    epoch_axis = range(1, len(train_loss) + 1)

    style.use("bmh")
    plt.figure(figsize=(8, 8))

    plt.rcParams.update(
        {
            "xtick.bottom": True,
            "ytick.left": True,
            "xtick.labelbottom": True,
            "ytick.labelleft": True,
        }
    )

    # (y-axis label, panel title, legend location, ((curve label, values), ...))
    panels = (
        (
            "Loss",
            "Training and Validation Loss",
            "upper right",
            (("Training Loss", train_loss), ("Validation Loss", valid_loss)),
        ),
        (
            "Macro F1-score",
            "Training and Validation Macro F1-score",
            "lower right",
            (
                ("Training Macro F1-score", train_f1),
                ("Validation Macro F1-score", valid_f1),
            ),
        ),
    )

    for position, (y_label, panel_title, legend_loc, curves) in enumerate(
        panels, start=1
    ):
        plt.subplot(2, 1, position)
        for curve_label, values in curves:
            plt.plot(epoch_axis, values, label=curve_label)
        plt.legend(loc=legend_loc)
        plt.ylabel(y_label)
        plt.title(panel_title)

    # Only the bottom panel carries the x-axis label.
    plt.xlabel("epoch")

    plt.show()

    return train_loss, valid_loss, train_f1, valid_f1

In [None]:
# Plot the loss and macro-F1 curves recorded in `history`.
learning_curves(history)

# See predictions

In [None]:
# Show a random selection of test movies in a 3x2 grid, each titled with its
# true label and the model's prediction.
nrows, ncols = 3, 2

ind = np.random.randint(0, high=X_test.shape[0], size=nrows * ncols)

anim_test = display(nrows, ncols, X_test[ind], y_test[ind], model=model)

In [None]:
# Render the test-prediction animation as an embedded HTML5 video.
HTML(anim_test.to_html5_video())