In [None]:
import math
import os
import random
import zipfile
from importlib import reload
from io import BytesIO

import matplotlib.pyplot as plt
import mglyph as mg
import numpy as np
import torch
import torchvision
import torchvision.transforms.functional as TF
from IPython.display import clear_output, display
from matplotlib import pyplot as plt
from PIL import Image
from torch import Tensor, nn
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision.io import ImageReadMode

import mglyph_ml
import mglyph_ml.lib as lib
from mglyph_ml.data.glyph_dataset import GlyphDataset, GlyphSample
from mglyph_ml.glyph_importer import GlyphImporter
from mglyph_ml.manifest_parsing import Manifest
from mglyph_ml.nn.utils import train_one_epoch

# Reload all mglyph_ml modules to pick up code changes.
# The order is unchanged: submodules first, the top-level package last.
for _module in (
    mglyph_ml.glyph_importer,
    mglyph_ml.manifest_parsing,
    mglyph_ml.data.glyph_dataset,
    mglyph_ml.nn.utils,
    mglyph_ml.lib,
    mglyph_ml,
):
    reload(_module)

# reload() re-executes a module in place, but names that were copied out of it with
# `from ... import ...` keep pointing at the pre-reload objects. Re-run those imports
# so the notebook actually uses the freshly loaded classes and functions.
from mglyph_ml.data.glyph_dataset import GlyphDataset, GlyphSample
from mglyph_ml.glyph_importer import GlyphImporter
from mglyph_ml.manifest_parsing import Manifest
from mglyph_ml.nn.utils import train_one_epoch

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

In [None]:
# One GlyphImporter is created per exported .mglyph file; each GlyphDataset below is
# constructed from a group of these importers.
train_glyphs = ["train-square.mglyph", "train-triangle.mglyph", "train-circle.mglyph"]
importers_train = [
    GlyphImporter(f"data/glyphs-experiment-1/{file_name}")
    for file_name in train_glyphs
]
dataset_train: GlyphDataset = GlyphDataset(*importers_train)

# The test split gets its own importers (kept in `importers`, which later cells reuse)
# and is built with a fixed augmentation_seed.
test_glyphs = ["test-square.mglyph", "test-triangle.mglyph", "test-circle.mglyph"]
importers = [
    GlyphImporter(f"data/glyphs-experiment-1/{file_name}")
    for file_name in test_glyphs
]
dataset_test: GlyphDataset = GlyphDataset(*importers, augmentation_seed=69)

In [None]:
# Temporary dataset over the same test importers, but with normalize=False, so the
# images can be inspected without normalization applied.
temp_dataset: GlyphDataset = GlyphDataset(
    *importers,
    augmentation_seed=69,
    normalize=False,
)

In [None]:
# Pick 9 distinct indices at random from the temporary dataset; they are meant for
# checking that a given sample is always augmented the same way.
candidate_indices = range(len(temp_dataset))
random_sample_indices = random.sample(candidate_indices, 9)

In [None]:
# Show a handful of test samples with the plotting helper from mglyph_ml.nn.utils.
from mglyph_ml.nn.utils import visualize_test_samples

visualize_test_samples(
    importers=importers,
    augmentation_seed=69,
    num_samples=12,
    figsize=(6, 6),
)

In [None]:
# quick sanity check before training
# we check that the input data is of the expected shape and properly normalized
# The sample is fetched once so the image and the label come from the same lookup.
sample_image, sample_label = dataset_train[0]
# Print the shape (not the whole tensor) plus the value range, which is what the
# normalization check actually needs.
print("Sample image shape:", sample_image.shape)
print("Sample image value range:", sample_image.min().item(), "to", sample_image.max().item())
print("Sample label:", sample_label)

In [None]:
# Build the regressor and place it on the selected device in one step. Moving to
# "cpu" leaves the freshly constructed model where it already is, so only the CUDA
# case has a visible effect (much faster training when a GPU is available).
from mglyph_ml.nn.glyph_regressor_gen2 import GlyphRegressor

model = GlyphRegressor().to(device)

