In [None]:
import os

import numpy as np
import tensorflow as tf
from tensorflow import keras

from typing import List
from tqdm import trange

import config, music_model, utils

### CONFIGURATION ###

# Project root is the parent of the current working directory.
ROOT_PATH = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
DATASET_NAME = 'tf_data7dict'
# The reduced genre set is tied to this specific dataset.
USE_SMALL_GENRE_SET = DATASET_NAME == 'tf_data7dict'
USE_ONE_GPU = True

conf = config.Config("single_instruments_type", ROOT_PATH)

if USE_SMALL_GENRE_SET:
    conf.accepted_subgenres = ['folk', 'nes', 'maestro']
# If we need to use only the first GPU
if USE_ONE_GPU:
    physical_gpus = tf.config.experimental.list_physical_devices('GPU')
    # On a CPU-only machine the list is empty; indexing it would raise IndexError,
    # so in that case conf.GPUS keeps whatever value Config assigned.
    if physical_gpus:
        conf.GPUS = physical_gpus[0]
    # With a single device the global batch size equals the per-device batch size.
    conf.GLOBAL_BATCH_SIZE = conf.BATCH_SIZE
    conf.num_devices = 1

### MODEL CREATION ###

# Arguments shared by both the multi-device and the single-device branch.
model_kwargs = dict(
    num_genres=len(conf.accepted_subgenres),
    use_regularization=False,
    use_masking_layers=False,
)

if conf.num_devices > 1:
    print("Using multiple GPUs with Mirrored Strategy")
    # The model must be created inside the strategy scope.
    with conf.training_strategy.scope():
        model = music_model.create_model(conf, **model_kwargs)
else:
    print("Using single GPU/CPU device")
    model = music_model.create_model(conf, **model_kwargs)

---

## Quick example

In [2]:
## Load the dataset
# Build the path from DATASET_NAME so it cannot drift from the genre-set
# selection (USE_SMALL_GENRE_SET), which is derived from the same constant.
DATASET_PATH = os.path.join('..', 'data', DATASET_NAME)
dataset = (
    tf.data.Dataset.load(DATASET_PATH)
    .batch(conf.BATCH_SIZE)
    .cache()
    .shuffle(conf.SHUFFLE_SIZE)
    .prefetch(conf.PREFETCH_SIZE)
)

## Take the first batch of the dataset and trim its sequence length to SEQ_LEN-1
# NOTE: reading a single batch from a cached pipeline makes TensorFlow log a
# "did not fully read the dataset being cached" warning.
X, y = next(dataset.take(1).as_numpy_iterator())
X = (X[0][:, :conf.SEQ_LEN-1, :], X[1])
y = {k: y[k][:, :conf.SEQ_LEN-1] for k in y}

print(X[0].shape, X[1].shape)
print(y.keys())

(4, 2047, 11) (4, 3)
dict_keys(['duration', 'type', 'tempo', 'measure', 'instrument', 'pitch', 'position', 'beat', 'time_sign', 'velocity', 'key_sign'])


2023-04-21 14:21:30.420590: W tensorflow/core/kernels/data/cache_dataset_ops.cc:856] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


In [3]:
# Function to choose a style for the song
def encode_styles(styles_array):
    """Encode a sequence of genre labels as a stacked int8 array.

    Each label is passed to ``utils.one_hot_encode_labels_nmf`` and the
    resulting encodings are stacked along a new leading axis, so row ``n``
    of the result is the encoding of ``styles_array[n]``.

    Args:
        styles_array: iterable of genre labels (e.g. ``['folk', 'nes']``).

    Returns:
        ``np.ndarray`` of dtype ``int8`` with one row per label.
    """
    encoded_rows = []
    for genre_label in styles_array:
        encoded_rows.append(utils.one_hot_encode_labels_nmf(genre_label))
    stacked = np.stack(encoded_rows, axis=0)
    return stacked.astype(np.int8)

