In [96]:
import json
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim


In [97]:
training_challenge_dict = json.load(open('../data/RAW_DATA_DIR/arc-prize-2024/arc-agi_training_challenges.json'))
training_solutions_dict = json.load(open('../data/RAW_DATA_DIR/arc-prize-2024/arc-agi_training_solutions.json'))

In [98]:
def create_input_output_pairs(training_challenge_dict, training_solutions_dict):
    input_output_pairs = []
    
    for challenge_id, challenge in training_challenge_dict.items():
        train = challenge['train']
        test = challenge['test']

        # find how many examples there are in the train
        num_train_examples = len(train)

        for i in range(num_train_examples):
            input_output_pairs.append({
                'input': train[i]['input'],
                'output': train[i]['output'],
                'challenge_id': challenge_id,
            })

        test_input = test[0]['input']
        test_output = training_solutions_dict[challenge_id][0]
        input_output_pairs.append({
            'input': test_input,
            'output': test_output,
            'challenge_id': challenge_id,
        })
        
    return input_output_pairs

input_output_pairs = create_input_output_pairs(training_challenge_dict, training_solutions_dict)

In [99]:
def create_dataset(training_challenge_dict, training_solutions_dict):
    dict = {
        'input': [],
        'output': [],
        'challenge_id': []
    }

    for challenge_id, challenge in training_challenge_dict.items():
        train = challenge['train']
        test = challenge['test']

        for i in range(len(train)):
            dict['input'].append(torch.tensor(train[i]['input'], dtype=torch.float32))
            dict['output'].append(torch.tensor(train[i]['output'], dtype=torch.float32))
            dict['challenge_id'].append(challenge_id)

        dict['input'].append(torch.tensor(test[0]['input'], dtype=torch.float32))
        dict['output'].append(torch.tensor(training_solutions_dict[challenge_id][0], dtype=torch.float32))
        dict['challenge_id'].append(challenge_id)

    return pd.DataFrame(dict)

dataset = create_dataset(training_challenge_dict, training_solutions_dict)

In [100]:
dataset['output'][5].shape

torch.Size([9, 9])

In [101]:
inputs = dataset[:5]['input']
outputs = dataset[:5]['output']

# add a batch dimension to the inputs
inputs = [input.unsqueeze(0) for input in inputs]
outputs = [output.unsqueeze(0) for output in outputs]

In [102]:
# Define the CNN model
class CNN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(CNN, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        
        # Calculate the size of the feature maps after convolutions and pooling
        conv_output_size = (input_dim[0] // 8) * (input_dim[1] // 8)
        self.fc1 = nn.Linear(64 * conv_output_size, 128)
        self.fc2 = nn.Linear(128, output_dim[0] * output_dim[1])

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = self.pool(torch.relu(self.conv3(x)))
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x.view(-1, self.output_dim[0], self.output_dim[1])

# Initialize the model, loss function, and optimizer
input_dim = (3, 3)  # Example input dimension
output_dim = (9, 9)   # Example output dimension
model = CNN(input_dim, output_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters())

# Training loop
num_epochs = 100

for epoch in range(num_epochs):
    for i in range(len(inputs)):
        input_tensor = inputs[i]
        output_tensor = outputs[i].unsqueeze(0)  # Add batch dimension
        
        # Forward pass
        print('imput tensor', input_tensor.shape)
        prediction = model(input_tensor)
        loss = criterion(prediction, output_tensor)
        
        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

print("Training completed!")

# Function to make predictions
def predict(model, input_data):
    model.eval()
    with torch.no_grad():
        input_tensor = torch.tensor(input_data, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
        output = model(input_tensor)
    return output.squeeze().numpy()

# Example prediction
sample_input = dataset['input'].iloc[0]
predicted_output = predict(model, sample_input)
print("Sample Input:")
print(sample_input)
print("Predicted Output:")
print(predicted_output)



imput tensor torch.Size([1, 3, 3])




RuntimeError: Given input size: (64x1x1). Calculated output size: (64x0x0). Output size is too small

In [None]:
# group by challenge_id
dataset.groupby('challenge_id')

#

<pandas.core.groupby.generic.DataFrameGroupBy object at 0x11cca7d70>