In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
import json
import matplotlib.pyplot as plt

# Load ARC data
def load_arc_data(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)
    return data

train_data = load_arc_data('../data/training/007bbfb7.json')
test_data = load_arc_data('../data/evaluation/00576224.json')

# Prepare inputs and outputs
def prepare_data(data):
    inputs = []
    outputs = []
    for task in data:
        for train in task['train']:
            inputs.append(np.array(train['input']))
            outputs.append(np.array(train['output']))
    return inputs, outputs

train_inputs, train_outputs = prepare_data(train_data)
test_inputs, test_outputs = prepare_data(test_data)

# Convert to tensors
train_inputs = [torch.tensor(arr, dtype=torch.float32) for arr in train_inputs]
train_outputs = [torch.tensor(arr, dtype=torch.float32) for arr in train_outputs]
test_inputs = [torch.tensor(arr, dtype=torch.float32) for arr in test_inputs]
test_outputs = [torch.tensor(arr, dtype=torch.float32) for arr in test_outputs]

# Custom Dataset
class ARCDataset(Dataset):
    def __init__(self, inputs, outputs, transform=None):
        self.inputs = inputs
        self.outputs = outputs
        self.transform = transform

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_image = self.inputs[idx]
        output_image = self.outputs[idx]
        
        if self.transform:
            if not isinstance(input_image, torch.Tensor):
                input_image = self.transform(input_image)
            if not isinstance(output_image, torch.Tensor):
                output_image = self.transform(output_image)
        
        return input_image, output_image

# Define transforms
transform = transforms.Compose([
    transforms.Lambda(lambda x: x.unsqueeze(0) if x.ndim == 2 else x),  # Add channel dim if needed
    transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.size(0) == 1 else x),  # Convert to 3 channels if grayscale
    transforms.Resize((32, 32)),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Create datasets and dataloaders
train_dataset = ARCDataset(train_inputs, train_outputs, transform=transform)
test_dataset = ARCDataset(test_inputs, test_outputs, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Define the model
class ARCModel(nn.Module):
    def __init__(self):
        super(ARCModel, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 3, kernel_size=3, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

# Initialize model, loss, and optimizer
model = ARCModel()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters())

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

# Evaluation
model.eval()
test_loss = 0.0
with torch.no_grad():
    for inputs, targets in test_loader:
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        test_loss += loss.item()

print(f'Test Loss: {test_loss/len(test_loader):.4f}')

# Visualize results
def visualize_results(model, dataset, num_samples=5):
    fig, axs = plt.subplots(num_samples, 3, figsize=(15, 5*num_samples))
    for i in range(num_samples):
        input_image, target_image = dataset[i]
        input_image = input_image.unsqueeze(0)
        output_image = model(input_image)
        
        axs[i, 0].imshow(input_image.squeeze().permute(1, 2, 0).numpy())
        axs[i, 0].set_title('Input')
        axs[i, 1].imshow(target_image.squeeze().permute(1, 2, 0).numpy())
        axs[i, 1].set_title('Target')
        axs[i, 2].imshow(output_image.squeeze().permute(1, 2, 0).detach().numpy())
        axs[i, 2].set_title('Output')
    
    plt.tight_layout()
    plt.show()

visualize_results(model, test_dataset)


FileNotFoundError: [Errno 2] No such file or directory: 'training.json'