## ARC-AGI

Ferdinand Bhavsar

PhD student, Mines Paris

### Imports

In [None]:
from tensorflow import keras
import pandas as pd
import numpy as np

import random
import math
from tqdm.notebook import trange, tqdm

import matplotlib.pyplot as plt
from matplotlib import colors


from scipy.stats import kde
from sklearn.metrics.pairwise import euclidean_distances

import tensorflow as tf
from keras.utils import to_categorical
from tensorflow.keras import layers, losses
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model

### Utilities

Get color map (took from some random code I had lying around, so the colors are not the ones from ARC-AGI

In [None]:
def get_color_map(number_of_categories=4):
    """
    Get the matplotlib colormap and norm for images visualisation
    Args:
        number_of_categories: number of facies in the slice

    Returns: cmap, norm

    """
    if number_of_categories == 4:
        cmap = colors.ListedColormap(["#FF8000", "#CBCB33", "#9898E5", "#66CB33"])
        bounds = [-0.1, 0.9, 1.9, 2.9, 3.9]
    elif number_of_categories == 5:
        cmap = colors.ListedColormap(["#000000", "#5387AD", "#7DD57E", "#F1E33E", "#C70000"])
        bounds = [-0.1, 0.9, 1.9, 2.9, 3.9, 4.9]
    else:  # 9
        cmap = colors.ListedColormap(
            ["#000000", "#294255", "#5387AD", "#6DB6B1", "#7DD57E", "#B5DF5D", "#F1E33E", "#F77420", "#C70000"])
        bounds = [-0.1, 0.9, 1.9, 2.9, 3.9, 4.9, 5.9, 6.9, 7.9, 8.9]

    norm = colors.BoundaryNorm(bounds, cmap.N)

    return cmap, norm

cmap, norm = get_color_map(number_of_categories=9)

### Dataset Loading

In [None]:
import json

def load_json(file_path):
    with open(file_path, 'r') as f:
        return json.load(f)

training_challenges = load_json('./arc-agi_training_challenges.json')
training_solutions = load_json('./arc-agi_training_solutions.json')
evaluation_challenges = load_json('./arc-agi_evaluation_challenges.json')

print("Data loaded successfully.")
print(f"Training tasks: {len(training_challenges)}")
print(f"Evaluation tasks: {len(evaluation_challenges)}")

FileNotFoundError: [Errno 2] No such file or directory: './arc-agi_training_challenges.json'

In [None]:
def pad_to_shape(arr, target_shape=(30,30,1)):
    """
    Padding the inputs to a single shape, this will make it easier to manipulate
    """
    paddings = [(0, target_shape[i] - arr.shape[i]) for i in range(len(arr.shape))]

    padded_array = tf.pad(
        arr, paddings, mode='CONSTANT', constant_values=0
    )

    return padded_array

Preprocess the challenge data (I'm not touching the indentation, it was a nightmare of using jupyter AND colab for some tests)

In [None]:
def preprocess_challenge_data(challenge_data, solution_data):
  challenge_ids = []

  # tuples (test_input, test_output) that are both inputs to solution propositioner
  challenge_propositioner_inputs = []

  # solver trainining input (might be useful)
  train_solver_inputs = []
  train_solver_outputs = []

  #solver test inputs (what the solver will train, getting also a solution as input)
  test_solver_inputs = []
  test_solver_outputs = []

  for id, challenge in challenge_data.items():
    challenge_ids.append(id)

    # TRAIN
    current_challenge_propositioner_inputs = []
    current_train_solver_inputs = []
    current_train_solver_outputs = []

    for train in challenge['train']:
      # input
      array = np.array(train['input'])

      if array.shape[-1] == 1:
        # Necessary or to_categorical will mess up the last dim
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      input_cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)
      current_train_solver_inputs.append(input_cat_tensor)

      # output
      array = np.array(train['output'])

      if array.shape[-1] == 1:
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      output_cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)
      current_train_solver_outputs.append(output_cat_tensor)

      current_challenge_propositioner_inputs.append((input_cat_tensor, output_cat_tensor))

    challenge_propositioner_inputs.append(current_challenge_propositioner_inputs)
    train_solver_inputs.append(current_train_solver_inputs)
    train_solver_outputs.append(current_train_solver_outputs)

    # test
    current_test_solver_inputs = []
    current_test_solver_outputs = []
    for i, test in enumerate(challenge['test']):
      # TEST INPUTS
      array = np.array(test['input'])

      if array.shape[-1] == 1:
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      input_cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)
      current_test_solver_inputs.append(input_cat_tensor)

      # TEST OUTPUTS
      array = np.array(solution_data[id][i])

      if array.shape[-1] == 1:
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      output_cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)

      current_test_solver_outputs.append(output_cat_tensor)

    current_test_solver_inputs = np.array(current_test_solver_inputs)
    test_solver_inputs.append(current_test_solver_inputs)

    current_test_solver_outputs = np.array(current_test_solver_outputs)
    test_solver_outputs.append(current_test_solver_outputs)

  return challenge_propositioner_inputs, train_solver_inputs, train_solver_outputs, test_solver_inputs, test_solver_outputs
      #break

In [None]:
challenge_propositioner_inputs, train_solver_inputs, train_solver_outputs, test_solver_inputs, test_solver_outputs= preprocess_challenge_data(training_challenges, training_solutions)
print(len(challenge_propositioner_inputs), len(train_solver_inputs), len(train_solver_outputs), len(test_solver_inputs), len(test_solver_outputs))

### Plotting some padded data