# Example: one encoded row per requested genre, in the same order as the input list
example_genres = ['folk', 'nes', 'maestro', 'nes']
styles = encode_styles(example_genres)
print(styles)

[[1 0 0]
 [0 1 0]
 [0 0 1]
 [0 1 0]]


In [4]:
# Split the model into preprocessing, transformer and output heads so that an
# attention mask can be injected into the transformer while reusing the layers of `model`.
preprocessing_model = keras.Model(inputs=model.input, outputs=model.get_layer('final_encoding').output)
transformer = model.get_layer('tfgpt2_model')
# One head per token component, in the order the components are concatenated into a token.
output_layers = [
    model.get_layer('type_scores'),
    model.get_layer('measure_scores'),
    model.get_layer('beat_scores'),
    model.get_layer('position_scores'),
    model.get_layer('duration_scores'),
    model.get_layer('pitch_scores'),
    model.get_layer('instrument_scores'),
    model.get_layer('velocity_values'),
    model.get_layer('keysign_scores'),
    model.get_layer('timesign_scores'),
    model.get_layer('tempo_scores')
]
# Softmax for every head except velocity (index 7), which gets a relu.
activations = [tf.keras.activations.softmax] * 7 + [tf.keras.activations.relu] + [tf.keras.activations.softmax] * 3

## Test this pipeline
# Let's assume only the first CUR_IDX song tokens are available.
# The attention mask therefore has 1+CUR_IDX leading ones (the generation function
# below counts the genre as the extra attended position) and zeros everywhere else.
CUR_IDX = 2
# Derive the batch size from the data: the mask's batch dimension must match X.
GEN_BATCH_SIZE = X[0].shape[0]
attention_mask = np.ones((GEN_BATCH_SIZE, 1+CUR_IDX), dtype=np.int8)
padding_attention_mask = np.zeros((GEN_BATCH_SIZE, conf.SEQ_LEN-1-CUR_IDX), dtype=np.int8)
attention_mask = np.concatenate([attention_mask, padding_attention_mask], axis=-1)

preprocessed_tensors = preprocessing_model(X)
out_transformer      = transformer({'inputs_embeds': preprocessed_tensors},
                                   attention_mask=attention_mask)['last_hidden_state']
# Drop the last position of every head output.
out_scores           = [head(out_transformer)[:, :-1, :] for head in output_layers]
out_probs            = [np.array(activation(scores))
                        for activation, scores in zip(activations, out_scores)]

2023-04-21 14:21:31.094699: I tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:637] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


In [5]:
# Create the token components
generation_mode = 'top_p_sampling'
top_k_ratio = 0.3  # fraction of each head's classes kept by top-k sampling

if generation_mode == 'top_p_sampling':
    # Compute the dynamic top-p thresholds: SEQ_LEN-1 evenly spaced values from 0.9 down to 0.6
    top_p_sequence = np.linspace(0.9, 0.6, conf.SEQ_LEN-1)

