In [None]:
class Config:
    """Paths and hyper-parameters shared by every cell of the notebook.

    Note from the original author: competition grids range from 2x2 to
    30x30, with at most 5-6 solved examples per task.
    """

    # Dataset files, grouped by split. The test split has no solutions file.
    train = dict(
        inputs='/kaggle/input/arc-prize-2024/arc-agi_training_challenges.json',
        outputs='/kaggle/input/arc-prize-2024/arc-agi_training_solutions.json',
    )
    validation = dict(
        inputs='/kaggle/input/arc-prize-2024/arc-agi_evaluation_challenges.json',
        outputs='/kaggle/input/arc-prize-2024/arc-agi_evaluation_solutions.json',
    )
    test = dict(
        inputs='/kaggle/input/arc-prize-2024/arc-agi_test_challenges.json',
    )

    # Grid geometry: every grid is resized to max_size and one-hot encoded
    # over max_channels values.
    max_size = (30, 30)
    max_channels = 10

    # Training loop settings.
    num_epoch = 150
    batch_size = 16
    buffer_size = 8  # used for both the shuffle buffer and the prefetch depth
    early_stopping_patience = 80


# Single shared instance read by the rest of the notebook.
cfg = Config()

In [None]:
import numpy as np 
import json 

from tensorflow.data import Dataset 
import tensorflow as tf 

from tensorflow.keras import layers 
import keras

# Data Preparation 

In [None]:
import cv2 

def size_to_30(i, key = 'input', target_size = None, for_tf = True):
    """
    Resize a grid to a fixed size with nearest-neighbour interpolation.

    i           -- either a dict holding the grid under `key`, or the grid
                   itself (anything np.array accepts)
    key         -- entry read from `i` when `i` is a dict
    target_size -- passed to cv2.resize as `dsize`, which OpenCV reads as
                   (width, height); defaults to cfg.max_size
    for_tf      -- return a TensorFlow tensor when true, otherwise the
                   resized NumPy array

    The grid is cast to uint8 before resizing, so the result is uint8.
    """
    # isinstance also accepts dict subclasses, unlike an exact type() match.
    arr = np.array(i[key] if isinstance(i, dict) else i)

    # Identity test: `== None` would broadcast if an array were ever passed.
    if target_size is None:
        target_size = cfg.max_size

    img = arr.astype(np.uint8)
    # Nearest-neighbour (not cubic) so every output cell is a copy of an
    # input cell and no new, blended values are introduced.
    img_resized = cv2.resize(img, target_size, interpolation=cv2.INTER_NEAREST)

    if for_tf:
        return tf.convert_to_tensor(img_resized)
    return img_resized


def prepare_inputs(path, only_one = True): 
    """
    Read a challenges JSON file and build the three model input lists.

    path     -- JSON file mapping task ids to {'train': [...], 'test': [...]}
    only_one -- when true, keep a single test input per task; otherwise emit
                one sample per test input and repeat the task's training
                pair for each of them

    Returns a dict with the keys 'test_inputs', 'train_inputs' and
    'train_outputs'. Every entry of each list is itself a list of resized
    grids. Only the first training pair of a task is ever used.
    """
    input_data = {
        'test_inputs': [],
        'train_inputs': [],
        'train_outputs': []}

    # The with-block closes the file on exit; no explicit close() is needed.
    with open(path, mode='r') as file:
        data = json.load(file)

    for v in data.values():
        # The demonstration pair is the same in both modes: first train item.
        train_inputs = [size_to_30(i) for i in v['train'][:1]]
        train_outputs = [size_to_30(i, 'output') for i in v['train'][:1]]

        if only_one:
            input_data['test_inputs'].append([size_to_30(i) for i in v['test'][:1]])
            input_data['train_inputs'].append(train_inputs)
            input_data['train_outputs'].append(train_outputs)
        else:
            test_inputs = [size_to_30(i) for i in v['test']]
            input_data['test_inputs'].extend([i] for i in test_inputs)
            # Repeat the demonstration pair once per test input so the three
            # lists stay aligned sample by sample.
            input_data['train_inputs'].extend(train_inputs for _ in test_inputs)
            input_data['train_outputs'].extend(train_outputs for _ in test_inputs)

    return input_data
    
