In [1]:
import json
import os

import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

In [2]:
# Location of the task file, relative to the notebook's working directory.
data_path = os.path.join('..', 'data', 'training', '0a938d79.json')

# JSON text is UTF-8; pin the encoding so decoding does not depend on the
# platform's default locale.
with open(data_path, 'r', encoding='utf-8') as f:
    # `data` is indexed later as data['train'][i]['input' | 'output'].
    data = json.load(f)

In [3]:
def pad_data_to_30x30(data):
    """Place a 2D grid in the top-left corner of a 30x30 integer canvas.

    Cells not covered by the grid are filled with -1. Rows or columns beyond
    index 29 are dropped.

    Args:
        data: 2D array-like of cell values.

    Returns:
        A (30, 30) integer ``np.ndarray``.

    Raises:
        ValueError: If ``data`` does not convert to a 2D array (the shape
            cannot be unpacked into rows and columns).
    """
    grid = np.array(data)
    height, width = grid.shape

    # Start from an all-padding canvas and overwrite the occupied corner.
    canvas = np.full((30, 30), -1, dtype=int)
    # Slicing past the end of an axis is clamped, so both sides of the
    # assignment end up with the same (<= 30, <= 30) shape.
    canvas[:height, :width] = grid[:30, :30]
    return canvas

def visualize_grid(data):
    """Render a 2D grid of integer cell values as a colour-coded image.

    Colour scheme: light grey for -1 (padding), the ten ``tab10`` colours for
    the values 0-9, and dark red for 10 and above.

    Args:
        data: 2D array-like of cell values.

    Raises:
        ValueError: If ``data`` is not two-dimensional.
    """
    # Import from the documented module instead of reaching through
    # ``plt.cm.colors``, which only works because matplotlib.cm happens to
    # import ``colors`` internally.
    from matplotlib.colors import BoundaryNorm, ListedColormap

    data = np.asarray(data)
    if data.ndim != 2:
        raise ValueError("Input must be a 2D array")
    n_rows, n_cols = data.shape

    fig, ax = plt.subplots(figsize=(10, 10))
    fig.suptitle("Visualization of 2D Padded Data", fontsize=16)

    # 12 colours: padding, the ten cell values, and an overflow colour.
    # plt.get_cmap is used because matplotlib.cm.get_cmap has been deprecated.
    tab10 = plt.get_cmap('tab10')
    custom_cmap = ListedColormap(
        ['lightgrey'] + [tab10(i) for i in range(10)] + ['darkred']
    )

    # Bin edges sit halfway between integers (-1.5, -0.5, ..., 10.5) so each
    # integer value v falls in its own bin and -1 maps to the first colour.
    # Integer edges starting at -2 would shift every value one colour over.
    bounds = [v - 0.5 for v in range(-1, 12)]
    norm = BoundaryNorm(bounds, custom_cmap.N)

    im = ax.imshow(data, cmap=custom_cmap, norm=norm, interpolation='nearest')
    ax.set_title("Padded Grid")

    # Draw cell borders on minor ticks, sized to the actual grid so the view
    # is not stretched when the input is not 30x30.
    ax.set_xticks(np.arange(-0.5, n_cols, 1), minor=True)
    ax.set_yticks(np.arange(-0.5, n_rows, 1), minor=True)
    ax.grid(which="minor", color="black", linestyle='-', linewidth=0.5)

    # Remove the major axis ticks
    ax.set_xticks([])
    ax.set_yticks([])

    # Colour bar with one tick centred on each colour band
    cbar = fig.colorbar(im, ax=ax, shrink=0.8)
    cbar.set_ticks(list(range(-1, 11)))
    cbar.set_ticklabels(['-1 (pad)'] + [str(v) for v in range(10)] + ['10+'])

    plt.tight_layout()
    plt.show()


