In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [1]:
import logging
import time

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# read-in cleaned data and parse to corpus and tokenizer
# path = '../data/'
path = '/content/'
filepath = path + 'allphishsets.csv'

df = pd.read_csv(filepath)
df = df.sort_values(by=['showdate', 'set', 'position'],
                    ascending=[True, True, True])

df.loc[df['times_played'] <= 2, 'slug'] = 'wildcard'
df.loc[df['times_played'] <= 2, 'times_played'] = 510

songstring = df[['showdate', 'set', 'slug']].groupby(['showdate', 'set'])['slug']\
                                            .apply(lambda x: '|'.join(x)).reset_index()
songstring['full'] = songstring.apply(lambda row: f"set-{row['set']}|{row['slug']}", axis=1)

songstring = songstring[['showdate', 'full']].groupby(['showdate'])['full']\
                                             .apply(lambda x: '|'.join(x)).reset_index()

songstring['full'] += '|eos'

corpus = [''.join(map(lambda s: s.replace('|', ' '), f))
          for f in songstring['full']]

tokenizer = Tokenizer(filters='')
tokenizer.fit_on_texts(corpus)
unique_words = len(tokenizer.word_index) + 1


def PrepareDataset(corpus: list, tokenizer: Tokenizer,
                   n_context: int, batch_size: int, train_split: float):
    """
    Prepares Datasets for training and validation data from Setlist data
    Args:
      corpus :: list :: full corpus of songs composed of setlists as sequences
      tokenizer :: Tokenizer :: keras Tokenizer object trained on corpus
      n_context :: int :: number of previous setlist to use as context for a
                          given setlist
      batch_size :: int :: batch size for datasets
      train_split :: float :: values between 0 and 1, splits the data for
                              training and validation
    """
    max_seq_length = max([len(setlist.split(' ')) for setlist in corpus]) - 1

    x_inputs = []
    x_outputs = []
    for line in corpus:
        token_list = tokenizer.texts_to_sequences([line])[0]
        x_inputs.append(token_list[:-1])  #drop eos
        x_outputs.append(token_list[1:])  #drop set-1

    x_inputs = np.array(
        pad_sequences(x_inputs, maxlen=max_seq_length, padding='post')
    )
    x_outputs = np.array(
        pad_sequences(x_outputs, maxlen=max_seq_length, padding='post')
    )

    # last n shows as the context vector for each show
    n = n_context
    n_shows = []
    for i in range(len(corpus[:-n])):
        n_shows.append(' '.join(corpus[i:i+n]))

    context = []
    for line in n_shows:
        token_list = tokenizer.texts_to_sequences([line])[0]
        context.append(token_list)

    max_context_length = max([len(x) for x in context])
    x_context = np.array(
        pad_sequences(context, maxlen=max_context_length, padding='post')
    )

    x_context = x_context[:-1]
    x_inputs = x_inputs[n+1:]
    x_outputs = x_outputs[n+1:]

    buffer_size = len(x_context)
    train_size = int(train_split*buffer_size)

    dataset = tf.data.Dataset.from_tensor_slices(((x_context, x_inputs), x_outputs))
    shuffled_data = dataset.shuffle(buffer_size)

    train_data = dataset.take(train_size) \
                        .batch(batch_size) \
                        .prefetch(buffer_size=tf.data.AUTOTUNE)

    val_data = dataset.skip(train_size) \
                      .batch(batch_size) \
                      .prefetch(buffer_size=tf.data.AUTOTUNE)

    return train_data, val_data

In [2]:
# https://www.tensorflow.org/text/tutorials/transformer
# classes for positional embedding and attention layers
def positional_encoding(length, depth):
  depth = depth/2

  positions = np.arange(length)[:, np.newaxis]     # (seq, 1)
  depths = np.arange(depth)[np.newaxis, :]/depth   # (1, depth)

  angle_rates = 1 / (10000**depths)         # (1, depth)
  angle_rads = positions * angle_rates      # (pos, depth)

  pos_encoding = np.concatenate(
      [np.sin(angle_rads), np.cos(angle_rads)],
      axis=-1)

  return tf.cast(pos_encoding, dtype=tf.float32)


class PositionalEmbedding(tf.keras.layers.Layer):
  def __init__(self, vocab_size, d_model):
    super().__init__()
    self.d_model = d_model
    self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
    self.pos_encoding = positional_encoding(length=2048, depth=d_model)

  def compute_mask(self, *args, **kwargs):
    return self.embedding.compute_mask(*args, **kwargs)

  def call(self, x):
    length = tf.shape(x)[1]
    x = self.embedding(x)
    # This factor sets the relative scale of the embedding and positonal_encoding.
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x = x + self.pos_encoding[tf.newaxis, :length, :]
    return x