In [None]:
def evaluate_glyph_regressor(
    model: nn.Module, 
    data_loader: DataLoader, 
    device: str, 
    criterion
) -> tuple[float, float]:
    """
    Evaluate a glyph regressor on a data loader without tracking gradients.

    The model is switched to evaluation mode for the duration of the call and is then
    put back into whichever mode (train or eval) it was in before the call.

    Args:
        model: The regressor to evaluate. It is not moved; only the batches are sent
            to `device`.
        data_loader: Iterable yielding `(inputs, labels)` batches. Labels are reshaped
            to `(batch, 1)` before being compared with the model output.
        device: Device every batch is moved to before the forward pass.
        criterion: Loss callable invoked as `criterion(outputs, labels)`. It is assumed
            to return the mean loss of the batch (the default reduction of
            `nn.MSELoss`), because each batch is weighted by its size below.

    Returns:
        A tuple `(avg_loss, avg_error)`: the per-sample average loss and the mean
        absolute error over all samples. Both are 0.0 when the loader yields nothing.
    """
    was_training = model.training  # remembered so the caller's mode can be restored
    model.eval()
    total_loss = 0.0
    total_abs_error = 0.0
    num_samples = 0

    try:
        with torch.no_grad():
            for inputs, labels in data_loader:
                inputs = inputs.to(device)
                labels = labels.to(device).view(-1, 1)
                batch_size = labels.shape[0]

                outputs = model(inputs)

                # Weight every batch by its size: a plain mean of per-batch means
                # over-weights a smaller final batch.
                total_loss += criterion(outputs, labels).item() * batch_size
                total_abs_error += torch.mean(torch.abs(outputs - labels)).item() * batch_size
                num_samples += batch_size
    finally:
        # Restore the previous mode even if evaluation raised.
        model.train(was_training)

    if num_samples == 0:
        return 0.0, 0.0
    return total_loss / num_samples, total_abs_error / num_samples

In [None]:
indices_debug = list(range(0, len(dataset_train), 16))
dataset_debug = Subset(dataset_train, indices_debug)

# Hyperparameters of this run, named once instead of being scattered as literals.
BATCH_SIZE = 64
NUM_EPOCHS = 10
LEARNING_RATE = 0.001
EARLY_STOP_ERROR = 0.3  # test error (x units) below which training stops early

# simply change the dataset to any other, and train :)
# data_loader_train = DataLoader(dataset_debug, batch_size=16, shuffle=True)
data_loader_train = DataLoader(dataset_train, batch_size=BATCH_SIZE, shuffle=True)
data_loader_test = DataLoader(dataset_test, batch_size=BATCH_SIZE)

criterion = nn.MSELoss()
# optimizer = torch.optim.SGD(model.parameters(), lr=0.0003, momentum=0.00001)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# ax1 carries the losses (left axis), ax2 the errors (right axis).
fig, ax1 = plt.subplots(figsize=(6, 4))
ax2 = ax1.twinx()
losses = []
errors = []
test_losses = []
test_errors = []

for epoch in range(NUM_EPOCHS):
    loss, error = train_one_epoch(model, data_loader_train, device, criterion, optimizer)
    error *= 100.0  # Convert normalized error (0-1) to actual x units (0-100)
    losses.append(loss)
    errors.append(error)

    # Reset the test dataset's transform before every evaluation; this is meant to
    # make each pass over the test set see exactly the same augmentations.
    dataset_test.reset_transform()
    test_loss, test_error = evaluate_glyph_regressor(model, data_loader_test, device, criterion)
    test_error *= 100.0  # Convert normalized error (0-1) to actual x units (0-100)
    test_losses.append(test_loss)
    test_errors.append(test_error)

    # Redraw both axes from scratch with the full history so far.
    ax1.clear()
    ax2.clear()

    # Dashed lines for loss, solid lines for error; green = train, red = test.
    curves = (
        (ax1, losses, 'green', 'Train Loss', '--'),
        (ax2, errors, 'green', 'Train Error', '-'),
        (ax1, test_losses, 'red', 'Test Loss', '--'),
        (ax2, test_errors, 'red', 'Test Error', '-'),
    )
    for axis, values, color, label, linestyle in curves:
        axis.plot(range(len(values)), values, color=color, label=label,
                  marker='o', markersize=4, linestyle=linestyle)

    ax1.grid(True, alpha=0.3)
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss (MSE)', color='black')
    ax2.set_ylabel('Error (Mean Absolute Error, x units)', color='black')
    ax2.yaxis.set_label_position('right')
    ax1.set_xticks(range(len(losses)))
    ax1.legend(loc='upper left')
    ax2.legend(loc='upper right')

    # Replace the previous output with the updated figure (and the early-stop
    # message, if any), then decide whether to stop.
    clear_output(wait=True)
    stop_early = test_error < EARLY_STOP_ERROR
    if stop_early:
        print(f"Early stopping at epoch {epoch+1}: test error {test_error:.4f} x units is below threshold")
    display(fig)
    if stop_early:
        break

# The figure has already been shown through display(fig). Closing it stops the inline
# backend from rendering the still-open figure a second time when the cell finishes.
plt.close(fig)

In [None]:
# Report how many samples the full training set, the every-16th debug subset and the
# test set contain, and how many batches of 64 each training variant amounts to.
rule = "=" * 60
train_size = len(dataset_train)
debug_size = len(range(0, train_size, 16))