def encode_grid(grid):
    """One-hot encode a 30x30 grid into an 11-channel (11, 30, 30) array.

    A cell holding value ``v`` with ``-1 <= v <= 9`` sets channel ``v + 1`` to
    1 at that position, so channel 0 marks the value -1 and channels 1-10 mark
    the values 0-9. Cells holding any other value stay zero in every channel.

    Args:
        grid: 2D array-like of integer cell values, at least 30x30. Only the
            top-left 30x30 region is read.

    Returns:
        A float32 ``np.ndarray`` of shape (11, 30, 30).

    Raises:
        IndexError: If ``grid`` is not 2D or is smaller than 30x30 (the same
            exception type the per-cell indexing loop raised for such input).
    """
    cells = np.asarray(grid)
    if cells.ndim != 2 or cells.shape[0] < 30 or cells.shape[1] < 30:
        raise IndexError("grid must be a 2D array of at least 30x30 cells")
    cells = cells[:30, :30]

    encoded = np.zeros((11, 30, 30), dtype=np.float32)
    # Positions of all encodable cells, found in one vectorised pass instead
    # of a 900-iteration Python loop.
    rows, cols = np.nonzero((cells >= -1) & (cells <= 9))
    # The channel index is the cell value shifted by one.
    encoded[cells[rows, cols] + 1, rows, cols] = 1
    return encoded