class BaseAttention(tf.keras.layers.Layer):
  def __init__(self, **kwargs):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()


class CrossAttention(BaseAttention):
  def call(self, x, context):
    attn_output, attn_scores = self.mha(
        query=x,
        key=context,
        value=context,
        return_attention_scores=True)

    # Cache the attention scores for plotting later.
    self.last_attn_scores = attn_scores

    x = self.add([x, attn_output])
    x = self.layernorm(x)

    return x


class GlobalSelfAttention(BaseAttention):
  def call(self, x):
    attn_output = self.mha(
        query=x,
        value=x,
        key=x)
    x = self.add([x, attn_output])
    x = self.layernorm(x)
    return x


class CausalSelfAttention(BaseAttention):
  def call(self, x):
    attn_output = self.mha(
        query=x,
        value=x,
        key=x,
        use_causal_mask = True)
    x = self.add([x, attn_output])
    x = self.layernorm(x)
    return x

In [3]:
# classes for NN model architecture
class FeedForward(tf.keras.layers.Layer):
  def __init__(self, d_model, dff, dropout_rate=0.1):
    super().__init__()
    self.seq = tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),
      tf.keras.layers.Dense(d_model),
      tf.keras.layers.Dropout(dropout_rate)
    ])
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    x = self.add([x, self.seq(x)])
    x = self.layer_norm(x)
    return x


class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self,*, d_model, num_heads, dff, dropout_rate=0.1):
    super().__init__()

    self.self_attention = GlobalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    self.ffn = FeedForward(d_model, dff)

  def call(self, x):
    x = self.self_attention(x)
    x = self.ffn(x)
    return x


class Encoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads,
               dff, vocab_size, dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(
        vocab_size=vocab_size, d_model=d_model)

    self.enc_layers = [
        EncoderLayer(d_model=d_model,
                     num_heads=num_heads,
                     dff=dff,
                     dropout_rate=dropout_rate)
        for _ in range(num_layers)]
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, x):
    # `x` is token-IDs shape: (batch, seq_len)
    x = self.pos_embedding(x)  # Shape `(batch_size, seq_len, d_model)`.

    # Add dropout.
    x = self.dropout(x)

    for i in range(self.num_layers):
      x = self.enc_layers[i](x)

    return x  # Shape `(batch_size, seq_len, d_model)`.


class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self,
               *,
               d_model,
               num_heads,
               dff,
               dropout_rate=0.1):
    super(DecoderLayer, self).__init__()

    self.causal_self_attention = CausalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    self.cross_attention = CrossAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    self.ffn = FeedForward(d_model, dff)

  def call(self, x, context):
    x = self.causal_self_attention(x=x)
    x = self.cross_attention(x=x, context=context)

    # Cache the last attention scores for plotting later
    self.last_attn_scores = self.cross_attention.last_attn_scores

    x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
    return x


class Decoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
               dropout_rate=0.1):
    super(Decoder, self).__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                             d_model=d_model)
    self.dropout = tf.keras.layers.Dropout(dropout_rate)
    self.dec_layers = [
        DecoderLayer(d_model=d_model, num_heads=num_heads,
                     dff=dff, dropout_rate=dropout_rate)
        for _ in range(num_layers)]

    self.last_attn_scores = None

  def call(self, x, context):
    # `x` is token-IDs shape (batch, target_seq_len)
    x = self.pos_embedding(x)  # (batch_size, target_seq_len, d_model)

    x = self.dropout(x)

    for i in range(self.num_layers):
      x  = self.dec_layers[i](x, context)

    self.last_attn_scores = self.dec_layers[-1].last_attn_scores

    # The shape of x is (batch_size, target_seq_len, d_model).
    return x


class Transformer(tf.keras.Model):
  def __init__(self, *, num_layers, d_model, num_heads, dff,
               input_vocab_size, target_vocab_size, dropout_rate=0.1):
    super().__init__()
    self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           vocab_size=input_vocab_size,
                           dropout_rate=dropout_rate)

    self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           vocab_size=target_vocab_size,
                           dropout_rate=dropout_rate)

    self.final_layer = tf.keras.layers.Dense(target_vocab_size)

  def call(self, inputs):
    # To use a Keras model with `.fit` you must pass all your inputs in the
    # first argument.
    context, x  = inputs

    context = self.encoder(context)  # (batch_size, context_len, d_model)

    x = self.decoder(x, context)  # (batch_size, target_len, d_model)

    # Final linear layer output.
    logits = self.final_layer(x)  # (batch_size, target_len, target_vocab_size)

    try:
      # Drop the keras mask, so it doesn't scale the losses/metrics.
      # b/250038731
      del logits._keras_mask
    except AttributeError:
      pass

    # Return the final output and the attention weights.
    return logits