def prepare_outputs(path, only_one = True): 
    """
    Read a solutions JSON file and return the resized target grids.

    path     -- JSON file mapping task ids to a list of solution grids
    only_one -- when true, keep only the first solution of each task;
                otherwise emit one entry per solution

    Returns a list whose entries are lists of resized grids (one grid per
    entry when only_one is false, at most one when it is true).
    """
    # The with-block closes the file on exit; no explicit close() is needed.
    with open(path, mode='r') as file:
        data = json.load(file)

    output_data = []
    for v in data.values():
        if only_one:
            output_data.append([size_to_30(i) for i in v[:1]])
        else:
            output_data.extend([size_to_30(i)] for i in v)

    return output_data

def prepare1(data_path_item, only_one = True):
    """
    Load the files named by one split description.

    data_path_item -- mapping that may contain an 'inputs' path and/or an
                      'outputs' path
    only_one       -- forwarded unchanged to the two loaders

    Returns (inputs, outputs); each element is None when the matching key
    is absent from data_path_item.
    """
    available = data_path_item.keys()

    inputs = (
        prepare_inputs(data_path_item['inputs'], only_one)
        if 'inputs' in available
        else None
    )
    outputs = (
        prepare_outputs(data_path_item["outputs"], only_one)
        if 'outputs' in available
        else None
    )

    return inputs, outputs

# Labelled splits: one sample per test grid of every task (only_one=False).
X_train, y_train = prepare1(cfg.train, only_one=False)
X_validation, y_validation = prepare1(cfg.validation, only_one=False)
# The unlabelled test split is not loaded here:
# test = prepare1(cfg.test)

# To Dataset 

In [None]:
def add_channels(item): 
    """
    One-hot encode an integer grid along a new trailing axis.

    The depth of the new axis is cfg.max_channels, so a grid of shape
    [30, 30] is returned with shape [30, 30, cfg.max_channels].
    """
    return tf.one_hot(item, cfg.max_channels)

def build_dataset(X, y = None, batch = True, shuffle = True): 
    """
    Turn prepared grids into a tf.data pipeline.

    X       -- dict of model inputs (as returned by prepare_inputs); every
               value is sliced along its first axis
    y       -- optional targets, sliced the same way; when given, the
               dataset yields (inputs, target) pairs, otherwise inputs only
    batch   -- batch with cfg.batch_size when truthy
    shuffle -- shuffle with a buffer of cfg.buffer_size when truthy

    Each grid has its size-1 axes squeezed away and is then one-hot encoded
    by add_channels. The result is prefetched with cfg.buffer_size.
    """
    def _encode(grid):
        # Squeeze first so the one-hot axis is appended to a bare 2-D grid.
        return add_channels(tf.squeeze(grid))

    ds_input = (
        Dataset.from_tensor_slices({**X})
        .map(lambda i: {k: _encode(v) for k, v in i.items()})
    )

    # Identity test: `y != None` is evaluated element-wise for NumPy labels
    # and cannot then be used as an `if` condition.
    if y is not None:
        ds_output = Dataset.from_tensor_slices(y).map(_encode)
        ds = Dataset.zip((ds_input, ds_output))
    else:
        ds = ds_input

    if shuffle:
        ds = ds.shuffle(cfg.buffer_size)

    if batch:
        ds = ds.batch(cfg.batch_size)

    return ds.prefetch(cfg.buffer_size)
    
# Both splits use build_dataset's defaults (batched and shuffled).
ds_train = build_dataset(X_train, y=y_train)
ds_validation = build_dataset(X_validation, y=y_validation)

# Model

In [None]:
import tensorflow as tf
from tensorflow.keras import layers