In [4]:
class ResidualBlock(nn.Module):
    """Two 3x3 convolution + batch-norm stages with an identity skip connection.

    The channel count and the spatial size are preserved (kernel 3, padding 1),
    which is what allows the input to be added back onto the output.
    """

    def __init__(self, channels):
        """Create the layers for a block operating on ``channels`` feature maps."""
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(channels, channels, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(channels)
        self.conv2 = nn.Conv2d(channels, channels, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + x)``."""
        hidden = self.conv1(x)
        hidden = self.bn1(hidden)
        hidden = self.relu(hidden)
        hidden = self.bn2(self.conv2(hidden))
        # Skip connection: add the untouched input before the last activation.
        return self.relu(hidden + x)

class EnhancedMultiChannelCNN(nn.Module):
    """Fully convolutional network mapping 11 input channels to 11 output channels.

    Layout: a 3x3 conv stem (11 -> 64) with batch norm and ReLU, a stack of
    ``ResidualBlock(64)`` modules, and a final 3x3 conv (64 -> 11). Every
    convolution uses padding 1, so the spatial size is unchanged. The output
    is returned as-is, with no activation applied after the final convolution.
    """

    def __init__(self, num_residual_blocks=3):
        """Build the network with ``num_residual_blocks`` residual blocks."""
        super().__init__()
        self.conv1 = nn.Conv2d(11, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()

        blocks = []
        for _ in range(num_residual_blocks):
            blocks.append(ResidualBlock(64))
        self.residual_blocks = nn.Sequential(*blocks)

        self.conv_final = nn.Conv2d(64, 11, kernel_size=3, padding=1)

    def forward(self, x):
        """Run the stem, the residual stack, and the final projection on ``x``."""
        stem = self.relu(self.bn1(self.conv1(x)))
        features = self.residual_blocks(stem)
        return self.conv_final(features)

In [39]:
# Encode every training pair as an (11, 30, 30) one-hot array and stack them
# into a single (N, 11, 30, 30) float32 array before handing it to torch.
# Calling torch.tensor on a Python list of ndarrays converts element by
# element, which is slow and triggers a UserWarning.
X = torch.from_numpy(
    np.stack([encode_grid(pad_data_to_30x30(ex['input'])) for ex in data['train']])
)
Y = torch.from_numpy(
    np.stack([encode_grid(pad_data_to_30x30(ex['output'])) for ex in data['train']])
)

In [40]:
# Pair each encoded input with its encoded target and serve them in
# shuffled mini-batches of up to 32 examples.
dataset = TensorDataset(X, Y)
dataloader = DataLoader(dataset, shuffle=True, batch_size=32)

In [41]:
# Seed the global torch RNG so weight initialisation (and, by default, the
# DataLoader's shuffling) is reproducible after a kernel restart.
SEED = 0
torch.manual_seed(SEED)

model = EnhancedMultiChannelCNN()
# Per-cell classification over the 11 channels; expects class indices as targets.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

In [43]:
num_epochs = 30000
log_every = 1000  # print the loss once per this many epochs

# predict() puts the model in eval mode and nothing switches it back, so
# re-running this cell after a prediction would otherwise train with
# BatchNorm in eval mode. Enable training mode explicitly.
model.train()

for epoch in range(num_epochs):
    for inputs, targets in dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        # Targets are one-hot along dim 1; CrossEntropyLoss takes class indices.
        loss = criterion(outputs, targets.argmax(dim=1))
        loss.backward()
        optimizer.step()

    if (epoch + 1) % log_every == 0:
        # Reports the loss of the last mini-batch of the epoch.
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

Epoch [1000/30000], Loss: 0.0006
Epoch [2000/30000], Loss: 0.0005
Epoch [3000/30000], Loss: 0.0005
Epoch [4000/30000], Loss: 0.0005
Epoch [5000/30000], Loss: 0.0006
Epoch [6000/30000], Loss: 0.0005
Epoch [7000/30000], Loss: 0.0005
Epoch [8000/30000], Loss: 0.0005
Epoch [9000/30000], Loss: 0.0006
Epoch [10000/30000], Loss: 0.0005
Epoch [11000/30000], Loss: 0.0005
Epoch [12000/30000], Loss: 0.0005
Epoch [13000/30000], Loss: 0.0005
Epoch [14000/30000], Loss: 0.0005
Epoch [15000/30000], Loss: 0.0005
Epoch [16000/30000], Loss: 0.0007
Epoch [17000/30000], Loss: 0.0005
Epoch [18000/30000], Loss: 0.0005
Epoch [19000/30000], Loss: 0.0005
Epoch [20000/30000], Loss: 0.0005
Epoch [21000/30000], Loss: 0.0005
Epoch [22000/30000], Loss: 0.0005
Epoch [23000/30000], Loss: 0.0005
Epoch [24000/30000], Loss: 0.0005
Epoch [25000/30000], Loss: 0.0005
Epoch [26000/30000], Loss: 0.0005
Epoch [27000/30000], Loss: 0.0005
Epoch [28000/30000], Loss: 0.0005
Epoch [29000/30000], Loss: 0.0005
Epoch [30000/30000], Lo

In [11]:
# Persist the model's parameters and buffers (its state dict) to disk.
torch.save(obj=model.state_dict(), f='linear_static_dictionary.pth')

In [29]:
def predict(input_data):
    """Run the global ``model`` on one grid and return the predicted 30x30 grid.

    The grid is padded to 30x30, one-hot encoded, and passed through the model
    as a batch of one. The per-cell argmax over channels is shifted by -1 so
    the result is expressed in the original value range [-1, 9].

    Note: this switches ``model`` to eval mode and leaves it there.

    Args:
        input_data: 2D array-like grid of cell values.

    Returns:
        ``np.ndarray`` of predicted cell values.
    """
    model.eval()
    with torch.no_grad():
        encoded = encode_grid(pad_data_to_30x30(input_data))
        batch = torch.tensor(encoded).unsqueeze(0)  # add the batch dimension
        logits = model(batch)
        channel_ids = logits.argmax(dim=1).squeeze()
        # Channel k encodes value k - 1, so shift back to the value range.
        return channel_ids.numpy() - 1
    
# Function to visualize prediction
def visualize_prediction(input_data):
    """Predict the output grid for ``input_data`` and plot it."""
    visualize_grid(predict(input_data))

def visualize_actual(data):
    """Plot an example's padded expected output, then its padded input.

    Args:
        data: Mapping with 'output' and 'input' grids.
    """
    for key in ('output', 'input'):
        visualize_grid(pad_data_to_30x30(data[key]))


In [49]:
# Visual check on the third training example. It comes from data['train'],
# the same split the model was fitted on, so this shows how well the model
# fits the data rather than how well it generalises.
x_test = data['train'][2]
visualize_actual(data=x_test)
visualize_prediction(input_data=x_test['input'])
