In [None]:
# from google.colab import drive
# drive.mount('/content/drive')
# %pip install tqdm

In [None]:
import os
import json
import numpy as np
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F

from tqdm import tqdm
import matplotlib.pyplot as plt
from matplotlib import colors
from matplotlib import animation, rc
from IPython.display import HTML

# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
# Root of the ARC dataset. Set the ARC_DATA_DIR environment variable to point the
# notebook at a different checkout; the original local path remains the fallback.
data_path = Path(os.environ.get('ARC_DATA_DIR', '/Users/karankamath/Desktop/ARC-master/data'))
train_path = data_path / 'training'
eval_path = data_path / 'evaluation'
test_path = data_path / 'test'


def load_tasks(directory):
    """Read every ``*.json`` file in ``directory`` into a ``{file stem: parsed JSON}`` dict.

    Only ``*.json`` files are read, so stray entries such as ``.DS_Store`` do not
    reach ``json.load``. Files are visited in sorted order so the resulting dict
    has the same order on every run, and each file is closed as soon as it has
    been parsed.

    Raises:
        FileNotFoundError: if ``directory`` does not exist or is not a directory.
    """
    directory = Path(directory)
    if not directory.is_dir():
        raise FileNotFoundError(f"ARC task directory not found: {directory}")
    tasks = {}
    for task_file in sorted(directory.glob('*.json')):
        with task_file.open() as handle:
            tasks[task_file.stem] = json.load(handle)
    return tasks


train_tasks = load_tasks(train_path)
eval_tasks = load_tasks(eval_path)

In [None]:
# Fixed ten-entry palette: entry k is the colour drawn for cell value k.
cmap = colors.ListedColormap([
    '#000000',  # 0
    '#0074D9',  # 1
    '#FF4136',  # 2
    '#2ECC40',  # 3
    '#FFDC00',  # 4
    '#AAAAAA',  # 5
    '#F012BE',  # 6
    '#FF851B',  # 7
    '#7FDBFF',  # 8
    '#870C25',  # 9
])
# Map the value range 0..9 onto the full extent of the colormap.
norm = colors.Normalize(vmin=0, vmax=9)

def plot_pictures(pictures, labels):
    """Draw the grids in ``pictures`` side by side, each titled with its label.

    Args:
        pictures: sequence of 2-D grids (anything ``np.array`` accepts) whose
            values are rendered with the module-level ``cmap`` / ``norm``.
        labels: sequence of titles, paired positionally with ``pictures``.
    """
    # squeeze=False keeps ``axs`` two-dimensional even when there is a single
    # picture; without it plt.subplots returns a bare Axes that cannot be indexed.
    fig, axs = plt.subplots(1, len(pictures), figsize=(2*len(pictures), 32), squeeze=False)
    for ax, pict, label in zip(axs[0], pictures, labels):
        ax.imshow(np.array(pict), cmap=cmap, norm=norm)
        ax.set_title(label)
    plt.show()

def plot_sample(sample, predict=None):
    """Plot one sample: its input, its output when shown, and an optional prediction.

    Without a prediction the input and output are drawn (``sample['output']`` is
    then required). With a prediction the output panel is included only when the
    sample has an ``'output'`` key.
    """
    panels = [sample['input']]
    titles = ['Input']

    if predict is None or 'output' in sample:
        panels.append(sample['output'])
        titles.append('Output')

    if predict is not None:
        panels.append(predict)
        titles.append('Predict')

    plot_pictures(panels, titles)

def inp2img(inp):
    """One-hot encode a 2-D grid of values 0..9 into a ``(10, rows, cols)`` uint8 array.

    Plane ``k`` of the result holds 1 where the grid equals ``k`` and 0 elsewhere.
    """
    grid = np.asarray(inp)
    rows, cols = grid.shape[0], grid.shape[1]
    planes = np.zeros((10, rows, cols), dtype=np.uint8)
    for value, plane in enumerate(planes):
        # ``plane`` is a view into ``planes``, so this writes the mask in place.
        plane[...] = grid == value
    return planes

def input_output_shape_is_same(task):
    """Return True when every pair in ``task['train']`` has equal input and output shapes."""
    # Collect the shapes of all pairs first, then compare them.
    shape_pairs = [
        (np.shape(pair['input']), np.shape(pair['output']))
        for pair in task['train']
    ]
    return all(in_shape == out_shape for in_shape, out_shape in shape_pairs)


def calk_score(task_test, predict):
    """Score each prediction as 1 (exact match) or 0 against its sample's output.

    Args:
        task_test: sequence of samples, each a mapping with an ``'output'`` grid.
        predict: sequence of predicted grids, paired positionally with ``task_test``.

    Returns:
        A list of ints, one per (sample, prediction) pair. A prediction whose
        shape differs from the expected output scores 0.
    """
    scores = []
    for sample, pred in zip(task_test, predict):
        # np.array_equal compares shape as well as values, so a mismatched shape
        # neither raises a broadcasting error nor matches through broadcasting.
        scores.append(int(np.array_equal(sample['output'], pred)))
    return scores

## Checking for 5 tasks

In [None]:
# Plot every training pair (input next to expected output) of task db3e9e38.
task = train_tasks["db3e9e38"]["train"]
for sample in task:
    plot_sample(sample)

In [None]:
# Plot every training pair (input next to expected output) of task 7ddcd7ec.
task = train_tasks["7ddcd7ec"]["train"]
for sample in task:
    plot_sample(sample)

## The Model

The model consists of a single 3x3 convolutional layer, followed by a 1x1 convolutional layer, just like my last notebook. Here `num_states` represents how many values a single cell could have; in this case 10, one for each color. Down the road, we may want to add a hidden state, concatenating it to the input, then removing it from the output.

The forward pass of the model will repeatedly pass the grid state through the CA transition for `steps` number of times.

In [None]:
class CAModel(nn.Module):
    """Cellular-automaton update rule implemented as a small convolutional network.

    One transition is a 3x3 convolution (padding 1, so the grid size is kept),
    a ReLU, and a 1x1 convolution that maps back to ``num_states`` channels.

    Args:
        num_states: number of channels per cell on input and output.
        hidden_channels: width of the intermediate feature map (default 32).
    """

    def __init__(self, num_states, hidden_channels=32):
        super().__init__()
        self.transition = nn.Sequential(
            nn.Conv2d(num_states, hidden_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_channels, num_states, kernel_size=1)
        )

    def forward(self, x, steps=1):
        """Apply the transition ``steps`` times and return the final state.

        The state is passed through a channel-wise softmax before every
        transition. With ``steps=0`` the input is returned unchanged.
        """
        for _ in range(steps):
            x = self.transition(torch.softmax(x, dim=1))
        return x

## Training

This "recurrent CNN" can be quite difficult to train. After trying a few ideas, this seemed to be the best approach that I encountered:

* For every value $n$ = $1, ..., N$:
    1. Train the model with $n$ `steps` to produce the output from input
    2. Train the model with 1 `steps` to produce output from output
        * This enforces that the CA stabilizes after reaching a solution
        
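In this way the model will try to get as close to a solution as possible in 1 step, then try to get closer in the next step, and so on until $N$ steps. For now I will use $N = 10$ = `max_steps`. I will also set the learning rate to decay with each additional step: $LR = 0.1 / (n * 2)$

**Note:** the code cell below differs from this description. `max_steps` defaults to 12, the number of steps is drawn at random from 1 to `max_steps` in the early rounds (only the last three rounds always use `max_steps`), and the learning rate for round $i$ is $0.04 / i$.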

In [None]:
import math
import random

def solve_task(task, max_steps=12, num_epochs=100, num_repeat=10, base_lr=0.04):
    """Train a fresh ``CAModel`` on the samples of one task.

    Training runs for ``num_repeat - 1`` rounds of ``num_epochs`` full-batch
    epochs. Round ``i`` (1-based) uses a new Adam optimizer with learning rate
    ``base_lr / i``. The last three rounds unroll the model for ``max_steps``
    steps; earlier rounds draw the step count uniformly from ``1..max_steps``.

    Each epoch sums two cross-entropy terms per sample: the output predicted
    from the input after the round's step count, and the output predicted from
    the output itself after a single step.

    Args:
        task: sequence of samples, each with ``"input"`` and ``"output"`` grids
            of equal shape.
        max_steps: largest number of CA steps used during training.
        num_epochs: epochs per round.
        num_repeat: number of rounds plus one.
        base_lr: learning rate of the first round.

    Returns:
        ``(model, num_steps, losses)``: the trained model, the step count of the
        final round, and a 1-D array holding the summed loss of every epoch.

    Raises:
        ValueError: if ``task`` contains no samples.
    """
    # The tensors do not change between epochs, so build them once up front
    # instead of re-encoding every grid on every epoch.
    samples = []
    for sample in task:
        x = torch.from_numpy(inp2img(sample["input"])).unsqueeze(0).float().to(device)
        y = torch.tensor(sample["output"]).long().unsqueeze(0).to(device)
        y_in = torch.from_numpy(inp2img(sample["output"])).unsqueeze(0).float().to(device)
        samples.append((x, y, y_in))
    if not samples:
        raise ValueError("solve_task needs at least one training sample")

    model = CAModel(10).to(device)
    criterion = nn.CrossEntropyLoss()
    losses = np.zeros((num_repeat - 1) * num_epochs)
    # Bound up front so the return value exists even when no round runs.
    num_steps = max_steps

    for i in range(1, num_repeat):
        if i > num_repeat-4:
            num_steps = max_steps
        else:
            num_steps = random.randint(1, max_steps)
        optimizer = torch.optim.Adam(model.parameters(), lr=(base_lr / i))

        for e in range(num_epochs):
            optimizer.zero_grad()
            loss = 0.0

            for x, y, y_in in samples:
                # predict output from input
                loss += criterion(model(x, num_steps), y)

                # predict output from output
                # enforces stability after solution is reached
                loss += criterion(model(y_in, 1), y)

            loss.backward()
            optimizer.step()
            losses[(i - 1) * num_epochs + e] = loss.item()
    return model, num_steps, losses

@torch.no_grad()
def predict(model, task, steps=200):
    """Run ``model`` on every sample's input and return the predicted grids.

    Args:
        model: callable taking ``(x, steps)`` and returning per-cell class scores
            with the class dimension at index 1.
        task: sequence of samples, each with an ``"input"`` grid.
        steps: number of CA steps to run for each prediction (default 200).

    Returns:
        A list of 2-D integer numpy arrays, one per sample.
    """
    predictions = []
    for sample in task:
        x = torch.from_numpy(inp2img(sample["input"])).unsqueeze(0).float().to(device)
        # squeeze(0) removes only the batch dimension; a bare squeeze() would also
        # collapse grids that are one row or one column wide.
        pred = model(x, steps).argmax(1).squeeze(0).cpu().numpy()
        predictions.append(pred)
    return predictions

# Train a model on the training pairs of task dbc1a6ce; ``losses`` holds the
# summed loss of every epoch and is plotted in the next cell.
task = train_tasks["dbc1a6ce"]["train"]
model, num_steps, losses = solve_task(task)

$n$ is incremented every 100 epochs, so we can see that it reaches a good solution after 3 steps (epoch 300).

In [None]:
# Loss curve of the training run above: one point per epoch, all rounds concatenated.
fig, ax = plt.subplots()
ax.plot(losses)
ax.set_xlabel("epoch")
ax.set_ylabel("summed cross-entropy loss")
ax.set_title("Training loss")
plt.show()

It works! Now let's see if it generalizes to the test question:

## More Tasks

Now that we know we can train a CA for one task, will it work on others?

In [None]:
def evaluate(tasks):
    """Train and score one model per task.

    For every task in ``tasks`` (a mapping of task id to task dict):

    * when all training pairs keep the grid shape, a model is trained on
      ``task["train"]`` and its predictions for ``task["test"]`` are scored;
    * otherwise no model is trained, the test inputs are returned as the
      "predictions", and every test sample scores 0.

    Returns:
        ``(result, predictions)``: two lists with one entry per task, holding
        the per-test-sample scores and the per-test-sample predictions.
    """
    result, predictions = [], []
    for task in tqdm(tasks.values()):
        test_samples = task["test"]
        if not input_output_shape_is_same(task):
            # Shape-changing task: echo the inputs back and count them as unsolved.
            pred = [sample["input"] for sample in test_samples]
            score = [0 for _ in test_samples]
        else:
            model, _, _ = solve_task(task["train"])
            pred = predict(model, test_samples)
            score = calk_score(test_samples, pred)

        result.append(score)
        predictions.append(pred)
    return result, predictions

In [None]:
# train_result, train_predictions = evaluate(train_tasks)
# train_solved = [any(score) for score in train_result]

# total = sum([len(score) for score in train_result])
# print(f"solved : {sum(train_solved)} from {total} ({sum(train_solved)/total})")

In [None]:
# Train and score one model per evaluation task. The "train_" prefix is kept
# because later cells read these names.
train_result1, train_predictions1 = evaluate(eval_tasks)
# A task counts as solved when at least one of its test outputs was reproduced exactly.
train_solved1 = [any(score) for score in train_result1]

# Number of individual test grids across all tasks (kept under its original name).
total1 = sum(len(score) for score in train_result1)

# Report solved tasks against the number of tasks, not the number of test grids,
# so numerator and denominator count the same thing; guard the empty case.
num_tasks1 = len(train_solved1)
solved_ratio1 = sum(train_solved1) / num_tasks1 if num_tasks1 else 0.0
print(f"solved : {sum(train_solved1)} from {num_tasks1} ({solved_ratio1})")

In [None]:
def count_true(boolean_list):
    """Return how many elements of ``boolean_list`` are truthy."""
    return sum(1 for element in boolean_list if element)

# Number of evaluation tasks with at least one exactly solved test grid.
# (The extra bare call whose result was discarded has been removed.)
print(count_true(train_solved1))

## Solved Tasks

In [None]:
def count_true(boolean_list):
    """Count the truthy entries of ``boolean_list``.

    NOTE(review): this repeats the ``count_true`` defined in the previous cell;
    consider deleting one of the two definitions.
    """
    truthy = [flag for flag in boolean_list if flag]
    return len(truthy)

# Last expression of the cell, so the notebook displays the solved-task count.
count_true(train_solved1)

In [None]:
def get_accuracy(oup, pred):
    """Return the fraction of cells of ``oup`` that ``pred`` reproduces.

    The cells visited are those of ``oup`` (``len(oup)`` rows by ``len(oup[0])``
    columns); ``pred`` is indexed at the same positions.
    """
    n_rows, n_cols = len(oup), len(oup[0])
    matches = sum(
        1
        for row in range(n_rows)
        for col in range(n_cols)
        if oup[row][col] == pred[row][col]
    )
    return matches / (n_rows * n_cols)


In [None]:
# Per-test-grid statistics over the evaluation set. These are ordinary
# notebook-level variables; the former `global` statements had no effect at
# module level and were removed.
total_accuracy = 0   # sum of the per-grid cell accuracies
model_failure = 0    # grids whose prediction has a different shape than the output
model_success = 0    # grids whose prediction has the expected shape
accuracy100 = 0      # accuracy == 1.0
accuracy95 = 0       # 0.95 <= accuracy < 1.0
accuracy90 = 0       # 0.90 <= accuracy < 0.95
accuracy80a = 0      # 0.80 <= accuracy < 0.90
accuracy80b = 0      # accuracy < 0.80

# zip has no length, so pass the total explicitly for a meaningful progress bar.
for task, prediction in tqdm(zip(eval_tasks.values(), train_predictions1), total=len(train_predictions1)):
    for sample, pred in zip(task['test'], prediction):
        expected = np.array(sample['output'])
        predicted = np.array(pred)

        # A grid is only scored when prediction and output have the same shape.
        # Comparing full shapes also copes with predictions that are not 2-D.
        if predicted.shape != expected.shape:
            model_failure += 1
            continue

        acc = get_accuracy(expected, predicted)
        print("Accuracy: ", acc)
        if acc == 1.0:
            accuracy100 += 1
        elif acc >= 0.95:
            accuracy95 += 1
        elif acc >= 0.9:
            accuracy90 += 1
        elif acc >= 0.8:
            accuracy80a += 1
        else:
            accuracy80b += 1

        total_accuracy += acc
        plot_sample(sample, pred)
        model_success += 1

In [None]:
# "Total Accuracy" is the mean cell accuracy over the grids that were scored.
# Report NaN instead of raising ZeroDivisionError when no grid was scored.
mean_accuracy = total_accuracy / model_success if model_success else float('nan')
print("Total Accuracy: ", mean_accuracy)
print("Accuracy 100: ", accuracy100)
print("Accuracy 95 - 100: ", accuracy95)
print("Accuracy 90 - 95: ", accuracy90)
print("Accuracy 80 - 90: ", accuracy80a)
print("Accuracy < 80: ", accuracy80b)
print("Model runs successfully for ", model_success, "tasks")
print("Model fails for ", model_failure, "tasks")