In [1]:
# Import libraries
import numpy as np
import pandas as pd
from pathlib import Path
import pickle
from collections import defaultdict

In [2]:
# Extract connected components by color
def connected_components_by_color(grid: np.ndarray):

    H, W = grid.shape
    labels = np.zeros((H, W), dtype=int)
    comp_info = {}
    current_label = 0

    nbrs = [(1,0), (-1,0), (0,1), (0,-1)]

    for r in range(H):
        for c in range(W):
            if labels[r, c] != 0:
                continue

            color = grid[r, c]
            current_label += 1
            cid = current_label
            labels[r, c] = cid
            stack = [(r, c)]
            pixels = [(r, c)]

            while stack:
                rr, cc = stack.pop()
                for dr, dc in nbrs:
                    nr, nc = rr + dr, cc + dc
                    if 0 <= nr < H and 0 <= nc < W:
                        if labels[nr, nc] == 0 and grid[nr, nc] == color:
                            labels[nr, nc] = cid
                            stack.append((nr, nc))
                            pixels.append((nr, nc))

            comp_info[cid] = {
                "color": int(color),
                "pixels": pixels,
            }

    return labels, comp_info

In [3]:
# Build component adjacency graph
def component_adjacency_graph(labels: np.ndarray):

    H, W = labels.shape
    adj = defaultdict(set)

    for r in range(H):
        for c in range(W):
            lab = labels[r, c]
            if lab == 0:
                continue

            if c + 1 < W:
                lab2 = labels[r, c + 1]
                if lab2 != 0 and lab2 != lab:
                    adj[lab].add(lab2)
                    adj[lab2].add(lab)

            if r + 1 < H:
                lab2 = labels[r + 1, c]
                if lab2 != 0 and lab2 != lab:
                    adj[lab].add(lab2)
                    adj[lab2].add(lab)

    return adj

In [4]:
# Compute geometric features for component
def component_geometric_features(comp_id, info):

    pixels = info["pixels"]
    color = info["color"]

    rows = np.array([p[0] for p in pixels])
    cols = np.array([p[1] for p in pixels])

    area = len(pixels)
    min_r, max_r = rows.min(), rows.max()
    min_c, max_c = cols.min(), cols.max()
    height = max_r - min_r + 1
    width  = max_c - min_c + 1
    aspect = height / width if width != 0 else 0.0

    centroid_r = rows.mean()
    centroid_c = cols.mean()

    perim = 0
    S = set(pixels)
    for r, c in pixels:
        for dr, dc in [(1,0),(-1,0),(0,1),(0,-1)]:
            if (r+dr, c+dc) not in S:
                perim += 1

    return {
        "component_id": comp_id,
        "color": color,
        "area": area,
        "bbox_height": height,
        "bbox_width": width,
        "bbox_aspect": float(aspect),
        "centroid_r": float(centroid_r),
        "centroid_c": float(centroid_c),
        "perimeter": perim,
    }

In [5]:
# Count holes in component
def count_holes_for_component(labels: np.ndarray, comp_id: int):

    coords = np.argwhere(labels == comp_id)
    if coords.size == 0:
        return 0

    min_r, min_c = coords.min(axis=0)
    max_r, max_c = coords.max(axis=0)
    sub = labels[min_r:max_r+1, min_c:max_c+1]

    bg = (sub == 0).astype(int)
    H, W = bg.shape
    visited = np.zeros_like(bg, dtype=bool)
    holes = 0

    def dfs(sr, sc):
        stack = [(sr, sc)]
        visited[sr, sc] = True
        touches_border = (sr == 0 or sc == 0 or sr == H-1 or sc == W-1)

        while stack:
            r, c = stack.pop()
            for dr, dc in [(1,0),(-1,0),(0,1),(0,-1)]:
                nr, nc = r + dr, c + dc
                if 0 <= nr < H and 0 <= nc < W:
                    if bg[nr, nc] == 1 and not visited[nr, nc]:
                        visited[nr, nc] = True
                        stack.append((nr, nc))
                        if nr == 0 or nc == 0 or nr == H-1 or nc == W-1:
                            touches_border = True
        return touches_border

    for r in range(H):
        for c in range(W):
            if bg[r, c] == 1 and not visited[r, c]:
                touches_border = dfs(r, c)
                if not touches_border:
                    holes += 1

    return holes

In [6]:
# Compute topology features for component
def component_topology_features(labels: np.ndarray, comp_id: int):

    holes = count_holes_for_component(labels, comp_id)
    beta0 = 1
    beta1 = holes
    euler = beta0 - beta1

    return {
        "component_id": comp_id,
        "beta0": beta0,
        "beta1": beta1,
        "num_holes": holes,
        "euler_characteristic": euler,
    }