print(rule)
print("DATASET SIZE ANALYSIS")
print(rule)
print(f"Full training dataset size: {train_size}")
print(f"Debug subset size (every 16th): {debug_size}")
print(f"Test dataset size: {len(dataset_test)}")
print("\nBatch size: 64")
print(f"Batches per epoch (debug): {debug_size / 64:.1f}")
print(f"Batches per epoch (full): {train_size / 64:.1f}")
print("\nNote: Using only every 16th sample means the model sees very little data!")
print("Consider training on the full dataset or increasing the subset size.")
print(rule)

In [None]:
# TODO: save and load the trained model here,
# e.g. torch.save(model.state_dict(), <path>) and model.load_state_dict(torch.load(<path>))

In [None]:
# Evaluate all examples and collect errors
def _collect_signed_errors(model, dataset, device):
    """
    Run the model on every sample of `dataset`, one sample at a time, and return the
    signed errors (prediction - label) * 100 as a list of Python floats.
    """
    signed_errors = []
    with torch.no_grad():
        for i in range(len(dataset)):
            image, label = dataset[i]
            pred = model(image.unsqueeze(0).to(device)).item()
            # float() accepts plain numbers as well as single-element tensors, so the
            # collected errors are always plain floats.
            signed_errors.append((pred - float(label)) * 100)  # Convert to x units (0-100)
    return signed_errors


def _print_error_stats(heading, errors, mean, std):
    """Print the mean, standard deviation, minimum and maximum of `errors`."""
    print(heading)
    print(f"  Mean Error: {mean:.2f} x units")
    print(f"  Std Dev:    {std:.2f} x units")
    print(f"  Min Error:  {min(errors):.2f} x units")
    print(f"  Max Error:  {max(errors):.2f} x units")


model.eval()
model = model.to(device)

train_errors = _collect_signed_errors(model, dataset_train, device)

# Reset the test dataset's transform first, as the training cell does before each
# evaluation pass.
dataset_test.reset_transform()
test_errors = _collect_signed_errors(model, dataset_test, device)

# The errors are signed, so the means below measure bias rather than absolute error.
train_mean = np.mean(train_errors)
train_std = np.std(train_errors)
test_mean = np.mean(test_errors)
test_std = np.std(test_errors)

# Shared bin edges spanning both error sets so the two histograms are comparable.
bins = np.linspace(min(min(train_errors), min(test_errors)),
                   max(max(train_errors), max(test_errors)), 50)

hist_fig, hist_ax = plt.subplots(figsize=(10, 6))
hist_ax.hist(train_errors, bins=bins, alpha=0.5, label=f'Training (μ={train_mean:.2f}, σ={train_std:.2f})',
             density=True, color='blue')
hist_ax.hist(test_errors, bins=bins, alpha=0.5, label=f'Test (μ={test_mean:.2f}, σ={test_std:.2f})',
             density=True, color='red')

hist_ax.set_xlabel('Prediction Error (x units, 0-100)')
hist_ax.set_ylabel('Density')
hist_ax.set_title('Distribution of Prediction Errors on Training and Test Sets')
hist_ax.legend()
hist_ax.grid(True, alpha=0.3)

_print_error_stats("Training Set Statistics:", train_errors, train_mean, train_std)
_print_error_stats("\nTest Set Statistics:", test_errors, test_mean, test_std)

plt.show()

In [None]:
# Plot the model's predictions for a few test samples with the helper from
# mglyph_ml.nn.utils.
from mglyph_ml.nn.utils import visualize_test_predictions

visualize_test_predictions(
    model=model,
    device=device,
    importers=importers,
    augmentation_seed=69,
    num_samples=9,
    figsize=(6, 6),
)