token_components = []
for head_idx in range(len(out_probs)):
    # Velocity is just a number, not a probability distribution
    if head_idx == 7:
        token_components.append(np.asarray(out_probs[7][:, CUR_IDX, :], dtype=np.int32))
        continue

    # Work on a float64 copy of the distributions at position CUR_IDX, so that
    # out_probs is left untouched and this cell gives the same kind of result when re-run.
    step_probs = np.array(out_probs[head_idx][:, CUR_IDX, :], dtype=np.float64)
    num_classes = step_probs.shape[-1]
    # Class indices of every row, sorted from most to least probable
    ranked = np.argsort(-step_probs, axis=-1)

    ## 1. Top-k sampling: sample from the top-k most probable tokens.
    ## k is expressed as a ratio of the number of possibilities, because each component has different ranges.
    if generation_mode == 'top_k_sampling':
        # Keep at least one class, otherwise small heads would end up with an all-zero distribution
        k = max(1, int(num_classes * top_k_ratio))
        keep = np.zeros(step_probs.shape, dtype=bool)
        np.put_along_axis(keep, ranked[:, :k], True, axis=-1)
        step_probs = np.where(keep, step_probs, 0.0)

    ## 2. Top-p sampling: sample from the most probable tokens until the cumulative probability exceeds p
    elif generation_mode == 'top_p_sampling':
        sorted_probs = np.take_along_axis(step_probs, ranked, axis=-1)
        inclusive_mass = np.cumsum(sorted_probs, axis=-1)
        # Probability mass accumulated strictly before each (sorted) class
        mass_before = np.concatenate(
            [np.zeros_like(inclusive_mass[:, :1]), inclusive_mass[:, :-1]], axis=-1)
        # A class is kept while the mass collected before it is still below the threshold
        keep_sorted = mass_before < top_p_sequence[CUR_IDX]
        keep_sorted[:, 0] = True  # always at least one element
        keep = np.zeros(step_probs.shape, dtype=bool)
        np.put_along_axis(keep, ranked, keep_sorted, axis=-1)
        step_probs = np.where(keep, step_probs, 0.0)

    ## 3. Standard mapping: simply sample from the (potentially filtered) probability distributions.
    # Renormalise so every row sums to 1: this redistributes the removed mass over the
    # kept classes and absorbs the rounding error of the float32 softmax.
    step_probs /= step_probs.sum(axis=-1, keepdims=True)
    batch = [[np.random.choice(num_classes, p=step_probs[song])]
             for song in range(step_probs.shape[0])]
    token_components.append(np.asarray(batch, dtype=np.int32))

# Create the tokens concatenating the sampled components
new_tokens = np.concatenate(token_components, axis=1)
new_tokens

array([[  4, 112,  56, 117,  52,  23,  38,   1,  20, 126,  15],
       [  4, 118,  70,  64, 101,  18,  38,   1,   7, 130,  42],
       [  6,   3,  10,  78, 118, 187,   9,   1,  15,  21,  47],
       [  4,  63,  96, 105,  36,  23,   7,   1,  20,  46,   7]],
      dtype=int32)

---

## Generation function

In [6]:
## Song generation function
def _keep_top_k(probs, k):
    """Zero every entry of each row of `probs` (2-D) except its `k` largest ones."""
    ranked = np.argsort(-probs, axis=-1)
    keep = np.zeros(probs.shape, dtype=bool)
    np.put_along_axis(keep, ranked[:, :k], True, axis=-1)
    return np.where(keep, probs, 0.0)


def _keep_top_p(probs, threshold):
    """Zero every entry of each row of `probs` (2-D) outside its top-p nucleus.

    Classes are taken from most to least probable while the mass collected so
    far is below `threshold`; at least one class per row is always kept.
    """
    ranked = np.argsort(-probs, axis=-1)
    sorted_probs = np.take_along_axis(probs, ranked, axis=-1)
    inclusive_mass = np.cumsum(sorted_probs, axis=-1)
    # Probability mass accumulated strictly before each (sorted) class
    mass_before = np.concatenate(
        [np.zeros_like(inclusive_mass[:, :1]), inclusive_mass[:, :-1]], axis=-1)
    keep_sorted = mass_before < threshold
    keep_sorted[:, 0] = True
    keep = np.zeros(probs.shape, dtype=bool)
    np.put_along_axis(keep, ranked, keep_sorted, axis=-1)
    return np.where(keep, probs, 0.0)