# Conv2D -> BatchNormalization -> LayerNormalization -> Dropout, each built with explicit initializers
def conv_block(item):
    """
    Apply one convolutional block to `item` and return the result.

    The block is a 3x3 'same'-padded Conv2D with 64 filters, followed by
    batch normalisation, layer normalisation and 20% dropout. No activation
    is applied inside the block. New layers are created on every call, so
    two calls never share weights.
    """
    def small_normal():
        # A fresh initializer object for each weight that needs one.
        return tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.2)

    conv = layers.Conv2D(
        filters=64,
        kernel_size=(3, 3),
        padding='same',
        kernel_initializer=small_normal(),
        bias_initializer=tf.keras.initializers.Zeros(),
    )
    batch_norm = layers.BatchNormalization(
        beta_initializer=small_normal(),
        gamma_initializer=small_normal(),
    )
    layer_norm = layers.LayerNormalization(
        beta_initializer=small_normal(),
        gamma_initializer=small_normal(),
    )
    dropout = layers.Dropout(rate=0.2)

    # (A MaxPooling2D(pool_size=(2, 2)) step after the convolution was left
    # disabled by the author.)
    features = conv(item)
    features = batch_norm(features)
    features = layer_norm(features)
    return dropout(features)

def build_model(conv_cycling_train=2, conv_cycling_test=2):
    """
    Build the three-input Keras model.

    conv_cycling_train -- number of conv_block layers stacked on each of the
                          'train_inputs' and 'train_outputs' branches
    conv_cycling_test  -- number of conv_block layers stacked on the
                          'test_inputs' branch

    A count of 0 feeds the raw one-hot grid straight into the branch.

    Returns a tf.keras Model whose inputs are named 'train_inputs',
    'train_outputs' and 'test_inputs' (each [*cfg.max_size, cfg.max_channels])
    and whose output has the same shape, holding a probability distribution
    over the channels for every cell.
    """
    max_channels = cfg.max_channels
    grid_shape = [*cfg.max_size, max_channels]

    # Take all input tensors
    train_inputs = layers.Input(shape=grid_shape, name='train_inputs')
    train_outputs = layers.Input(shape=grid_shape, name='train_outputs')
    test_inputs = layers.Input(shape=grid_shape, name='test_inputs')

    # Chain the blocks: each one consumes the previous block's output, so
    # the cycling counts really control the depth of the branch.
    train_inputs_conved = train_inputs
    train_outputs_conved = train_outputs
    for _ in range(conv_cycling_train):
        train_inputs_conved = conv_block(train_inputs_conved)
        train_outputs_conved = conv_block(train_outputs_conved)

    test_inputs_conved = test_inputs
    for _ in range(conv_cycling_test):
        test_inputs_conved = conv_block(test_inputs_conved)

    # Train data: fuse the demonstration input and output features
    train_ = layers.Add()([train_inputs_conved, train_outputs_conved])
    train_ = layers.Flatten()(train_)
    train_ = layers.Dense(units=1024, activation='gelu')(train_)

    # Test data
    test_ = layers.Flatten()(test_inputs_conved)
    test_ = layers.Dense(units=1024, activation='gelu')(test_)

    # Merge train and test data
    merged = layers.Add()([train_, test_])
    merged = layers.Dense(units=1024, activation='gelu')(merged)

    # Project to one score per (row, column, channel), then normalise per
    # cell. Softmax on the flat Dense output would instead normalise across
    # every cell and channel at once.
    logits = layers.Dense(units=cfg.max_size[0] * cfg.max_size[1] * max_channels)(merged)
    logits = layers.Reshape([*cfg.max_size, max_channels])(logits)
    output = layers.Softmax(axis=-1)(logits)

    model = tf.keras.models.Model(inputs=[train_inputs, train_outputs, test_inputs], outputs=output)

    return model

# Instantiate the network with build_model's default depths and print its layers.
main_model = build_model(conv_cycling_train=2, conv_cycling_test=2)
main_model.summary()

# Train

In [None]:
class CustomLoss(tf.keras.losses.Loss):
    """
    Categorical cross-entropy exposed under its own loss name.

    The wrapped criterion is created with Keras' defaults, so y_pred is
    treated as probabilities (not logits) and the class axis is the last one.
    """

    def __init__(self, name="custom_categorical_loss"):
        super().__init__(name=name)
        self.criterion = tf.keras.losses.CategoricalCrossentropy()

    def call(self, y_true, y_pred):
        """Return the categorical cross-entropy between y_true and y_pred."""
        # The tensors go to the criterion in their original shape; no
        # flattening is needed because it reduces over the last axis itself.
        return self.criterion(y_true, y_pred)