In [None]:
from torch import nn
# here, we can analyze the model a little bit, see what it's doing internally
def visualize_kernels(model: nn.Module, layer_idx: int = 0, ncols: int = 8, figsize=(10,10), cmap="viridis"):
    """
    Draw the kernels of one convolution layer as a grid of small images.

    The nn.Conv2d modules found in `model.features` are collected in order and
    `layer_idx` selects one of them (0 = first conv found, 1 = second, ...).

    - Kernels with exactly 3 input channels are shown as RGB images.
    - Any other kernel is averaged over its input channels and shown with `cmap`.
    - Every kernel is min-max scaled to 0..1 on its own, unless it is constant.

    Raises ValueError when `layer_idx` is not smaller than the number of conv layers.
    Returns the created matplotlib figure.
    """
    conv_layers = []
    for module in model.features:
        if isinstance(module, nn.Conv2d):
            conv_layers.append(module)
    if not layer_idx < len(conv_layers):
        raise ValueError(f"Layer index {layer_idx} is out of range. Model has {len(conv_layers)} conv layers.")

    # weight layout: (out_ch, in_ch, kH, kW)
    kernels = conv_layers[layer_idx].weight.detach().cpu()
    out_ch, in_ch, _, _ = kernels.shape
    is_rgb = in_ch == 3

    nrows = math.ceil(out_ch / ncols)
    fig, axes = plt.subplots(nrows, ncols, figsize=figsize)
    flat_axes = np.array(axes).reshape(-1)

    for idx, ax in enumerate(flat_axes):
        ax.axis("off")
        if idx >= out_ch:
            # grid cell without a kernel: leave it blank
            continue

        if is_rgb:
            # (3, kH, kW) -> (kH, kW, 3) for display
            img = kernels[idx].permute(1, 2, 0).numpy()
        else:
            # collapse the input channels into a single plane
            img = kernels[idx].mean(dim=0).numpy()

        # per-kernel min-max scaling to 0..1 (skipped for constant kernels)
        lo, hi = img.min(), img.max()
        if hi - lo > 0:
            img = (img - lo) / (hi - lo)

        if is_rgb:
            ax.imshow(img)
            ax.set_title(f"f{idx} (RGB)")
        else:
            ax.imshow(img, cmap=cmap)
            ax.set_title(f"f{idx} (avg)")
    plt.tight_layout()
    return fig

# After training: draw the kernels of the first three conv layers, one figure each.
# (The original notes list 16, 32 and 64 kernels for these layers; not verifiable here.)
kernel_figures = []
for conv_index, grid_cols, fig_size in ((0, 4, (8, 8)), (1, 8, (8, 4)), (2, 8, (8, 8))):
    kernel_figures.append(
        visualize_kernels(model, layer_idx=conv_index, ncols=grid_cols, figsize=fig_size)
    )
    plt.show()
fig1, fig2, fig3 = kernel_figures

In [None]:
import os
from PIL import Image
import numpy as np

def export_kernels(model: torch.nn.Module, layer_name: str, out_dir: str, cmap: str = "viridis"):
    """
    Export all kernels of one convolutional layer as PNG images in `out_dir`.

    - Kernels with exactly 3 input channels are saved as RGB images.
    - Any other kernel is averaged over its input channels and saved as grayscale.
    - Every kernel is min-max scaled on its own before being converted to 8-bit; a
      constant kernel (no contrast) is saved as an all-zero image.

    Args:
        model: Module that holds the layer as a direct attribute.
        layer_name: Attribute name of the layer on `model`; also used as the file
            name prefix (`<layer_name>_kernel_<i>.png`).
        out_dir: Target directory. It is created if missing, but only after the layer
            has been looked up successfully.
        cmap: Not used by this function; kept so existing calls keep working.

    Raises:
        ValueError: If `model` has no attribute called `layer_name`.
    """
    # Look the layer up before touching the filesystem, so a wrong name does not
    # leave an empty output directory behind.
    layer = getattr(model, layer_name, None)
    if layer is None:
        raise ValueError(f"Model has no attribute '{layer_name}'")
    weight = layer.weight.detach().cpu()  # (out_ch, in_ch, kH, kW)
    out_ch, in_ch, kH, kW = weight.shape

    os.makedirs(out_dir, exist_ok=True)
    for i in range(out_ch):
        kern = weight[i]  # (in_ch, kH, kW)
        if in_ch == 3:
            # RGB kernel: (3, kH, kW) -> (kH, kW, 3)
            img = kern.permute(1, 2, 0).numpy()
            mode = "RGB"
        else:
            # Grayscale: average over input channels
            img = kern.mean(dim=0).numpy()
            mode = "L"

        mi, ma = img.min(), img.max()
        if ma - mi > 0:
            img = (img - mi) / (ma - mi)
        else:
            # Constant kernel: the raw weights may lie outside 0..1 (e.g. negative),
            # and casting them to uint8 would produce meaningless pixel values.
            img = np.zeros_like(img)
        pixels = (img * 255).astype(np.uint8)

        out_path = os.path.join(out_dir, f"{layer_name}_kernel_{i}.png")
        Image.fromarray(pixels, mode=mode).save(out_path)
    print(f"Exported {out_ch} kernels from '{layer_name}' to {out_dir}")

# Write the kernels of the layer stored as `model.conv3` to data/kernels.
# NOTE(review): visualize_kernels reads its conv layers from `model.features`; confirm
# that the model also exposes a `conv3` attribute, otherwise this call raises ValueError.
export_kernels(model, "conv3", "data/kernels")