def generate_songs(model, style_list:List[str], max_length:int=conf.SEQ_LEN-1, 
                   terminator_type:int=7, generation_mode:str='standard_sampling', 
                   temperature:float=1.0, top_k_ratio=0.3, top_p_start:float=0.9, 
                   top_p_min=0.9):
    """Generate one song per requested style, token by token.

    Args:
        model: model exposing the layers 'final_encoding', 'tfgpt2_model' and the
            eleven output heads looked up by name below.
        style_list: genre label of each song to generate; its length is the number of songs.
        max_length: number of tokens returned per song (start token included);
            must be at most ``conf.SEQ_LEN-1``.
        terminator_type: value of the first token component that marks the end of a song.
        generation_mode: 'standard_sampling', 'top_k_sampling' or 'top_p_sampling'.
        temperature: divisor applied to the scores of every head except velocity
            before the softmax; must be > 0.
        top_k_ratio: fraction of each head's classes kept in top-k mode (rounded up).
        top_p_start: top-p threshold used for the first generated token.
        top_p_min: top-p threshold used for the last generated token; thresholds are
            linearly interpolated in between (constant with the default values).

    Returns:
        ``np.ndarray`` of dtype int32 and shape ``(len(style_list), max_length, 11)``.
        From the first token whose type equals `terminator_type` onwards, every token
        of a song is overwritten with ``(terminator_type, 0, ..., 0)``.

    Raises:
        AssertionError: if a parameter is outside its valid range.
    """
    # Check the validity of the parameters
    assert temperature > 0, "Temperature must be greater than 0"
    assert max_length <= conf.SEQ_LEN-1, f"The maximum length of the generated song must be at most {conf.SEQ_LEN-1}"
    assert generation_mode in ['standard_sampling', 'top_k_sampling', 'top_p_sampling'], \
        "Parameter 'generation_mode' must be one of the following: 'standard_sampling', 'top_k_sampling', 'top_p_sampling'"
    if generation_mode == 'top_k_sampling':
        assert top_k_ratio > 0, "Parameter 'top_k_ratio' must be greater than 0"
    if generation_mode == 'top_p_sampling':
        assert 0 < top_p_start <= 1 and 0 < top_p_min <= 1, \
            "Parameters 'top_p_start' and 'top_p_min' must be in the interval (0, 1]"

    # Names of the output heads, in the order the components are concatenated into a token
    head_names = (
        'type_scores', 'measure_scores', 'beat_scores', 'position_scores',
        'duration_scores', 'pitch_scores', 'instrument_scores', 'velocity_values',
        'keysign_scores', 'timesign_scores', 'tempo_scores',
    )
    num_components = len(head_names)
    velocity_head = head_names.index('velocity_values')

    # Collect explicitly the number of songs to generate
    num_songs = len(style_list)
    if num_songs == 0:
        # Nothing to generate: return an empty batch with the usual trailing dimensions
        return np.zeros((0, conf.SEQ_LEN-1, num_components), dtype=np.int32)[:, :max_length, :]

    # Separate preprocessing model, transformer and output layers in order to be able to
    # inject the attention masks into the transformer and still use the loaded weights
    preprocessing_model = keras.Model(inputs=model.input, outputs=model.get_layer('final_encoding').output)
    transformer = model.get_layer('tfgpt2_model')
    output_layers = [model.get_layer(name) for name in head_names]
    activations = [tf.keras.activations.softmax] * 7 + [tf.keras.activations.relu] + [tf.keras.activations.softmax] * 3

    # Create the empty song array.
    # The first token of the songs is always the start token (0,...,0)
    # The rest of the tensor is filled with padding of zeroes, which will be masked by the attention masks
    generated_songs = np.zeros((num_songs, conf.SEQ_LEN-1, num_components), dtype=np.int32)

    if generation_mode == 'top_p_sampling':
        # Compute the dynamic top-p thresholds: one per generated token (max_length-1 values)
        top_p_sequence = np.linspace(top_p_start, top_p_min, max(max_length-1, 0))

    # Generate the one-hot encodings of the genres
    styles = encode_styles(style_list)

    # Start to generate the songs token by token
    for i in trange(1, max_length):

        # Create the attention mask: the first 1+i positions are attended
        # (for i == 1 these are the genre and the starting token), the rest is padding
        attention_mask = np.zeros((num_songs, conf.SEQ_LEN), dtype=np.int8)
        attention_mask[:, :1+i] = 1

        # Preprocess the songs and pass them through the transformer
        preprocessed_tensors = preprocessing_model((generated_songs, styles))
        out_transformer      = transformer({'inputs_embeds': preprocessed_tensors},
                                           attention_mask=attention_mask)['last_hidden_state']

        # Create the token components
        token_components = []
        for head_idx, (head, activation) in enumerate(zip(output_layers, activations)):
            # Only position i is needed to produce token i, so slice it out before
            # applying the activation instead of activating the whole sequence.
            step_scores = head(out_transformer)[:, i, :]

            # Velocity is just a number, not a probability distribution: no temperature, no sampling
            if head_idx == velocity_head:
                token_components.append(np.array(activation(step_scores)).astype(np.int32))
                continue

            # Apply the temperature to the scores and turn them into probabilities.
            # float64 keeps the renormalisation below within np.random.choice's tolerance.
            probs = np.array(activation(step_scores / temperature), dtype=np.float64)
            num_classes = probs.shape[-1]

            ## 1. Top-k sampling: sample from the top-k most probable tokens.
            ## k is expressed as a ratio of the number of possibilities, because each component has different ranges.
            if generation_mode == 'top_k_sampling':
                # np.ceil returns a float, which cannot be used as a slice bound: convert it to int
                k = min(int(np.ceil(num_classes * top_k_ratio)), num_classes)
                probs = _keep_top_k(probs, k)

            ## 2. Top-p sampling: sample from the most probable tokens until the cumulative probability exceeds p
            elif generation_mode == 'top_p_sampling':
                probs = _keep_top_p(probs, top_p_sequence[i-1])

            ## Sample from the (potentially filtered) probability distributions.
            # Renormalising redistributes the removed mass over the kept classes.
            probs /= probs.sum(axis=-1, keepdims=True)
            sampled = [[np.random.choice(num_classes, p=probs[song])] for song in range(num_songs)]
            token_components.append(np.asarray(sampled, dtype=np.int32))

        # Create the tokens concatenating the sampled components and add them to the songs
        generated_songs[:, i] = np.concatenate(token_components, axis=1)

    # If a token in a song has type of terminator, simply overwrite the rest of the song with end tokens.
    # The fill uses terminator_type (identical to the previous hard-coded 7 for the default value),
    # so the padding matches the token that was searched for.
    end_token = [terminator_type] + [0] * (num_components - 1)
    for song in range(num_songs):
        terminator_indices = np.argwhere(generated_songs[song, :, 0] == terminator_type)
        if len(terminator_indices) > 0:
            first_terminator_index = terminator_indices[0, 0]
            generated_songs[song, first_terminator_index:, :] = end_token

    # Internally, the generated song is always SEQ_LEN - 1 long, so we cut it before returning it.
    return generated_songs[:, :max_length, :]