In [None]:
# Adam optimiser, the cross-entropy wrapper above, and accuracy as the metric.
main_model.compile(
    optimizer='adam',
    loss=CustomLoss(),
    metrics=['accuracy'],
)

In [None]:
def scheduler(epoch, lr):
    """
    Learning-rate schedule: hold the rate for epochs 0-9, then return 70%
    of the rate that was passed in.
    """
    return lr if epoch < 10 else lr * 0.7

# NOTE: from here on the name `scheduler` refers to the callback, not to the
# schedule function it wraps.
scheduler = keras.callbacks.LearningRateScheduler(scheduler)

# Stop once val_loss has not improved by at least 1e-4 for
# cfg.early_stopping_patience epochs, and roll back to the best weights.
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=1e-4,
    patience=cfg.early_stopping_patience,
    restore_best_weights=True,
)

main_model.fit(
    ds_train,
    epochs=cfg.num_epoch,
    validation_data=ds_validation,
    callbacks=[scheduler, early_stopping],
)

# Make Submission

In [None]:
# The test split has no solutions file, so the second return value is None.
X_test, _ = prepare1(cfg.test, only_one=False)
# Keep the original order so predictions can be matched back to task ids.
ds_test = build_dataset(X_test, shuffle=False)

In [None]:
# One entry per test grid, in dataset order.
predictions = []

for batch in ds_test:
    prediction = main_model(batch)
    # Extending with the array adds one element per sample of the batch.
    predictions.extend(prediction.numpy())

In [None]:
# Parallel lists with one entry per test grid, in file order: the task id
# and the (rows, cols) shape expected for that grid's prediction.
actual_ids = []
actual_shapes = []

# The with-block closes the file on exit; no explicit close() is needed.
with open(cfg.test['inputs'], mode='r') as file:
    data = json.load(file)


def _scale_dim(test_len, train_in_len, train_out_len, upper):
    """Scale one test dimension by the train output/input ratio, kept within [1, upper]."""
    if train_in_len > 0:
        # Integer arithmetic avoids float round-off truncating one too low.
        scaled = test_len * train_out_len // train_in_len
    else:
        scaled = test_len
    return max(1, min(upper, scaled))


for k, v in data.items():
    # The output/input size ratio is taken from the first training pair.
    # Without any training pair the test grid's own size is used.
    if v['train']:
        train_input_shape = np.array(v['train'][0]['input']).shape
        train_output_shape = np.array(v['train'][0]['output']).shape
    else:
        train_input_shape = train_output_shape = (1, 1)

    # Each test grid gets its own target shape, since the test inputs of a
    # task need not all have the same size.
    for test_item in v['test']:
        test_input_shape = np.array(test_item['input']).shape
        target_shape = (
            _scale_dim(test_input_shape[0], train_input_shape[0],
                       train_output_shape[0], cfg.max_size[0]),
            _scale_dim(test_input_shape[1], train_input_shape[1],
                       train_output_shape[1], cfg.max_size[1]),
        )
        actual_shapes.append(target_shape)
        actual_ids.append(k)

In [None]:
# Collapse the channel axis: keep the most probable value of every cell.
predictions_reshaped = [np.argmax(p, axis=-1) for p in predictions]
# actual_shapes holds (rows, cols), whereas cv2.resize expects
# dsize=(width, height), i.e. (cols, rows). Swap the pair so that non-square
# grids are not returned transposed.
predictions_reshaped = [
    cv2.resize(p.astype(np.uint8), (int(t[1]), int(t[0])), interpolation=cv2.INTER_NEAREST)
    for p, t in zip(predictions_reshaped, actual_shapes)
]

In [None]:
# task id -> list with one {'attempt_1', 'attempt_2'} dict per test grid,
# in the order the test grids appear in the challenge file.
submission = {}
for i, p in zip(actual_ids, predictions_reshaped):
    # Append for every prediction so that tasks with several test grids
    # keep all of them, not only the first. Both attempts carry the same
    # grid because the model yields a single prediction.
    submission.setdefault(i, []).append({
        'attempt_1': p.tolist(),
        'attempt_2': p.tolist(),
    })

In [None]:
# Write the submission dict as JSON into the working directory.
with open('submission.json', 'w') as file:
    json.dump(submission, file)