class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    step = tf.cast(step, dtype=tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)


# performance metrics
def masked_loss(label, pred):
  mask = label != 0
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
  loss = loss_object(label, pred)

  mask = tf.cast(mask, dtype=loss.dtype)
  loss *= mask

  loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
  return loss


def masked_accuracy(label, pred):
  pred = tf.argmax(pred, axis=2)
  label = tf.cast(label, pred.dtype)
  match = label == pred

  mask = label != 0

  match = match & mask

  match = tf.cast(match, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  return tf.reduce_sum(match)/tf.reduce_sum(mask)

In [4]:
train_data, val_data = PrepareDataset(
    corpus=corpus,
    tokenizer=tokenizer,
    n_context=5,
    batch_size=8,
    train_split=0.8
)

d_model = 256

learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(
    learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9
)

num_layers = 4
dff = 512
num_heads = 8
dropout_rate = .5
epochs = 30

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=unique_words,
    target_vocab_size=unique_words,
    dropout_rate=dropout_rate
)

transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy]
)

transformer.fit(train_data, validation_data=val_data, epochs=epochs)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.src.callbacks.History at 0x7ded1df665c0>

In [5]:
# model the output

for (context, input), labels in val_data.take(3):
  break

encoder_input = tf.reshape(context[0], (1, len(context[0])))

output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
output_array = output_array.write(0, 1)

for i in tf.range(25):
    output = tf.reshape(output_array.stack(), (1, len(output_array.stack())))
    predictions = transformer([encoder_input, output], training=False)

    # Select the last token from the `seq_len` dimension.
    predictions = predictions[:, -1:, :]  # Shape `(batch_size, 1, vocab_size)`.

    predicted_id = tf.argmax(predictions, axis=-1)[0][0]

    # Concatenate the `predicted_id` to the output which is given to the
    # decoder as its input.
    output_array = output_array.write(i+1, predicted_id)

    if predicted_id == 4:
        break

[tokenizer.index_word[s] for s in output_array.stack().numpy()]

['set-1',
 'punch-you-in-the-eye',
 'gumbo',
 'birds-of-a-feather',
 'guyute',
 'my-soul',
 'set-2',
 'tweezer',
 'have-mercy',
 'taste',
 'the-moma-dance',
 'mountains-in-the-mist',
 'you-enjoy-myself',
 'set-e']

In [9]:
[tokenizer.index_word[s] for s in labels[0].numpy() if s != 0]

['fuego',
 'my-soul',
 'back-on-the-train',
 '555',
 'dog-faced-boy',
 'fuck-your-face',
 'horn',
 'frankie-says',
 'my-friend-my-friend',
 'roses-are-free',
 'roggae',
 'birds-of-a-feather',
 'wingsuit',
 'set-2',
 'possum',
 'crosseyed-and-painless',
 'light',
 'the-dogs',
 'lengthwise',
 'twist',
 'wading-in-the-velvet-sea',
 'harry-hood',
 'golgi-apparatus',
 'backwards-down-the-number-line',
 'set-e',
 'waiting-all-night',
 'sing-monica',
 'the-star-spangled-banner',
 'eos']

In [10]:
[tokenizer.index_word[s] for s in encoder_input.numpy()[0] if s != 0]