In [None]:
plt.subplot(1, 2, 1)
plt.title('input')
plt.imshow(np.argmax(challenge_propositioner_inputs[0][0][0], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')
plt.subplot(1, 2, 2)
plt.title('output')
plt.imshow(np.argmax(challenge_propositioner_inputs[0][0][1], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')
plt.show()

plt.subplot(1, 2, 1)
plt.title('input')
plt.imshow(np.argmax(train_solver_inputs[0][1], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')
plt.subplot(1, 2, 2)
plt.title('output')
plt.imshow(np.argmax(train_solver_outputs[0][1], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')
plt.show()

plt.subplot(1, 2, 1)
plt.title('input')
plt.imshow(np.argmax(test_solver_inputs[0][0], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')
plt.subplot(1, 2, 2)
plt.title('output')
plt.imshow(np.argmax(test_solver_outputs[0][0], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')
plt.show()

In [None]:
def crop_indices(arr):
    if len(arr.shape) > 2:
      arr = arr.reshape((arr.shape[0], arr.shape[1]))
    non_zero_indices = np.argwhere(arr)

    if non_zero_indices.size == 0:
        return np.array([[0, 30], [0, 30]])

    min_indices = list(non_zero_indices.min(axis=0))


    max_indices = list(non_zero_indices.max(axis=0) + 1)

    res = np.array([min_indices, max_indices])

    return res

def encoder_preprocess_challenge_data(data):
  encoder_training_data = []

  for id, challenge in data.items():

    for train in challenge['train']:
      # input
      array = np.array(train['input'])
      if array.shape[-1] == 1:
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)

      encoder_training_data.append(cat_tensor)

      # outpu
      array = np.array(train['output'])
      if array.shape[-1] == 1:
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)

      encoder_training_data.append(cat_tensor)


  return np.array(encoder_training_data)
      #break

In [None]:
encoder_training_data = encoder_preprocess_challenge_data(training_challenges)
print(len(encoder_training_data))

### Data AutoEncoder

Doing an autoencoder seems like a good idea, after all it's easier working in a latent space. Of course, it probably will not be perfect at all. Maybe in the future, I can use a ppre-trained encoder ?

My design choice is not to do dimensionality reduction, as I feel it would take unecessary energy for something that I want simply encoded in a continuous space.

In [None]:
def compute_block(y, filters, activation, kernel_size=(3, 3), strides=(1, 1)):
    """
    The simple computatio nal bloc for both models
    """
    #y = keras.layers.BatchNormalization()(y)
    y = layers.Conv2D(
        filters, kernel_size, strides=strides, padding="same", kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-4), bias_regularizer=regularizers.L2(1e-4),)(y)
    #y = layers.Dropout(0.3)(y)
    if activation is not None:
      y = activation(y)

    return y

In [None]:
def get_encoder_model(output_channels, kernel_size=(1, 1), layers_features=None):
    if layers_features is None:
        layers_features = [32, 64, 128]

    input = layers.Input(shape=(None, None, 10), name="input")

    y = compute_block(input, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    for i, nb_features in enumerate(layers_features[1:]):
        y = compute_block(y, nb_features, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    # Final Bloc -> img
    output = compute_block(y, output_channels, None, kernel_size=(1, 1))

    encoder = keras.models.Model(input, output, name="encoder")
    return encoder

In [None]:
def get_decoder_model(output_channels, kernel_size=(1, 1), layers_features=None,
                        final_activation=tf.keras.activations.softmax):
    if layers_features is None:
        layers_features = [128, 64, 10]

    input = layers.Input(shape=(None, None, 128), name="input")

    y = compute_block(input, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    for i, nb_features in enumerate(layers_features[1:]):
        y = compute_block(y, nb_features, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    # Final Bloc -> img
    output = compute_block(y, output_channels, final_activation, kernel_size=(1, 1))

    decoder = keras.models.Model(input, output, name="decoder")
    return decoder

In [None]:
class AutoEncoder(keras.Model):
    def __init__(self, encoder, decoder):
        super(AutoEncoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    @tf.function
    def call(self, data):
        z = encoder(data)
        pred_x = decoder(z)
        return pred_x

In [None]:
ae_latent_size = 128 # Should be enough, I'm not doing dimensionality reduction

convolution_nb_per_layers = [128, 16]
decoder = get_decoder_model(output_channels=10, layers_features=convolution_nb_per_layers)

convolution_nb_per_layers = [16, 128,]
encoder = get_encoder_model(output_channels=ae_latent_size, layers_features=convolution_nb_per_layers)

autoencoder = AutoEncoder(encoder, decoder)
autoencoder.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-4), loss=losses.CategoricalCrossentropy())
autoencoder.build([None, None, None, 10])

In [None]:
#encoder_checkpoint_path = "cp-encoder.weights.h5"

#autoencoder.load_weights(encoder_checkpoint_path)

#### Training

In [None]:
validation_split = 0.2

x_train = encoder_training_data[math.floor(len(encoder_training_data) * (1 - validation_split)):]
x_test = encoder_training_data[:math.floor(len(encoder_training_data) * (1 - validation_split))]

In [None]:
# 600 was enough before overfitting. I put regularization to avoid it, but don't have time to design a perfect AE
if True:
  epochs=600

  autoencoder.fit(x_train, x_train,
                  epochs=epochs,
                  shuffle=True,
                  validation_data=(x_test, x_test))

In [None]:
import os

!mkdir encoder_weights

encoder_checkpoint_path = "encoder_weights/cp-encoder-1k1.weights.h5"
encoder_checkpoint_dir = os.path.dirname(encoder_checkpoint_path)

autoencoder.save_weights(encoder_checkpoint_path)

!tar -czvf encoder_weights.tar.gz ./encoder_weights

autoencoder.save("autoencoder.keras")

#### Visualize some of AE data

In [None]:
def crop_padded_zeros(arr1, arr2):
    """
    Cropping two arrays to the same dimension. I do this to outputs, but this is only a temporary solution: some outputs size simply depend
    on the input size. I cannot be bothered to think about that now. TODO
    """
    non_zero_indices = np.argwhere(arr1)

    if non_zero_indices.size == 0:
        return np.array([])

    min_indices = non_zero_indices.min(axis=0)
    max_indices = non_zero_indices.max(axis=0) + 1

    cropped_arr1 = arr1[min_indices[0]:max_indices[0], min_indices[1]:max_indices[1], min_indices[2]:max_indices[2]]
    cropped_arr2 = arr2[min_indices[0]:max_indices[0], min_indices[1]:max_indices[1], min_indices[2]:max_indices[2]]

    return cropped_arr1, cropped_arr2

Testing if the autoencoder is satisfactory on its validation data

In [None]:
decoded_imgs = autoencoder(x_test, training=False).numpy()

In [None]:
for i in range(200):
  cropped_x, cropped_pred_x = crop_padded_zeros(x_test[i], decoded_imgs[i])
  plt.subplot(1, 2, 1)
  plt.title('input')
  plt.imshow(np.argmax(cropped_x, axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
  plt.axis('off')
  plt.subplot(1, 2, 2)
  plt.title('output')
  plt.imshow(np.argmax(cropped_pred_x, axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
  plt.axis('off')
  plt.show()

### Solution Auto-Encoder

This is the beginning of the solver, but it's actually quite simply another AE. The only difference is that this model:

-Has an encoder that receives two inputs: the examples (encoded)

-The decoder receives two inputs: the input, and the encoded solution

-During training, we train the decoder on the solution predicted by the encoder, but with different examples than the encoder

NB: At first I tried to do it with a VAE, hoping that learning a distribution would let me search a probability solution space (a bit like the Inverse Problem) btu I couldn't make it work...

#### Solver Encoder and Decoder models

In [None]:
def compute_block(y, filters, activation, kernel_size=(3, 3), strides=(1, 1), bn=False, dropout=True):
    """
    The compute bloc for the solver auto-encoder doesn't really change, except for some parameters.
    This was done for tests purposes.
    """
    if bn:
      y = keras.layers.BatchNormalization()(y)

    y = layers.Conv2D(
        filters, kernel_size, strides=strides, padding="same", kernel_regularizer=regularizers.L1L2(l1=1e-5, l2=1e-4), bias_regularizer=regularizers.L2(1e-4),)(y)
    if dropout:
        y = layers.Dropout(0.3)(y)
    if activation is not None:
      y = activation(y)

    return y


def get_proposition_model(output_channels, kernel_size=(3, 3), layers_features=None):
    if layers_features is None:
        layers_features = [512, 256, 256, 128, 128]

    # Two inputs, the examples
    train_input = layers.Input(shape=(30, 30, 128), name="train_input")
    train_output = layers.Input(shape=(30, 30, 128), name="train_output")

    y_input = compute_block(train_input, layers_features[0] // 2, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    y_output = compute_block(train_output, layers_features[0] // 2, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    y = tf.keras.layers.Concatenate(axis=-1)([y_input, y_output])

    y = compute_block(y, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    for i, nb_features in enumerate(layers_features[1:]):
        y = compute_block(y, nb_features, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    # Final Bloc -> img
    output = compute_block(y, output_channels, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=(3, 3))


    proposition_model = keras.models.Model([train_input, train_output], output, name="proposition_model")
    return proposition_model

def get_solver_model(input_channels, output_channels, kernel_size=(3, 3), layers_features=None):
    # nb_prop * channels_solutions (128) * channels_input (128) 163840
    if layers_features is None:
        layers_features = [128, 128, 256, 256, 512]

    encoded_solution = layers.Input(shape=(30, 30, input_channels), name="encoded_solution")
    encoded_input = layers.Input(shape=(30, 30, output_channels), name="encoded_input")

    y_solut = compute_block(encoded_solution, layers_features[0] // 2, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    y_input = compute_block(encoded_input, layers_features[0] // 2, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    y = tf.keras.layers.Concatenate(axis=-1)([y_solut, y_input])

    y = compute_block(y, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    for i, nb_features in enumerate(layers_features[1:]):
        y = compute_block(y, nb_features, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    # Final Bloc -> img
    output = compute_block(y, output_channels, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=(3, 3), dropout=False)

    solver_model = keras.models.Model([encoded_solution, encoded_input], output, name="solver_model")
    return solver_model

In [None]:
# Kernel initializer to use
def kernel_init(scale):
    scale = max(scale, 1e-10)
    return keras.initializers.VarianceScaling(
        scale, mode="fan_avg", distribution="uniform"
    )


class AttentionBlock(layers.Layer):
    """Applies self-attention.

    Args:
        units: Number of units in the dense layers
        groups: Number of groups to be used for GroupNormalization layer
    """

    def __init__(self, units, groups=8, name="AttentionBlock", **kwargs):
        self.units = units
        self.groups = groups
        super().__init__(**kwargs)

        self.norm = layers.GroupNormalization(groups=groups, name=name+"-Norm")
        self.query = layers.Dense(units, kernel_initializer=kernel_init(1.0), name=name+"-Query")
        self.key = layers.Dense(units, kernel_initializer=kernel_init(1.0), name=name+"-Key")
        self.value = layers.Dense(units, kernel_initializer=kernel_init(1.0), name=name+"-Value")
        self.proj = layers.Dense(units, kernel_initializer=kernel_init(0.0), name=name+"-Proj")

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        height = tf.shape(inputs)[1]
        width = tf.shape(inputs)[2]
        scale = tf.cast(self.units, tf.float32) ** (-0.5)

        inputs = self.norm(inputs)
        q = self.query(inputs)
        k = self.key(inputs)
        v = self.value(inputs)

        attn_score = tf.einsum("bhwc, bHWc->bhwHW", q, k) * scale
        attn_score = tf.reshape(attn_score, [batch_size, height, width, height * width])

        attn_score = tf.nn.softmax(attn_score, -1)
        attn_score = tf.reshape(attn_score, [batch_size, height, width, height, width])

        proj = tf.einsum("bhwHW,bHWc->bhwc", attn_score, v)
        proj = self.proj(proj)
        return inputs + proj


def ResidualBlock(width, groups=8, activation_fn=keras.activations.swish, dropout=True, name="ResBlock"):
    def apply(inputs):
        x, sol = inputs
        input_width = x.shape[3]

        if input_width == width:
            residual = x
        else:
            residual = layers.Conv2D(
                width, kernel_size=1, kernel_initializer=kernel_init(1.0), name=name+"-Conv0"
            )(x)

        sol = activation_fn(sol)
        sol = layers.Conv2D(
            width,
            kernel_size=(3, 3),
            padding="same",
        )(sol)

        x = activation_fn(x)
        x = layers.Conv2D(
            width, kernel_size=3, padding="same", kernel_initializer=kernel_init(1.0), name=name+"-Conv1"
        )(x)
        if dropout:
            x = layers.Dropout(0.3, name=name+"-Dropout1")(x)

        x = layers.Concatenate(axis=-1)([x, sol])
        x = activation_fn(x)
        x = layers.Conv2D(
            width, kernel_size=3, padding="same", kernel_initializer=kernel_init(0.0), name=name+"-ConvConcat"
        )(x)
        #x = layers.Add(name=name+"-Add1")([x, sol])
        x = activation_fn(x)

        x = layers.Conv2D(
            width, kernel_size=3, padding="same", kernel_initializer=kernel_init(0.0), name=name+"-Conv2"
        )(x)
        x = layers.Add(name=name+"-Add2")([x, residual])
        return x

    return apply


def DownSample(width, name="DownSample"):
    def apply(x):
        x = layers.Conv2D(
            width,
            kernel_size=3,
            strides=2,
            padding="same",
            kernel_initializer=kernel_init(1.0), name=name+"-Conv"
        )(x)
        return x

    return apply


def UpSample(width, interpolation="nearest", name="UpSample"):
    def apply(x):
        x = layers.UpSampling2D(size=2, interpolation=interpolation, name=name+"-UpSamp")(x)
        x = layers.Conv2D(
            width, kernel_size=3, padding="same", kernel_initializer=kernel_init(1.0), name=name+"-Conv"
        )(x)
        return x

    return apply

def get_solver_model(input_channels, output_channels,
                     widths, has_attention, upsampling,
                     kernel_size=(3, 3),
                     num_res_blocks=2,
                     norm_groups=8,
                     nb_channels=9,
                     interpolation="nearest",
                     activation_fn=keras.activations.swish,):

    if widths is None:
        widths = [128, 128, 256, 256]

    encoded_solution = layers.Input(shape=(30, 30, input_channels), name="encoded_solution")
    encoded_input = layers.Input(shape=(30, 30, output_channels), name="encoded_input")

    x = layers.Conv2D(
        widths[0],
        kernel_size=(3, 3),
        padding="same",
        kernel_initializer=kernel_init(1.0),
        name="InitialConv2D_X"
    )(encoded_solution)

    sol = layers.Conv2D(
        widths[0],
        kernel_size=(3, 3),
        padding="same",
        kernel_initializer=kernel_init(1.0),
        name="InitialConv2D_SOL"
    )(encoded_input)

    skips = [x]
    sol_save = sol

    # DownBlock
    print("DOWN")
    for i in range(len(widths)):
        print(i)
        for j in range(num_res_blocks):
            x = ResidualBlock(
                widths[i], groups=norm_groups, activation_fn=activation_fn, name="ResBlock-Down-{0}-{1}".format(i, j)
            )([x, sol])
            if has_attention[i]:
                x = AttentionBlock(widths[i], groups=norm_groups, name="AttentionBlock-Down-{0}-{1}".format(i, j))(x)
            skips.append(x)

        if upsampling[i]:
            x = DownSample(widths[i], name="DownSampleX-{0}".format(i))(x)
            sol = layers.Conv2D(widths[i], kernel_size=(3, 3), padding="same", strides=2)(sol_save) #DownSample(widths[i], name="DownSampleSol-{0}".format(i))(sol_save)
            skips.append(x)
    # MiddleBlock
    x = ResidualBlock(widths[-1], groups=norm_groups, activation_fn=activation_fn, dropout=False, name="MidResBlock1")(
        [x, sol]
    )
    x = AttentionBlock(widths[-1], groups=norm_groups, name="MidAttBlock")(x)
    x = ResidualBlock(widths[-1], groups=norm_groups, activation_fn=activation_fn, dropout=False, name="MidResBlock2")(
        [x, sol]
    )

    # UpBlock
    for i in reversed(range(len(widths))):
        print(i)
        for j in range(num_res_blocks):

            skip = skips.pop()
            x = layers.Concatenate(axis=-1, name="Up-Concat-{0}-{1}".format(i, j))([x, skip])
            x = ResidualBlock(
                widths[i], groups=norm_groups, activation_fn=activation_fn, name="ResBlock-Up-{0}-{1}".format(i, j)
            )([x, sol])
            if has_attention[i]:
                x = AttentionBlock(widths[i], groups=norm_groups, name="AttentionBlock-Up-{0}-{1}".format(i, j))(x)

            if upsampling[i] and j == 0:
                x = UpSample(widths[i], interpolation=interpolation, name="UpSampleX-{0}".format(i))(x)
                sol =  layers.Conv2DTranspose(widths[i], kernel_size=3,
                                              strides=2, activation=tf.keras.layers.LeakyReLU(alpha=0.2), padding='same')(sol)
                #sol_save #UpSample(widths[i], interpolation=interpolation, name="UpSampleSol-{0}".format(i))(sol_save)

    # End block
    x = layers.Conv2D(output_channels, (3, 3), padding="same", activation=tf.keras.layers.LeakyReLU(alpha=0.2))(x)
    x = layers.Conv2D(output_channels, (3, 3), padding="same", activation=tf.keras.layers.LeakyReLU(alpha=0.2), name="FinalConv2D")(x)
    #x = tf.cast(x, tf.float64)
    return keras.Model([encoded_solution, encoded_input], x, name="unet")

#### Data for solver

In [None]:
def solver_encoder_preprocess_challenge_data(data):
  solver_encoder_training_data_input = []
  solver_encoder_training_data_output = []
  solver_encoder_training_data_ids = []

  for id, challenge in data.items():

    for train in challenge['train']:
      # input
      solver_encoder_training_data_ids.append(id)
      array = np.array(train['input'])
      if array.shape[-1] == 1:
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)

      solver_encoder_training_data_input.append(cat_tensor)

      # outpu
      array = np.array(train['output'])
      if array.shape[-1] == 1:
        array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)

      solver_encoder_training_data_output.append(cat_tensor)
  encoded_solver_encoder_training_data_input = autoencoder.encoder(np.array(solver_encoder_training_data_input))
  encoded_solver_encoder_training_data_output = autoencoder.encoder(np.array(solver_encoder_training_data_output))
  return encoded_solver_encoder_training_data_input, encoded_solver_encoder_training_data_output, tf.squeeze(np.array(solver_encoder_training_data_ids))
      #break

In [None]:
solver_encoder_training_data_input, solver_encoder_training_data_output, solver_encoder_training_data_ids = solver_encoder_preprocess_challenge_data(training_challenges)

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, solver_encoder_training_data_ids,
                 solver_encoder_training_data_input,
                 solver_encoder_training_data_output,
                 batch_size=32,
                 input_dim=(30, 30, 256),
                 output_dim=(30, 30, 128),
                 shuffle=True):
        self.solver_encoder_training_data_ids = solver_encoder_training_data_ids.numpy()
        self.solver_encoder_training_data_input = solver_encoder_training_data_input.numpy()
        self.solver_encoder_training_data_output = solver_encoder_training_data_output.numpy()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def get_random_challenge_output(self, ids):
        random_indices = []

        # Iterate over each id in the ids_list
        for id_str in ids:
            # Find all indices where the id_str is located in the ids array
            indices = tf.squeeze(tf.where(tf.equal(self.solver_encoder_training_data_ids, id_str)))

            # Randomly select one index if there are multiple
            if tf.size(indices) > 0:
                random_index = tf.random.shuffle(indices)
                random_indices.append(random_index[0])

        return tf.gather(self.solver_encoder_training_data_input, random_indices), tf.gather(self.solver_encoder_training_data_output, random_indices)

    def __len__(self):
        return int(np.floor(len(self.solver_encoder_training_data_input) / self.batch_size))

    def on_epoch_end(self):

        if self.shuffle:
            p = np.random.permutation(len(self.solver_encoder_training_data_ids))
            self.solver_encoder_training_data_ids = tf.gather(solver_encoder_training_data_ids, p)
            self.solver_encoder_training_data_input = tf.gather(solver_encoder_training_data_input, p)
            self.solver_encoder_training_data_output = tf.gather(solver_encoder_training_data_output, p)
        #if self.shuffle:
        #    np.random.shuffle(self.data)

    def __getitem__(self, index):
        ids = self.solver_encoder_training_data_ids[index * self.batch_size:(index + 1) * self.batch_size]
        input_input = self.solver_encoder_training_data_input[index * self.batch_size:(index + 1) * self.batch_size]
        input_output = self.solver_encoder_training_data_output[index * self.batch_size:(index + 1) * self.batch_size]

        random_output_rate = np.random.rand()
        if random_output_rate > 0.4:
            output_input, output_output = self.get_random_challenge_output(ids)
        else:
            output_input = self.solver_encoder_training_data_input[index * self.batch_size:(index + 1) * self.batch_size]
            output_output = self.solver_encoder_training_data_output[index * self.batch_size:(index + 1) * self.batch_size]

        return (input_input, input_output), (output_input, output_output)

In [None]:
training_datagenerator = DataGenerator(solver_encoder_training_data_ids[:int(np.floor(len(solver_encoder_training_data_ids) * 0.80))],
                                       solver_encoder_training_data_input[:int(np.floor(len(solver_encoder_training_data_ids) * 0.80))],
                                       solver_encoder_training_data_output[:int(np.floor(len(solver_encoder_training_data_ids) * 0.80))])

validation_generator = DataGenerator(solver_encoder_training_data_ids[int(np.ceil(len(solver_encoder_training_data_ids) * 0.80)):],
                                       solver_encoder_training_data_input[int(np.ceil(len(solver_encoder_training_data_ids) * 0.80)):],
                                       solver_encoder_training_data_output[int(np.ceil(len(solver_encoder_training_data_ids) * 0.80)):])

#### Solver AE model

In [None]:
class SolverAutoEncoder(keras.Model):
    def __init__(self, proposition_model, solver_model):
        super(SolverAutoEncoder, self).__init__()
        self.proposition_model = proposition_model
        self.solver_model = solver_model
        self.teacher_prop = 0.7

        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )

        #self.solver_encoder_training_data_ids = tf.squeeze(solver_encoder_training_data_ids)
        #self.solver_encoder_training_data_output = tf.squeeze(solver_encoder_training_data_output)

    @tf.function
    def call(self, x):
        encoded_train_input, encoded_train_output, _ = x
        solution = self.proposition_model((encoded_train_input, encoded_train_output), training=False)

        # Apply solution
        solved_encoded_train_output = self.solver_model((solution, encoded_train_input), training=False)
        return solved_encoded_train_output

    @tf.function
    def get_random_challenge_output(self, ids):
        random_indices = []

        # Iterate over each id in the ids_list
        for id_str in ids:
            # Find all indices where the id_str is located in the ids array
            indices = tf.squeeze(tf.where(tf.equal(self.solver_encoder_training_data_ids, id_str)))

            # Randomly select one index if there are multiple
            if tf.size(indices) > 0:
                random_index = tf.random.shuffle(indices)
                random_indices.append(random_index[0])

        return tf.gather(self.solver_encoder_training_data_output, random_indices)

    @property
    def metrics(self):
        return [
            self.reconstruction_loss_tracker,
        ]

    def train_step(self, data):
        inputs, outputs = data
        encoded_train_input, encoded_train_output = inputs
        encoded_test_input, encoded_test_output = outputs

        with tf.GradientTape() as tape:
            solution = self.proposition_model((encoded_train_input, encoded_train_output))

            solved_encoded_train_output = self.solver_model((solution, encoded_test_input))

            reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, encoded_test_output)

        grads = tape.gradient(reconstruction_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        return {
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
        }

    def test_step(self, data):
        inputs, outputs = data
        encoded_train_input, encoded_train_output = inputs
        encoded_test_input, encoded_test_output = outputs


        solution = self.proposition_model((encoded_train_input, encoded_train_output), training=False)

        solved_encoded_train_output = self.solver_model((solution, encoded_test_input), training=False)


        reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, encoded_test_output)

        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        return {
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
        }

In [None]:
solution_latent_size = 256
solutioner = get_proposition_model(solution_latent_size, kernel_size=(3, 3))
solver = get_solver_model(solution_latent_size, ae_latent_size,
                          widths = [256, 256, 128, 128,],
                          has_attention= [False, False, True, True,],
                          upsampling=[True, False, False, False],
                          kernel_size=(3, 3))
solver_autoencoder = SolverAutoEncoder(solutioner, solver)

solver_autoencoder.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-5), loss=losses.MeanSquaredError())

In [None]:
test = solver_autoencoder((tf.expand_dims(solver_encoder_training_data_input[0], axis=0),
                           tf.expand_dims(solver_encoder_training_data_output[0], axis=0),
                           tf.expand_dims(solver_encoder_training_data_output[0], axis=0)))

In [None]:
#solver_autoencoder_checkpoint_path = "cp-solver.weights.h5"

#solver_autoencoder.load_weights(solver_autoencoder_checkpoint_path)

In [None]:
solver_autoencoder.optimizer.learning_rate = 1e-6

In [None]:
if True:
  epochs=1000

  solver_autoencoder.fit(training_datagenerator,
                        epochs=epochs,
                        batch_size=32,
                        shuffle=True,
                        validation_data=validation_generator
                        )

In [None]:
0.1618
0.1601
0.1578

In [None]:
import os

!mkdir solver_weights

solver_checkpoint_path = "solver_weights/cp-solver.weights.h5"
solver_checkpoint_dir = os.path.dirname(solver_checkpoint_path)

solver_autoencoder.save_weights(solver_checkpoint_path)

!tar -czvf solver_weights.tar.gz ./solver_weights

solver_autoencoder.save("autosolver.keras")

In [None]:
for i in range(5):
  output_test_solver = tf.expand_dims(challenge_propositioner_inputs[i][1][1], axis=0)
  input_test_solver = tf.expand_dims(challenge_propositioner_inputs[i][1][0], axis=0)

  encoded_input_test_solver = autoencoder.encoder(input_test_solver, training=False).numpy()
  encoded_output_test_solver = autoencoder.encoder(output_test_solver, training=False).numpy()

  solved_encoded_output = solver_autoencoder((encoded_input_test_solver, encoded_output_test_solver, None))
  solved_output = autoencoder.decoder(solved_encoded_output, training=False).numpy()

  cropped_input, _ = crop_padded_zeros(input_test_solver[0], input_test_solver[0])
  cropped_output, cropped_pred_output = crop_padded_zeros(output_test_solver[0], solved_output[0])
  plt.subplot(1, 3, 1)
  plt.title('input')
  plt.imshow(np.argmax(cropped_input, axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
  plt.axis('off')
  plt.subplot(1, 3, 2)
  plt.title('output')
  plt.imshow(np.argmax(cropped_output, axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
  plt.axis('off')
  plt.subplot(1, 3, 3)
  plt.title('output')
  plt.imshow(np.argmax(cropped_pred_output, axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
  plt.axis('off')
  plt.show()

### OTHER TESTS

In [None]:
def pad_to_shape(arr, target_shape=(30,30,1)):
    """
    Padding the inputs to a single shape, this will make it easier to manipulate
    """
    paddings = [(0, target_shape[i] - arr.shape[i]) for i in range(len(arr.shape))]

    padded_array = tf.pad(
        arr, paddings, mode='CONSTANT', constant_values=0
    )

    return padded_array

In [None]:
def solver_encoder_preprocess_challenge_data(data):
  solver_encoder_training_data_input = []
  solver_encoder_training_data_output = []
  solver_encoder_training_data_ids = []

  for id, challenge in data.items():

    for train in challenge['train']:
      # input
      solver_encoder_training_data_ids.append(id)
      array = np.array(train['input'])
      #if array.shape[-1] == 1:
      #  array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)

      solver_encoder_training_data_input.append(cat_tensor)

      # outpu
      array = np.array(train['output'])
      #if array.shape[-1] == 1:
      #  array = np.expand_dims(array, axis=-1)
      array = pad_to_shape(array)
      cat_tensor = tf.keras.utils.to_categorical(array, num_classes=10)

      solver_encoder_training_data_output.append(cat_tensor)
  encoded_solver_encoder_training_data_input = np.array(solver_encoder_training_data_input)
  encoded_solver_encoder_training_data_output = np.array(solver_encoder_training_data_output)
  return encoded_solver_encoder_training_data_input, encoded_solver_encoder_training_data_output, tf.squeeze(np.array(solver_encoder_training_data_ids))
      #break

In [None]:
solver_encoder_training_data_input, solver_encoder_training_data_output, solver_encoder_training_data_ids = solver_encoder_preprocess_challenge_data(training_challenges)

In [None]:
solver_encoder_training_data_output.shape

In [None]:
plt.title('input')
plt.imshow(np.argmax(solver_encoder_training_data_output[100], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    def __init__(self, solver_encoder_training_data_ids,
                 solver_encoder_training_data_input,
                 solver_encoder_training_data_output,
                 batch_size=32,
                 input_dim=(30, 30, 256),
                 output_dim=(30, 30, 128),
                 shuffle=True):
        self.solver_encoder_training_data_ids = solver_encoder_training_data_ids
        self.solver_encoder_training_data_input = solver_encoder_training_data_input
        self.solver_encoder_training_data_output = solver_encoder_training_data_output
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def get_random_challenge_output(self, ids):
        random_indices = []

        # Iterate over each id in the ids_list
        for id_str in ids:
            # Find all indices where the id_str is located in the ids array
            indices = tf.squeeze(tf.where(tf.equal(self.solver_encoder_training_data_ids, id_str)))

            # Randomly select one index if there are multiple
            if tf.size(indices) > 0:
                random_index = tf.random.shuffle(indices)
                random_indices.append(random_index[0])

        return tf.gather(self.solver_encoder_training_data_input, random_indices), tf.gather(self.solver_encoder_training_data_output, random_indices)

    def __len__(self):
        return int(np.floor(len(self.solver_encoder_training_data_input) / self.batch_size))

    def on_epoch_end(self):

        if self.shuffle:
            p = np.random.permutation(len(self.solver_encoder_training_data_ids))
            self.solver_encoder_training_data_ids = tf.gather(solver_encoder_training_data_ids, p)
            self.solver_encoder_training_data_input = tf.gather(solver_encoder_training_data_input, p)
            self.solver_encoder_training_data_output = tf.gather(solver_encoder_training_data_output, p)
        #if self.shuffle:
        #    np.random.shuffle(self.data)

    def __getitem__(self, index):
        ids = self.solver_encoder_training_data_ids[index * self.batch_size:(index + 1) * self.batch_size]
        input_input = self.solver_encoder_training_data_input[index * self.batch_size:(index + 1) * self.batch_size]
        input_output = self.solver_encoder_training_data_output[index * self.batch_size:(index + 1) * self.batch_size]

        random_output_rate = np.random.rand()
        if random_output_rate > 0.4:
            output_input, output_output = self.get_random_challenge_output(ids)
        else:
            output_input = self.solver_encoder_training_data_input[index * self.batch_size:(index + 1) * self.batch_size]
            output_output = self.solver_encoder_training_data_output[index * self.batch_size:(index + 1) * self.batch_size]

        return (input_input, input_output), (output_input, output_output)

In [None]:
training_datagenerator = DataGenerator(solver_encoder_training_data_ids[:int(np.floor(len(solver_encoder_training_data_ids) * 0.80))],
                                       solver_encoder_training_data_input[:int(np.floor(len(solver_encoder_training_data_ids) * 0.80))],
                                       solver_encoder_training_data_output[:int(np.floor(len(solver_encoder_training_data_ids) * 0.80))])

validation_generator = DataGenerator(solver_encoder_training_data_ids[int(np.ceil(len(solver_encoder_training_data_ids) * 0.80)):],
                                       solver_encoder_training_data_input[int(np.ceil(len(solver_encoder_training_data_ids) * 0.80)):],
                                       solver_encoder_training_data_output[int(np.ceil(len(solver_encoder_training_data_ids) * 0.80)):])

In [None]:
class SolverFinalAutoEncoder(keras.Model):
    def __init__(self, proposition_model, solver_model, encoder, decoder):
        super(SolverFinalAutoEncoder, self).__init__()
        self.proposition_model = proposition_model
        self.solver_model = solver_model
        self.encoder = encoder
        self.decoder = decoder
        self.teacher_prop = 0.7

        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss",
        )

        self.encoding_loss_tracker = keras.metrics.Mean(
            name="encoding_loss",
        )

        #self.solver_encoder_training_data_ids = tf.squeeze(solver_encoder_training_data_ids)
        #self.solver_encoder_training_data_output = tf.squeeze(solver_encoder_training_data_output)

    @tf.function
    def call(self, x):
        train_input, train_output, _ = x
        encoded_train_input = self.encoder(train_input)
        encoded_train_output = self.encoder(train_output)
        solution = self.proposition_model((encoded_train_input, encoded_train_output), training=False)

        # Apply solution
        solved_encoded_train_output = self.solver_model((solution, encoded_train_input), training=False)
        return solved_encoded_train_output

    @tf.function
    def get_random_challenge_output(self, ids):
        random_indices = []

        # Iterate over each id in the ids_list
        for id_str in ids:
            # Find all indices where the id_str is located in the ids array
            indices = tf.squeeze(tf.where(tf.equal(self.solver_encoder_training_data_ids, id_str)))

            # Randomly select one index if there are multiple
            if tf.size(indices) > 0:
                random_index = tf.random.shuffle(indices)
                random_indices.append(random_index[0])

        return tf.gather(self.solver_encoder_training_data_output, random_indices)

    @property
    def metrics(self):
        return [
            self.reconstruction_loss_tracker,
            self.encoding_loss_tracker,
        ]

    def train_step(self, data):
        inputs, outputs = data
        train_input, train_output = inputs
        test_input, test_output = outputs
        with tf.GradientTape() as tape:
            encoded_train_input = self.encoder(train_input)
            encoded_train_output = self.encoder(train_output)
            encoded_test_input = self.encoder(test_input)
            encoded_test_output = self.encoder(test_output)
            solution = self.proposition_model((encoded_train_input, encoded_train_output))

            solved_encoded_train_output = self.solver_model((solution, encoded_test_input))

            reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, encoded_test_output)

            decoded_train_input = self.decoder(encoded_train_input)
            decoded_train_output = self.decoder(encoded_train_output)

            encoding_loss = (keras.losses.CategoricalCrossentropy()(decoded_train_input, train_input) + keras.losses.CategoricalCrossentropy()(decoded_train_output, train_output))/2
            loss = 0.1*encoding_loss + reconstruction_loss
        grads = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # with tf.GradientTape() as tape:
        #    solution = self.proposition_model((encoded_train_input, encoded_train_output))

        #    solved_encoded_train_output = self.solver_model((solution, encoded_test_input))

        #    reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, encoded_test_output)

        #grads = tape.gradient(reconstruction_loss, self.trainable_weights)
        #self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.encoding_loss_tracker.update_state(encoding_loss)
        return {
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "encoding_loss": self.encoding_loss_tracker.result(),
        }

    @property
    def metrics(self):
        return [
            self.reconstruction_loss_tracker,
            self.encoding_loss_tracker,
        ]

    def test_step(self, data):
        inputs, outputs = data
        train_input, train_output = inputs
        test_input, test_output = outputs
        encoded_train_input = self.encoder(train_input)
        encoded_train_output = self.encoder(train_output)
        encoded_test_input = self.encoder(test_input)
        encoded_test_output = self.encoder(test_output)
        solution = self.proposition_model((encoded_train_input, encoded_train_output))

        solved_encoded_train_output = self.solver_model((solution, encoded_test_input))

        reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, encoded_test_output)

        decoded_train_input = self.decoder(encoded_train_input)
        decoded_train_output = self.decoder(encoded_train_output)

        encoding_loss = (keras.losses.CategoricalCrossentropy()(decoded_train_input, train_input) + keras.losses.CategoricalCrossentropy()(decoded_train_output, train_output))/2
        loss = 0.1*encoding_loss + reconstruction_loss

        # with tf.GradientTape() as tape:
        #    solution = self.proposition_model((encoded_train_input, encoded_train_output))

        #    solved_encoded_train_output = self.solver_model((solution, encoded_test_input))

        #    reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, encoded_test_output)

        #grads = tape.gradient(reconstruction_loss, self.trainable_weights)
        #self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.encoding_loss_tracker.update_state(encoding_loss)
        return {
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "encoding_loss": self.encoding_loss_tracker.result(),
        }

In [None]:
solver_final_autoencoder = SolverFinalAutoEncoder(solutioner, solver, autoencoder.encoder, autoencoder.decoder)

solver_final_autoencoder.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-5), loss=losses.MeanSquaredError())

In [None]:
test = solver_final_autoencoder((tf.expand_dims(solver_encoder_training_data_input[0], axis=0),
                           tf.expand_dims(solver_encoder_training_data_output[0], axis=0),
                           tf.expand_dims(solver_encoder_training_data_output[0], axis=0)))

In [None]:
decode_solved = solver_final_autoencoder.decoder(test)

In [None]:
plt.title('input')
plt.imshow(np.argmax(solver_encoder_training_data_output[0], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')

In [None]:
plt.title('input')
plt.imshow(np.argmax(decode_solved[0], axis=-1), interpolation='nearest', cmap=cmap, norm=norm)
plt.axis('off')

In [None]:
epochs=1000

solver_final_autoencoder.fit(training_datagenerator,
                       epochs=epochs,
                       batch_size=32,
                       shuffle=True,
                       validation_data=validation_generator
                       )

In [None]:
class SolverAutoEncoder(keras.Model):
    def __init__(self, proposition_model, solver_model):
        super(SolverAutoEncoder, self).__init__()
        self.proposition_model = proposition_model
        self.solver_model = solver_model
        #self.sampling_layer = Sampling()
        self.teacher_prop = 0.7
        #self.solver_encoder_training_data_ids = tf.squeeze(solver_encoder_training_data_ids)
        #self.solver_encoder_training_data_output = tf.squeeze(solver_encoder_training_data_output)
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @tf.function
    def call(self, x):
        ids, encoded_train_input, encoded_train_output = x
        z_mean, z_log_var = self.proposition_model((encoded_train_input, encoded_train_output))
        solution = self.sampling_layer((z_mean, z_log_var))

        # Apply solution
        solved_encoded_train_output = self.solver_model((solution, encoded_train_input))
        return solved_encoded_train_output

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            #self.kl_loss_tracker,
        ]

    def train_step(self, data):
        print("debug")
        if isinstance(data, tuple):
            ids, encoded_train_input, encoded_train_output = data[0]
        else:
            ids, encoded_train_input, encoded_train_output = data
        print("debug0")
        #if random.random() > self.teacher_prop:
        #    goal_outputs = self.get_random_challenge_output(ids)
        #else:
        #  goal_outputs = encoded_train_output
        goal_outputs = encoded_train_output
        print("debug1")

        with tf.GradientTape() as tape:
            solution = self.proposition_model((encoded_train_input, encoded_train_output))

            #solution = self.sampling_layer((z_mean, z_log_var))
            solved_encoded_train_output = self.solver_model((solution, encoded_train_input))


            reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, goal_outputs)
            #kl_loss = -0.5 * (1 + z_log_var - tf.keras.backend.square(z_mean) - tf.keras.backend.exp(z_log_var))

            #kl_loss = tf.math.reduce_mean(tf.math.reduce_sum(kl_loss, axis=1))

            total_loss = reconstruction_loss #+ kl_loss
        print("debug2")
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        #self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        print("debug")
        if isinstance(data, tuple):
            ids, encoded_train_input, encoded_train_output = data[0]
        else:
            ids, encoded_train_input, encoded_train_output = data
        print("debug0")

        #if random.random() > self.teacher_prop:
        #    goal_outputs = self.get_random_challenge_output(ids)
        #else:
        #  goal_outputs = encoded_train_output
        goal_outputs = encoded_train_output

        solution = self.proposition_model((encoded_train_input, encoded_train_output), training=False)

        #solution = self.sampling_layer((z_mean, z_log_var))
        solved_encoded_train_output = self.solver_model((solution, encoded_train_input), training=False)


        reconstruction_loss = keras.losses.MeanSquaredError()(solved_encoded_train_output, goal_outputs)
        #kl_loss = -0.5 * (1 + z_log_var - tf.keras.backend.square(z_mean) - tf.keras.backend.exp(z_log_var))

        #kl_loss = tf.math.reduce_mean(tf.math.reduce_sum(kl_loss, axis=1))

        total_loss = reconstruction_loss #+ kl_loss

        self.total_loss_tracker.update_state(total_loss)
        self.solver_tracker.update_state(reconstruction_loss)
        #self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

In [None]:
def get_updater_model(input_channels, output_channels, nb_proposition_backtracking, kernel_size=(3, 3), layers_features=None):
    if layers_features is None:
        layers_features = [64, 128, 128, 128]

    distr_mean_input = layers.Input(shape=(30, 30, output_channels), name="distr_mean_input")
    distr_std_input = layers.Input(shape=(30, 30, output_channels), name="distr_std_input")
    y_mean = compute_block(distr_mean_input, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    y_std = compute_block(distr_std_input, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    y_dist = tf.keras.layers.Concatenate(axis=-1)([y_mean, y_std])
    y_dist = compute_block(y_dist, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    y_sol = None
    train_inputs = []
    for i in range(nb_proposition_backtracking):
        train_input = layers.Input(shape=(30, 30, input_channels), name="solution_input_{}".format(i))
        train_inputs.append(train_input)
        y_tmp = compute_block(train_input, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

        if y_sol is None:
            y_sol = y_tmp
        else:
            y_sol = tf.keras.layers.Concatenate(axis=-1)([y_sol, y_tmp])
    y_sol = compute_block(y_sol, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    y = tf.keras.layers.Concatenate(axis=-1)([y_dist, y_sol])
    yy = compute_block(y, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    #train_input = layers.Input(shape=(30, 30, 128), name="train_input")
    #train_output = layers.Input(shape=(30, 30, 128), name="train_output")

    #y_input = compute_block(train_input, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    #y_output = compute_block(train_output, layers_features[0], tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)
    #y = tf.keras.layers.Concatenate(axis=-1)([y_input, y_output])

    for i, nb_features in enumerate(layers_features[1:]):
        y = compute_block(y, nb_features, tf.keras.layers.LeakyReLU(alpha=0.2), kernel_size=kernel_size)

    # Final Bloc -> img
    mean = compute_block(y, output_channels, None, kernel_size=(3, 3))
    z_log_var  = compute_block(y, output_channels, None, kernel_size=(3, 3))

    proposition_model = keras.models.Model([distr_mean_input, distr_std_input, *train_inputs], [mean, z_log_var], name="proposition_model")
    return proposition_model

In [None]:
class PropositionalSolver(keras.Model):
    def __init__(self, proposition_model, solver_model, autoencoder, solution_updater_model):
        super(PropositionalSolver, self).__init__()

        self.solution_proposer_model = proposition_model
        self.solver_model = solver_model

        self.solution_updater_model = solution_updater_model
        self.autoencoder = autoencoder
        self.nb_proposition_backtracking = 5
        self.nb_research_iterations = 1
        self.memory_size = 20

        self.seed_generator = keras.random.SeedGenerator(1337)

    @tf.function
    def call(self, inputs):
        train_challenge_input, train_challenge_outputs, test_challenge_inputs = inputs

        if train_challenge_input.shape[0] != train_challenge_outputs.shape[0]:
            raise Exception("Training data input and output have different size")

        # ENCODING
        encoded_train_input = autoencoder.encoder(train_challenge_input)
        encoded_train_output = autoencoder.encoder(train_challenge_outputs)

        # SOLUTION DISTRIBUTION
        z_mean, z_log_var = self.solution_proposer_model((encoded_train_input, encoded_train_output))

        # Keep track of previous best solutions
        best_sol = np.zeros((self.memory_size, 30, 30, 128))
        best_losses_reducted = np.full((self.memory_size), 5**9)
        best_losses = np.zeros((self.memory_size, 30, 30, 128)) * 5**9
        for i in tqdm(range(self.nb_research_iterations)):
            #Sample solution
            epsilon = keras.random.normal(shape=z_mean.shape, seed=self.seed_generator)
            solution = z_mean + keras.ops.exp(0.5 * z_log_var) * epsilon

            # Apply solution
            solved_encoded_train_output = self.solver_model((solution, encoded_train_input))
            # Compute the loss on the solution
            solving_loss = tf.square(encoded_train_output - solved_encoded_train_output)
            reduced_loss =  tf.reduce_sum(solving_loss, axis=[1, 2, 3])

            # Concat the current solutions to best solutions to make a filter that keeps only the best
            best_sol = tf.concat([best_sol, solution], axis=0)
            best_losses_reducted = tf.concat([best_losses_reducted, reduced_loss], axis=0)
            best_losses = tf.concat([best_losses, solving_loss], axis=0)
            # Sort the solutions by the sum of the MSE
            sorted_indices = tf.argsort(best_losses_reducted, axis=0)
            best_sol = tf.gather(best_sol, sorted_indices)
            best_losses_reducted = tf.gather(best_losses_reducted, sorted_indices)
            best_losses = tf.gather(best_losses, sorted_indices)
            # Keep only the best 20 solutions
            best_sol = best_sol[:self.memory_size]
            best_losses_reducted = best_losses_reducted[:self.memory_size]
            best_losses = best_losses[:self.memory_size]

            # Select 5 random solutions and their loss from the 20 best solutions to serve as input to the update model
            selected_solutions_input = None
            for j in range(z_mean.shape[0]): # Iterate on batch dimension. Didn't have time to think of a better solution
              random_solutions_indices = tf.random.shuffle(tf.range(self.memory_size))[:self.nb_proposition_backtracking]
              selected_solutions = tf.gather(best_sol, random_solutions_indices)
              selected_losses = tf.gather(best_losses, random_solutions_indices)
              concatenated_sol_loss = tf.expand_dims(tf.concat([selected_solutions, selected_losses], axis=-1), axis=1)
              if selected_solutions_input is None:
                  selected_solutions_input = concatenated_sol_loss
              else:
                  selected_solutions_input = tf.concat([selected_solutions_input, concatenated_sol_loss], axis=1)
            # Final selected solutions
            selected_solutions_input = tuple(tf.unstack(selected_solutions_input))

            # Update distribution based on previous distribution but also on memory
            z_mean, z_log_var = update_model((z_mean, z_log_var, *selected_solutions_input))

            #kl_loss = -0.5 * (1 + z_log_var - tf.ops.square(z_mean) - tf.ops.exp(z_log_var))
            #kl_loss = tf.ops.mean(ops.sum(kl_loss, axis=1))
            #total_loss = reconstruction_loss
        encoded_test_input = autoencoder.encoder(test_challenge_inputs)
        # Apply best solution
        chosen_solution = tf.repeat(tf.expand_dims(best_sol[0], axis=0), encoded_test_input.shape[0], axis=0)
        solved_encoded_test_output = self.solver_model((chosen_solution, encoded_test_input))
        solved_output = autoencoder.decoder(solved_encoded_test_output)


        return solved_output


In [None]:

update_model = get_updater_model(solution_latent_size * 2, solution_latent_size, nb_proposition_backtracking=5)

prop_solver = PropositionalSolver(solutioner, solver, autoencoder, update_model)

In [None]:
prop_solver((np.array(train_solver_inputs[0]),
             np.array(train_solver_outputs[0]),
             np.array(test_solver_inputs[0])
             ))

In [None]:
len(train_solver_inputs), train_solver_inputs[0][0].shape

In [None]:
import math
optimizer = keras.optimizers.Adam(learning_rate=5e-3)

# TODO

def fit_encoder(inputs, outputs, ids, batch_size, epochs):
    training_data = list(zip(inputs, outputs, ids))
    print(len(training_data))
    nb_iterations = math.ceil(len(inputs) / batch_size)
    indices = list(range(len(inputs)))
    for epoch in range(epochs):
        random.shuffle(training_data)

        batch_indices = np.array_split(indices, nb_iterations)
        for iter in range(nb_iterations):
          i, j = batch_indices[iter][0], batch_indices[iter][-1] + 1

          x, y, curr_id = zip(*(training_data[i:j]))

          with tf.GradientTape() as tape:
            loss = 0
            cce = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
            for curr_x in x:
              curr_x = tf.expand_dims(curr_x, axis=0)
              #curr_y = tf.expand_dims(y[i], axis=0)

              pred_x = autoencoder(curr_x)

              loss += cce(curr_x, pred_x)
          gradient = tape.gradient(loss, autoencoder.trainable_variables)
          optimizer.apply_gradients(
              zip(gradient, autoencoder.trainable_variables)
          )