In [7]:
# Generate two 128-token songs (one folk, one nes) with top-p sampling,
# letting the threshold go from 0.9 to 0.6 along the sequence.
out_songs = generate_songs(
    model,
    ['folk', 'nes'],
    max_length=128,
    temperature=0.9,
    generation_mode='top_p_sampling',
    top_p_start=0.9,
    top_p_min=0.6,
)

100%|██████████| 127/127 [00:11<00:00, 10.90it/s]


In [8]:
# Display the shape of the generated batch followed by its contents
(out_songs.shape, out_songs)

((2, 128, 11),
 array([[[  0,   0,   0, ...,   0,   0,   0],
         [  4, 118,  22, ...,  20,  57,  47],
         [  4,  85,  98, ...,  20, 122,  47],
         ...,
         [  7,   0,   0, ...,   0,   0,   0],
         [  7,   0,   0, ...,   0,   0,   0],
         [  7,   0,   0, ...,   0,   0,   0]],
 
        [[  0,   0,   0, ...,   0,   0,   0],
         [  6,   3,  87, ...,  10, 120,  39],
         [  4,  79, 125, ...,   6,  44,  47],
         ...,
         [  7,   0,   0, ...,   0,   0,   0],
         [  7,   0,   0, ...,   0,   0,   0],
         [  7,   0,   0, ...,   0,   0,   0]]], dtype=int32))