['set-1',
 '46-days',
 'tube',
 'train-song',
 'ghost',
 'sparkle',
 'sample-in-a-jar',
 'divided-sky',
 'the-line',
 'its-ice',
 'kill-devil-falls',
 'bathtub-gin',
 'set-2',
 '555',
 'backwards-down-the-number-line',
 'down-with-disease',
 'fuego',
 'twist',
 'bouncing-around-the-room',
 'david-bowie',
 'character-zero',
 'set-e',
 'harry-hood',
 'grind',
 'eos',
 'set-1',
 'devotion-to-a-dream',
 'acdc-bag',
 'my-sweet-one',
 'the-moma-dance',
 'halleys-comet',
 'funky-bitch',
 'wolfmans-brother',
 'destiny-unbound',
 'timber-jerry-the-mule',
 'tela',
 'wingsuit',
 'set-2',
 'free',
 'golden-age',
 'gotta-jibboo',
 'carini',
 'piper',
 'prince-caspian',
 'tweezer',
 'rock-and-roll',
 'you-enjoy-myself',
 'set-e',
 'suzy-greenberg',
 'tweezer-reprise',
 'eos',
 'set-1',
 'walfredo',
 'ocelot',
 'camel-walk',
 'axilla',
 'rift',
 '555',
 'maze',
 'brian-and-robert',
 'stash',
 'party-time',
 '46-days',
 'set-2',
 'sand',
 'birds-of-a-feather',
 'waiting-all-night',
 'ghost',
 'bug',
 

In [25]:
# Define a function to generate a sequence based on a given seed
def generate_sequence(seed_sequence, max_length=25):
    encoder_input = tf.constant([tokenizer.word_index[word] for word in seed_sequence], dtype=tf.int64)
    encoder_input = tf.reshape(encoder_input, (1, len(encoder_input)))

    output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
    output_array = output_array.write(0, tokenizer.word_index[seed_sequence[-1]])

    unique_tokens = set([tokenizer.word_index[word] for word in seed_sequence])

    i = 0
    while i < max_length:
        output = tf.reshape(output_array.stack(), (1, len(output_array.stack())))
        predictions = transformer([encoder_input, output], training=False)

        predictions = predictions[:, -1:, :]
        predicted_id = tf.argmax(predictions, axis=-1)[0][0]

        # Convert the scalar TensorFlow tensor to a tuple
        predicted_id_tuple = (predicted_id.numpy(),)

        # Check for duplicates
        if predicted_id_tuple in unique_tokens :

            output_array = output_array.write(i+1, predicted_id)
            i += 1
            continue

        unique_tokens.add(predicted_id_tuple)

        output_array = output_array.write(i+1, predicted_id)

        unique_tokens = set(unique_tokens)
        #if predicted_id == 4:
        #    output_array = output_array.write(i+1, predicted_id)
        #    i += 1
        #    continue;

        i += 1

    predicted_sequence = [tokenizer.index_word[s] for s in output_array.stack().numpy()]

    return predicted_sequence

# Example usage:
#org_list = corpus[-1].split()
org_list = [tokenizer.index_word[s] for s in labels[6].numpy() if s != 0]
org_list = ['farmhouse', 'first-tube', 'twist', 'divided-sky', 'ginseng-sullivan', 'carini', 'whats-the-use', 'wildcard', 'set-2', 'down-with-disease', 'the-moma-dance', 'piper', 'fee', 'gotta-jibboo', 'saw-it-again', 'split-open-and-melt', 'cavern', 'david-bowie', 'set-e', 'the-squirming-coil', 'eos']
seed_sequence = org_list[:3]
max_length = len(org_list) - len(seed_sequence)
predicted_sequence = generate_sequence(seed_sequence, max_length=max_length)

print("Original Sequence:", org_list)
print("Seed Sequence:", seed_sequence)
print("Predicted Sequence:", (predicted_sequence[len(seed_sequence):]))


Original Sequence: ['farmhouse', 'first-tube', 'twist', 'divided-sky', 'ginseng-sullivan', 'carini', 'whats-the-use', 'wildcard', 'set-2', 'down-with-disease', 'the-moma-dance', 'piper', 'fee', 'gotta-jibboo', 'saw-it-again', 'split-open-and-melt', 'cavern', 'david-bowie', 'set-e', 'the-squirming-coil', 'eos']
Seed Sequence: ['farmhouse', 'first-tube', 'twist']
Predicted Sequence: ['limb-by-limb', 'farmhouse', 'water-in-the-sky', 'limb-by-limb', 'train-song', 'water-in-the-sky', 'character-zero', 'set-2', 'runaway-jim', 'the-moma-dance', 'piper', 'prince-caspian', 'you-enjoy-myself', 'set-e', 'wildcard', 'eos']


In [26]:
# Assuming org_list, seed_sequence, stopword_list, and predicted_sequence are lists of songs

stopword_list = {}


# Convert the lists to sets
org_set = set(org_list)
seed_set = set(seed_sequence)
not_in_seed_sequence = org_set - seed_set
filtered_songs = [song.strip() for song in not_in_seed_sequence if song not in stopword_list]
predicted_sequence = [song.strip() for song in predicted_sequence[len(seed_sequence):] if song not in stopword_list]

# Count the number of matching songs
matching_songs = [song for song in filtered_songs if song in predicted_sequence]
num_matching_songs = len(matching_songs)

# Calculate the percentage of matching songs
percentage_matching = (num_matching_songs / len(filtered_songs)) * 100

print("Matching Songs in Predicted Sequence:")
print(matching_songs)
print("Number of Matching Songs:", num_matching_songs)
print("Percentage of Matching Songs:", percentage_matching)

Matching Songs in Predicted Sequence:
['set-e', 'wildcard', 'piper', 'the-moma-dance', 'eos', 'set-2']
Number of Matching Songs: 6
Percentage of Matching Songs: 33.33333333333333
