In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, Input
import numpy as np

%load_ext autoreload 
%autoreload 2

In [None]:

from tensorflow.keras import regularizers
EMBED_DIM = 128  # final embedding size

class L2Normalization(tf.keras.layers.Layer):
    """Keras layer that rescales its input to unit L2 norm along one axis.

    Args:
        axis: Axis along which the L2 norm is computed. Defaults to 1, the
            value this layer previously hard-coded.
        **kwargs: Forwarded unchanged to ``tf.keras.layers.Layer``.
    """

    def __init__(self, axis=1, **kwargs):
        # Zero-argument super() avoids looking the class up by its global name,
        # which can go stale when this cell is re-executed or autoreloaded.
        super().__init__(**kwargs)
        self.axis = axis

    def call(self, inputs):
        """Return ``inputs`` L2-normalized along ``self.axis``."""
        return tf.math.l2_normalize(inputs, axis=self.axis)

    # REQUIRED for ONNX/Saving:
    def get_config(self):
        """Return the base layer config extended with this layer's ``axis``.

        Including ``axis`` lets a layer rebuilt from this config keep a
        non-default normalization axis.
        """
        config = super().get_config()
        config.update({"axis": self.axis})
        return config

def build_encoder(input_shape=(128, 128, 2), embed_dim=None):
    """Build the CNN encoder that maps a pairwise matrix to a unit-norm embedding.

    Architecture: four Conv2D -> BatchNormalization -> ReLU blocks with
    32/64/128/256 filters (5x5 kernel for the first, 3x3 for the rest), with a
    2x2 max-pool between consecutive blocks; global max and global average
    pooling concatenated; Dense(256, relu) + Dropout(0.4); a linear Dense
    embedding; and a final L2 normalization.

    Args:
        input_shape: Shape of one input sample without the batch axis.
            Defaults to ``(128, 128, 2)``: a 128x128 matrix with two channels.
        embed_dim: Size of the output embedding. ``None`` (default) uses the
            module-level ``EMBED_DIM``.

    Returns:
        A ``keras.Model`` named ``"drawing_encoder"`` whose output is the
        L2-normalized embedding.
    """
    if embed_dim is None:
        # Resolved at call time so that editing EMBED_DIM and re-running the
        # config cell takes effect without redefining this function.
        embed_dim = EMBED_DIM

    mat_in = Input(shape=input_shape, name="dist_matrix")

    # (filters, kernel_size) for each conv block, shallowest first.
    conv_specs = (
        (32, (5, 5)),
        (64, (3, 3)),
        (128, (3, 3)),
        (256, (3, 3)),
    )

    x = mat_in
    for block_idx, (filters, kernel_size) in enumerate(conv_specs):
        if block_idx > 0:
            # Down-sample between blocks; the last block's output is left
            # un-pooled for the global pooling below.
            x = layers.MaxPool2D((2, 2))(x)
        x = layers.Conv2D(
            filters,
            kernel_size,
            padding="same",
            kernel_regularizer=regularizers.l2(1e-4),
        )(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation("relu")(x)

    # Dual pooling: capture both peak features and average intensity.
    # Concatenation doubles the feature vector size.
    gmax = layers.GlobalMaxPooling2D()(x)
    gavg = layers.GlobalAveragePooling2D()(x)
    x = layers.Concatenate()([gmax, gavg])

    # Dense head
    x = layers.Dense(256, activation="relu")(x)
    x = layers.Dropout(0.4)(x)  # Increased dropout slightly for larger model

    # Linear embedding followed by L2 normalization.
    emb = layers.Dense(embed_dim, activation=None, name="embedding")(x)
    emb = L2Normalization()(emb)

    return models.Model(mat_in, emb, name="drawing_encoder")

In [None]:
def build_siamese(encoder, input_shape=None):
    """Wrap a shared encoder into a two-input model that outputs a distance.

    Both inputs pass through the same ``encoder`` instance, so its weights are
    shared. The output is the Euclidean distance between the two embeddings,
    kept as a column of shape ``(batch, 1)``.

    Args:
        encoder: Model mapping one input sample to an embedding vector.
        input_shape: Per-sample input shape without the batch axis. ``None``
            (default) takes it from ``encoder.input_shape`` so the two inputs
            always match what the encoder expects.

    Returns:
        A ``keras.Model`` with inputs ``[matrix_A, matrix_B]`` and the
        embedding distance as output.
    """
    if input_shape is None:
        # Drop the leading batch dimension reported by the encoder.
        input_shape = tuple(encoder.input_shape[1:])

    matA = Input(shape=input_shape, name="matrix_A")
    matB = Input(shape=input_shape, name="matrix_B")

    embA = encoder(matA)
    embB = encoder(matB)

    def euclidean_distance(pair):
        """Row-wise Euclidean distance between ``pair[0]`` and ``pair[1]``."""
        squared_sum = tf.reduce_sum(
            tf.square(pair[0] - pair[1]), axis=1, keepdims=True
        )
        # 1e-7 is added before the square root; presumably to keep the sqrt
        # gradient finite when the two embeddings coincide.
        return tf.sqrt(squared_sum + 1e-7)

    dist = layers.Lambda(euclidean_distance)([embA, embB])

    return models.Model([matA, matB], dist)

In [None]:
def contrastive_loss(y_true, y_pred, margin=1.0):
    """Contrastive loss over predicted embedding distances.

    Computes ``mean(y * d^2 + (1 - y) * max(margin - d, 0)^2)``.

    Args:
        y_true: 1 if the pair is from the same class, 0 if different.
        y_pred: Distance between the two embeddings.
        margin: Distance beyond which different-class pairs contribute no loss.

    Returns:
        Scalar tensor: the mean loss over all elements.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    # Labels may arrive as ints or in another float precision; multiplying
    # tensors of different dtypes raises, so align them with the predictions.
    y_true = tf.cast(y_true, y_pred.dtype)

    same_class_term = tf.square(y_pred)
    diff_class_term = tf.square(tf.maximum(margin - y_pred, 0.0))
    return tf.reduce_mean(
        y_true * same_class_term + (1.0 - y_true) * diff_class_term
    )

In [None]:
import numpy as np
import random

def make_pairs(matrices, labels, batch_size=8):
    """
    Yield batches of distance matrix pairs and targets, forever.

    For every pair a random anchor is drawn. With probability 0.5 it is paired
    with a different sample of the same class (target 1.0), otherwise with a
    sample of a randomly chosen other class (target 0.0). An anchor whose class
    has no other member cannot form a positive pair, so it always yields a
    negative pair instead of looping forever looking for a partner.

    matrices: indexable collection of per-sample matrices
    labels:   class label for each entry of `matrices` (must be hashable)
    batch_size: number of pairs per yielded batch

    Yields: ((A, B), y) where A and B are arrays stacking `batch_size`
            matrices each and y has shape (batch_size, 1).

    Raises ValueError (on first iteration) if `matrices` and `labels` differ
    in length or if fewer than two classes are present.
    """
    num_samples = len(matrices)
    if len(labels) != num_samples:
        raise ValueError(
            "matrices and labels must have the same length, got "
            f"{num_samples} and {len(labels)}"
        )

    # Pre-group indices by class for fast positive sampling
    class_to_idxs = {}
    for idx, c in enumerate(labels):
        class_to_idxs.setdefault(c, []).append(idx)

    if len(class_to_idxs) < 2:
        raise ValueError(
            "make_pairs needs samples from at least two classes to build negative pairs"
        )

    # Candidate negative classes per anchor class, computed once instead of
    # being rebuilt for every sampled pair.
    other_classes = {
        c: [other for other in class_to_idxs if other != c]
        for c in class_to_idxs
    }

    while True:
        matA_batch, matB_batch, y_batch = [], [], []

        for _ in range(batch_size):
            anchor_idx = random.randrange(num_samples)
            anchor_label = labels[anchor_idx]
            same_class = class_to_idxs[anchor_label]

            # Positive pair 50% (only possible if the class has another member)
            if random.random() < 0.5 and len(same_class) > 1:
                partner_idx = random.choice(same_class)
                while partner_idx == anchor_idx:
                    partner_idx = random.choice(same_class)
                target = 1.0
            # Negative pair otherwise
            else:
                neg_label = random.choice(other_classes[anchor_label])
                partner_idx = random.choice(class_to_idxs[neg_label])
                target = 0.0

            matA_batch.append(matrices[anchor_idx])
            matB_batch.append(matrices[partner_idx])
            y_batch.append(target)

        yield (
            (np.array(matA_batch), np.array(matB_batch)),
            np.array(y_batch).reshape(-1, 1)
        )

In [None]:
# One encoder instance is created and handed to the siamese wrapper, so the
# `encoder` variable refers to the same model that `siamese` trains.
encoder = build_encoder()
siamese = build_siamese(encoder)

# Adam with learning rate 1e-3, optimizing the contrastive loss on the
# predicted pair distance.
siamese.compile(
    loss=contrastive_loss,
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
)

In [None]:
import numpy as np

# NOTE(security): allow_pickle=True is needed for object arrays, but it makes
# np.load unpickle whatever the file contains. Only load .npz files from a
# trusted source.
#
# np.load returns an NpzFile for .npz archives, which keeps the file open until
# closed. The `with` blocks close each archive once its arrays have been read.
with np.load('drawing_dataset.npz', allow_pickle=True) as data_train:
    X_train = data_train['X']  # drawings
    y_train = data_train['y']  # labels

with np.load('validation_dataset.npz', allow_pickle=True) as data_val:
    X_val = data_val['X']  # drawings
    y_val = data_val['y']  # labels


In [None]:
def compute_distance_matrix(points):
    """
    Pairwise Euclidean distances between all points, scaled by the largest one.

    points: (num_points, 3)
    returns: (num_points, num_points) matrix where every distance has been
             divided by (maximum distance + 1e-8)
    """
    pts = np.asarray(points, dtype=np.float32)

    # Broadcasting (N, 1, D) against (1, N, D) gives every pairwise difference.
    pairwise_delta = pts[:, np.newaxis, :] - pts[np.newaxis, :, :]
    pairwise_dist = np.linalg.norm(pairwise_delta, axis=-1)

    # The small epsilon keeps the division defined when all points coincide.
    largest = np.max(pairwise_dist)
    return pairwise_dist / (largest + 1e-8)

def process_matrices(X_raw):
    """
    Turn raw drawings into single-channel normalized distance matrices.

    Each drawing in X_raw is converted to a float32 array, passed through
    compute_distance_matrix, and the stacked result gets a trailing channel
    axis so it can feed a Conv2D layer: (N, P, P) -> (N, P, P, 1).
    """
    # Stack the drawings into one float32 array first
    drawings = np.array([np.array(drawing, dtype=np.float32) for drawing in X_raw])

    # One distance matrix per drawing
    per_drawing = [compute_distance_matrix(drawing) for drawing in drawings]

    # Append the channel dimension
    return np.expand_dims(np.array(per_drawing), axis=-1)

def compute_distance_matrix_heightchannel(points):
    """
    Two-channel pairwise matrix: Euclidean distance and signed Y difference.

    points: (num_points, 3)
    returns: (num_points, num_points, 2) -> [Distance, Y-Diff]

    Both channels are divided by the same factor (maximum distance + 1e-8), so
    the Y differences stay in proportion to the distances.
    """
    pts = np.asarray(points, dtype=np.float32)

    # Channel 0: Euclidean distance for every point pair
    pairwise_dist = np.linalg.norm(pts[:, None, :] - pts[None, :, :], axis=-1)

    # Shared normalization factor for both channels
    norm_factor = np.max(pairwise_dist) + 1e-8

    # Channel 1: y_i - y_j for every point pair (column 1 of the input)
    heights = pts[:, 1]
    height_delta = heights[:, None] - heights[None, :]

    channels = [pairwise_dist / norm_factor, height_delta / norm_factor]
    return np.stack(channels, axis=-1)

def process_matrices_heightchannel(X_raw):
    """
    Turn raw drawings into stacked 2-channel [Distance, Y-Diff] matrices.

    Each drawing in X_raw is converted to a float32 array and passed through
    compute_distance_matrix_heightchannel; the per-drawing results are stacked
    along a new leading axis: (N, P, P, 2).
    """
    drawings = np.array([np.array(drawing, dtype=np.float32) for drawing in X_raw])

    per_drawing = []
    for drawing in drawings:
        per_drawing.append(compute_distance_matrix_heightchannel(drawing))

    return np.array(per_drawing)


# Convert both splits to 2-channel matrices (training split first).
train_matrices, val_matrices = (
    process_matrices_heightchannel(split) for split in (X_train, X_val)
)

# Labels as NumPy arrays so they can be indexed by integer position.
y_train, y_val = np.array(y_train), np.array(y_val)

# Parameters: pairs per batch, batches per epoch, maximum number of epochs
batch_size, steps_per_epoch, epochs = 64, 200, 100

import tensorflow as tf


def train_generator_fn():
    """Return a new `make_pairs` generator over the training matrices and labels.

    Takes no arguments so it can be passed to `tf.data.Dataset.from_generator`
    as the generator factory. Reads the module-level `train_matrices`,
    `y_train` and `batch_size` at call time.
    """
    return make_pairs(train_matrices, y_train, batch_size)

def val_generator_fn():
    """Return a new `make_pairs` generator over the validation matrices and labels.

    Takes no arguments so it can be passed to `tf.data.Dataset.from_generator`
    as the generator factory. Reads the module-level `val_matrices`, `y_val`
    and `batch_size` at call time. Pairs are sampled randomly, so successive
    validation batches are not a fixed set.
    """
    return make_pairs(val_matrices, y_val, batch_size)

# Use TF dataset, mostly to avoid restructuring other code to use numpy types.
# Both generators yield ((matrix_A batch, matrix_B batch), target batch), so
# the training and validation datasets are built from the same signature.
train_dataset, val_dataset = (
    tf.data.Dataset.from_generator(
        generator_fn,
        output_signature=(
            (
                tf.TensorSpec(shape=(None, 128, 128, 2), dtype=tf.float32),
                tf.TensorSpec(shape=(None, 128, 128, 2), dtype=tf.float32),
            ),
            tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
        ),
    )
    for generator_fn in (train_generator_fn, val_generator_fn)
)

from tensorflow.keras.callbacks import EarlyStopping

# Stop once val_loss has gone 20 epochs without improving, then roll the model
# back to the weights of the best epoch.
early_stop = EarlyStopping(
    restore_best_weights=True,
    patience=20,
    monitor="val_loss",
)

# The datasets are endless generators, so the length of an epoch and of each
# validation pass is fixed explicitly in steps.
history = siamese.fit(
    train_dataset,
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    validation_data=val_dataset,
    validation_steps=5,
    callbacks=[early_stop],
)

In [None]:
# Save only the encoder (not the two-input siamese wrapper) to disk.
# NOTE(review): the ".h5" suffix presumably selects Keras' legacy HDF5 format,
# and because the encoder contains the custom L2Normalization layer, loading
# this file will presumably need
# custom_objects={"L2Normalization": L2Normalization} — confirm.
encoder.save("drawing_encoder_model.h5")