In [7]:
# Compute graph-level descriptors
def graph_descriptors(adj):

    if not adj:
        return {
            "graph_num_nodes": 0,
            "graph_num_edges": 0,
            "graph_mean_degree": 0.0,
            "graph_max_degree": 0,
            "graph_num_connected_components": 0,
        }

    degrees = [len(adj[n]) for n in adj]
    num_nodes = len(adj)
    num_edges = sum(degrees) // 2

    visited = set()
    cc = 0
    for n in adj:
        if n in visited:
            continue
        cc += 1
        stack = [n]
        visited.add(n)
        while stack:
            u = stack.pop()
            for v in adj[u]:
                if v not in visited:
                    visited.add(v)
                    stack.append(v)

    return {
        "graph_num_nodes": num_nodes,
        "graph_num_edges": num_edges,
        "graph_mean_degree": float(np.mean(degrees)),
        "graph_max_degree": int(np.max(degrees)),
        "graph_num_connected_components": cc,
    }

In [8]:
# Extract all features for a single grid
def extract_features_for_grid(grid: np.ndarray, task_id: str, role: str, index: int):

    labels, comp_info = connected_components_by_color(grid)
    adj = component_adjacency_graph(labels)
    graph_feats = graph_descriptors(adj)

    rows = []
    for comp_id, info in comp_info.items():
        geom = component_geometric_features(comp_id, info)
        topo = component_topology_features(labels, comp_id)
        row = {
            "task_id": task_id,
            "grid_role": role,
            "grid_index": index,
            **geom,
            **topo,
            **graph_feats,
        }
        rows.append(row)

    return rows

In [9]:
# Load parsed training and evaluation data
with open("outputs/parsed_training.pkl", "rb") as f:
    training = pickle.load(f)

with open("outputs/parsed_evaluation.pkl", "rb") as f:
    evaluation = pickle.load(f)

print("Loaded parsed data.")

Loaded parsed data.


In [10]:
# Extract features for training data
all_features = []

for task_id, task in training.items():

    for i, grid in enumerate(task["train_inputs"]):
        all_features.extend(
            extract_features_for_grid(grid, task_id, "train_input", i)
        )

    for i, grid in enumerate(task["train_outputs"]):
        all_features.extend(
            extract_features_for_grid(grid, task_id, "train_output", i)
        )

    for i, grid in enumerate(task["test_inputs"]):
        all_features.extend(
            extract_features_for_grid(grid, task_id, "test_input", i)
        )

    for i, grid in enumerate(task["test_outputs"]):
        all_features.extend(
            extract_features_for_grid(grid, task_id, "test_output", i)
        )

df_train = pd.DataFrame(all_features)

df_train.to_pickle("outputs/features_training.pkl")

df_train.head()

Unnamed: 0,task_id,grid_role,grid_index,component_id,color,area,bbox_height,bbox_width,bbox_aspect,centroid_r,...,perimeter,beta0,beta1,num_holes,euler_characteristic,graph_num_nodes,graph_num_edges,graph_mean_degree,graph_max_degree,graph_num_connected_components
0,576224,train_input,0,1,7,1,1,1,1.0,0.0,...,4,1,0,0,1,4,4,2.0,2,1
1,576224,train_input,0,2,9,1,1,1,1.0,0.0,...,4,1,0,0,1,4,4,2.0,2,1
2,576224,train_input,0,3,4,1,1,1,1.0,1.0,...,4,1,0,0,1,4,4,2.0,2,1
3,576224,train_input,0,4,3,1,1,1,1.0,1.0,...,4,1,0,0,1,4,4,2.0,2,1
4,576224,train_input,1,1,8,1,1,1,1.0,0.0,...,4,1,0,0,1,4,4,2.0,2,1


In [11]:
# Extract features for evaluation data
all_features_eval = []

for task_id, task in evaluation.items():

    for i, grid in enumerate(task["train_inputs"]):
        all_features_eval.extend(
            extract_features_for_grid(grid, task_id, "train_input", i)
        )

    for i, grid in enumerate(task["train_outputs"]):
        all_features_eval.extend(
            extract_features_for_grid(grid, task_id, "train_output", i)
        )

    for i, grid in enumerate(task["test_inputs"]):
        all_features_eval.extend(
            extract_features_for_grid(grid, task_id, "test_input", i)
        )

    for i, grid in enumerate(task["test_outputs"]):
        all_features_eval.extend(
            extract_features_for_grid(grid, task_id, "test_output", i)
        )

df_eval = pd.DataFrame(all_features_eval)

df_eval.to_pickle("outputs/features_evaluation.pkl")

df_eval.head()

Unnamed: 0,task_id,grid_role,grid_index,component_id,color,area,bbox_height,bbox_width,bbox_aspect,centroid_r,...,perimeter,beta0,beta1,num_holes,euler_characteristic,graph_num_nodes,graph_num_edges,graph_mean_degree,graph_max_degree,graph_num_connected_components
0,0934a4d8,train_input,0,1,3,1,1,1,1.0,0.0,...,4,1,0,0,1,470,1033,4.395745,34,1
1,0934a4d8,train_input,0,2,5,1,1,1,1.0,0.0,...,4,1,0,0,1,470,1033,4.395745,34,1
2,0934a4d8,train_input,0,3,3,6,3,3,1.0,0.833333,...,12,1,0,0,1,470,1033,4.395745,34,1
3,0934a4d8,train_input,0,4,6,4,2,2,1.0,0.5,...,8,1,0,0,1,470,1033,4.395745,34,1
4,0934a4d8,train_input,0,5,5,1,1,1,1.0,0.0,...,4,1,0,0,1,470,1033,4.395745,34,1
