In [1]:
import tensorflow as tf
import pickle
from tensorflow.keras.layers import StringLookup
from tensorflow.keras import layers, Model
import pandas as pd
import random
import collections

2025-05-22 09:29:35.734017: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-22 09:29:35.869208: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1747931375.919145  465036 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1747931375.933748  465036 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1747931376.049676  465036 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

In [2]:
# load the dataset
df = pd.read_pickle('data/tokenized_dataset.pkl')
# time to remove tunes with only one setting - at least for training triplet loss
counts = df.tune_id.value_counts()
good_ids = counts[counts > 1].index
df = df[df.tune_id.isin(good_ids)]


In [3]:
# build the vocab
all_notes = sorted({n for seq in df.note_seq for n in seq})
note_lookup = tf.keras.layers.StringLookup(
    vocabulary=all_notes,
    mask_token=None,    # 0 will be reserved for padding
    oov_token="[UNK]"
)
df['note_ids'] = df.note_seq.apply(lambda seq: note_lookup(seq).numpy().tolist())

I0000 00:00:1747931380.079881  465036 gpu_device.cc:2019] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 9558 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4070, pci bus id: 0000:01:00.0, compute capability: 8.9


In [None]:
# prep the dataset
# 1) Build index by tune_id
by_id = collections.defaultdict(list)
for notes, durs, tid in zip(df.note_ids, df.dur_seq, df.tune_id):
    by_id[int(tid)].append((notes, durs))
tune_ids = list(by_id.keys())

# 2) Generator that yields one (sample, label) at a time,
#    but cycles in groups of batch_tunes * per_tune to ensure positives in each batch.
def balanced_sample_generator(batch_tunes=8

, per_tune=2):
    while True:
        chosen = random.sample(tune_ids, batch_tunes)
        # build exactly batch_tunes * per_tune samples
        for tid in chosen:
            examples = random.choices(by_id[tid], k=per_tune)
            for notes, durs in examples:
                yield (notes, durs), tid

# 3) Wrap it in a Dataset of *samples*, then pad into batches
ds = tf.data.Dataset.from_generator(
    balanced_sample_generator,
    output_signature=(
        (tf.TensorSpec(shape=(None,), dtype=tf.int32),   # notes sequence
         tf.TensorSpec(shape=(None,), dtype=tf.float32)), # durs sequence
        tf.TensorSpec(shape=(),   dtype=tf.int32)         # tune_id label
    )
).padded_batch(
    batch_size=64,
    padded_shapes=(
        ([None], [None]),  # pad notes→[64, T], durs→[64, T]
        []                 # labels→[64]
    ),
    padding_values=(
        (0, 0.0),          # pad notes with 0, durs with 0.0
        0                  # pad label (unused) with 0
    )
).prefetch(tf.data.AUTOTUNE)

In [5]:
def pairwise_distances(embeddings):
    # embeddings: [batch, dim]
    # returns a [batch, batch] matrix of squared distances
    dot = tf.matmul(embeddings, embeddings, transpose_b=True)
    sq = tf.reduce_sum(tf.square(embeddings), axis=1, keepdims=True)
    # d(i,j) = ||xi - xj||^2 = sq[i] - 2*dot[i,j] + sq[j]
    return tf.maximum(sq - 2.0 * dot + tf.transpose(sq), 0.0)

def batch_hard_triplet_loss(margin=0.3):
    def loss_fn(y_true, y_pred):
        # y_true: [batch] int labels (tune_id)
        # y_pred: [batch, dim] embeddings (already ℓ₂‐normalized or not)
        labels = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
        embeddings = y_pred
        # 1) Pairwise distance matrix
        pdist = pairwise_distances(embeddings)
        # 2) Masks for positive / negative pairs
        labels_eq = tf.equal(tf.expand_dims(labels,1), tf.expand_dims(labels,0))  # [B,B]
        mask_pos = tf.cast(labels_eq, tf.float32) - tf.eye(tf.shape(labels)[0])   # zero diagonal
        mask_neg = 1.0 - tf.cast(labels_eq, tf.float32)

        # 3) For each anchor i, hardest positive = max_{j!=i, same label} d(i,j)
        hardest_pos = tf.reduce_max(pdist * mask_pos, axis=1)
        # 4) For each anchor i, easiest negative = min_{k, different label} d(i,k)
        #    to do that, add large constant to positives so they’re ignored in min()
        max_dist = tf.reduce_max(pdist)
        pdist_neg = pdist + max_dist * (1.0 - mask_neg)
        hardest_neg = tf.reduce_min(pdist_neg, axis=1)

        # 5) Combine with margin
        tl = tf.maximum(hardest_pos - hardest_neg + margin, 0.0)
        return tf.reduce_mean(tl)
    return loss_fn


In [6]:
vocab_list = note_lookup.get_vocabulary()
# Hyperparameters
VOCAB_SIZE = len(vocab_list) + 2     # from your StringLookup (plus PAD & OOV)
EMB_DIM    = 32
RNN_UNITS = 32

# 1) Define your two inputs
notes_in = layers.Input(shape=(None,), dtype="int32",   name="note_ids")
durs_in  = layers.Input(shape=(None,), dtype="float32", name="durations")

# 2) Embed your notes (this is trainable, starts random)
note_emb = layers.Embedding(
    input_dim=VOCAB_SIZE,
    output_dim=EMB_DIM,
    mask_zero=True,   # so padding=0 is ignored by downstream RNN
)(notes_in)          # → shape (batch, timesteps, EMB_DIM)

dur_feat = layers.Lambda(
    lambda x: tf.expand_dims(x, -1),
    mask=lambda inputs, mask: mask  # pass the incoming 2D mask straight through
)(durs_in)
# project durations into EMB_DIM via a Dense layer
dur_emb = layers.TimeDistributed(layers.Dense(EMB_DIM))(dur_feat)
# now both note_emb and dur_emb are (B, T, EMB_DIM)
x = layers.Add()([note_emb, dur_emb])

# → shape (batch, timesteps, EMB_DIM + 1)

# 5) Encode with a Bidirectional GRU (return_sequences=True so we can pool)
rnn_out = layers.Bidirectional(
    layers.GRU(RNN_UNITS, return_sequences=True)
)(x)
# → shape (batch, timesteps, 2*RNN_UNITS)

# 6) Pool across time (takes care of variable lengths & masks)
tune_vec = layers.GlobalAveragePooling1D()(rnn_out)
# → shape (batch, 2*RNN_UNITS)

# 7) L2-normalize if you like (makes cosine‐based losses stable)
tune_emb = layers.Lambda(lambda z: tf.math.l2_normalize(z, axis=1))(tune_vec)

# 8) Build & compile
model = Model(inputs=[notes_in, durs_in], outputs=tune_emb)
model.compile(
    optimizer="adam",
    loss=batch_hard_triplet_loss(margin=0.3)
)

model.summary()


In [7]:
model.fit(ds, epochs=10)

Epoch 1/10


I0000 00:00:1747931431.390043  465244 cuda_dnn.cc:529] Loaded cuDNN version 90300


 335454/Unknown [1m13984s[0m 42ms/step - loss: 0.0625

KeyboardInterrupt: 

In [10]:
# after training…
model.save("saved_models/tune_embedder_v0.keras")  
# → creates a SavedModel directory you can reload anywhere
