In [62]:
import numpy as np
import tensorflow as tf
import pandas as pd
import re
import string
import os
import time

from unidecode import unidecode
from dataclasses import dataclass

from modules.char_processors import CharProcessor, OneStep
from modules.models import CharModel, MyCallback

### Text Preprocessing

In [63]:
# Read file
file = r'./data/drake_data.csv'

# read as a csv with pandas
# the structure of the data makes pandas the best library to read, since it contains columns
data = pd.read_csv(file, sep = ",")

display(data.head(3))

Unnamed: 0,album,lyrics_title,lyrics_url,lyrics,track_views
0,Certified Lover Boy,Certified Lover Boy* Lyrics,https://genius.com/Drake-certified-lover-boy-l...,[Verse]\nPut my feelings on ice\nAlways been a...,8.7K
1,Certified Lover Boy,Like I’m Supposed To/Do Things Lyrics,https://genius.com/Drake-like-im-supposed-to-d...,[Verse]\nHands are tied\nSomeone's in my ear f...,38.8K
2,Certified Lover Boy,Not Around Lyrics,https://genius.com/Drake-not-around-lyrics,"[Intro]\nYeah, we back\nWassup ladies?\nSwisha...",129.8K


In [64]:
# dropnas in the lyrics, not interested in those
data = data.dropna(subset = ['lyrics'], axis = 0)

In [65]:
# get the song lyrics into a list
drake_songs = data['lyrics'].to_list()

In [66]:
# function to clean text
def clean_text(song: str, line_breaks_replacement: str = ' '):
    '''
    Cleans a specific drake song

    Args:
        song (str): text with the lyrics of a specific song
        line_breaks_replacement (str): character to use to replace line breaks

    Returns:
        drake_verses_joined (str): text with no line breaks and verses only sang by drake
    '''

    # remove Unicode characters
    normalized_song = unidecode(song)

    # remove line breaks
    song_list = normalized_song.split('\n')

    # new verses
    drake_verses = []
    
    # set default drake to be true
    drake = True

    # write a loop to iterate and return only the verses sung by drake
    for verse in song_list:

        if len(verse) == 0:
            continue

        # identify if its a hear by the squared brakers
        if '[' in verse:

            # update the verse_head value
            verse_head = verse

            def drake_sung(verse):
                '''Method to identify if drake is singing the verse'''

                # find a :
                match = verse.find(':')

                # when the artist is not specified its drake
                if match == -1:
                    return True

                # else we need to check if it will be a pure drake verse
                else:
                    
                    # get the list of singers
                    singers = verse[match+2:].replace(']', '').split(' ')

                    # get if drake is the only singer
                    if ('Drake' in singers) & (len(singers) == 1):
                        return True
                    else:
                        return False

            # bool if sung by drake
            drake = drake_sung(verse_head)

        if (drake) & ('[' not in verse):

            # remove punctuation from the verse
            clean_verse = ''.join([x.lower() for x in verse if x not in string.punctuation])

            # append to the list
            drake_verses.append(clean_verse)

    # join all of drake verses
    drake_verses_joined = ' '.join(drake_verses)

    # return the new list
    return drake_verses_joined

In [67]:
only_drake = ' '.join([clean_text(song) for song in drake_songs])

In [68]:
# save as a json for later
file = open('./data/drake_songs.txt', 'w')
file.write(only_drake)
file.close()

### Character vectorization

In [69]:
# instantiante char processor class
char_processor = CharProcessor(text=only_drake)

37 unique characters


In [70]:
# create example text and use char processor to convert to ids and convert back
example_verse = only_drake[:50]
print(example_verse)
verse_chars = char_processor.ragged_tensor(example_verse)
verse_chars

put my feelings on ice always been a gem certified


<tf.Tensor: shape=(50,), dtype=string, numpy=
array([b'p', b'u', b't', b' ', b'm', b'y', b' ', b'f', b'e', b'e', b'l',
       b'i', b'n', b'g', b's', b' ', b'o', b'n', b' ', b'i', b'c', b'e',
       b' ', b'a', b'l', b'w', b'a', b'y', b's', b' ', b'b', b'e', b'e',
       b'n', b' ', b'a', b' ', b'g', b'e', b'm', b' ', b'c', b'e', b'r',
       b't', b'i', b'f', b'i', b'e', b'd'], dtype=object)>

In [71]:
# get the ids from the verse (FEED TEXT - METHOD ALREADY CREATES RAGGED TENSOR)
ids_from_verse = char_processor.get_ids_from_text(example_verse)
ids_from_verse

<tf.Tensor: shape=(50,), dtype=int64, numpy=
array([27, 32, 31,  1, 24, 36,  1, 17, 16, 16, 23, 20, 25, 18, 30,  1, 26,
       25,  1, 20, 14, 16,  1, 12, 23, 34, 12, 36, 30,  1, 13, 16, 16, 25,
        1, 12,  1, 18, 16, 24,  1, 14, 16, 29, 31, 20, 17, 20, 16, 15],
      dtype=int64)>

In [72]:
# convert back to text
chars_from_ids = char_processor.text_from_ids(ids_from_verse)
chars_from_ids

<tf.Tensor: shape=(), dtype=string, numpy=b'put my feelings on ice always been a gem certified'>

As we see above, the methods from CharProcessor class are able to process a verse, convert it to tokens and convert it back to text

We will now try to create a dataset using the class with the create_dataset method

In [73]:
# Batch size
BATCH_SIZE = 40

# Buffer size to shuffle the dataset
BUFFER_SIZE = 10000

# Specify sequence length
SEQUENCE = 100

# Dataset path
PATH = './data/character_dataset'

In [74]:
# create dataset, specifying the path to save, batch size and buffer size
dataset = char_processor.create_dataset(text=only_drake, pathsave=PATH, save=True, \
    sequence=SEQUENCE, batch_size=BATCH_SIZE, buffer_size=BUFFER_SIZE)

dataset


<PrefetchDataset element_spec=(TensorSpec(shape=(40, 100), dtype=tf.int64, name=None), TensorSpec(shape=(40, 100), dtype=tf.int64, name=None))>

### Create Model

In [75]:
from modules.models import CharModel

In [76]:
# define parameters
VOCAB_SIZE = len(char_processor.ids_from_chars.get_vocabulary())
EMBEDDING_DIM = 100
LSTM_UNITS = 300

In [77]:
model = CharModel(
    vocab_size=VOCAB_SIZE,
    embedding_dim=EMBEDDING_DIM,
    rnn_units=LSTM_UNITS)

### Try out the model

If we try out the model unfitted, we will see that it will not be able to return any proper verse

In [78]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(40, 100, 38) # (batch_size, sequence_length, vocab_size)


In [79]:
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

In [80]:
print("Input:\n", char_processor.text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", char_processor.text_from_ids(sampled_indices).numpy())

Input:
 b'fore i make my decisions thats how i take the high road say i never get alone time thats a lie thoug'

Next Char Predictions:
 b'7uplzefgnzg4faaxwbrvoy0wety2jcp6nyhj83f5vrluwsy0xn3kqgtf0ct2fht7a9fa5zmoihz136qtrn6uumvmjpowk41c5g3c'


### Check model exponential loss

In [81]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [82]:
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

Prediction shape:  (40, 100, 38)  # (batch_size, sequence_length, vocab_size)
Mean loss:         tf.Tensor(3.6375577, shape=(), dtype=float32)


In [83]:
print(f'Exponential loss: {tf.exp(example_batch_mean_loss).numpy()}')
print(f'Vocab Size: {VOCAB_SIZE}')

Exponential loss: 37.99892044067383
Vocab Size: 38


By checking the exponential of the mean loss of the untrained model, we can check whether the model has been properly initialized or not. 

If the exponential loss is similar to the vocab size, that means that the model has been properly initialzed (it generated characters randomly from the character pool available)

### Model Training

In [84]:
# compile with adam and sparse categorical crossentropy loss
model.compile(optimizer='adam', loss=loss)

In [85]:
model.summary()

Model: "char_model_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_3 (Embedding)     multiple                  3800      
                                                                 
 lstm_6 (LSTM)               multiple                  481200    
                                                                 
 lstm_7 (LSTM)               multiple                  721200    
                                                                 
 dense_6 (Dense)             multiple                  308224    
                                                                 
 dense_7 (Dense)             multiple                  38950     
                                                                 
Total params: 1,553,374
Trainable params: 1,553,374
Non-trainable params: 0
_________________________________________________________________


In [86]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

callback_loss = MyCallback()

In [87]:
EPOCHS = 40

In [88]:
history = model.fit(dataset, epochs=EPOCHS, callbacks=[callback_loss, checkpoint_callback])

Epoch 1/40
Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40
Epoch 11/40
Epoch 12/40
Epoch 13/40
Epoch 14/40
Epoch 15/40
Epoch 16/40
Epoch 17/40
Epoch 18/40
Epoch 19/40
Epoch 20/40
Epoch 21/40
Epoch 22/40
Epoch 23/40
Epoch 24/40
Epoch 25/40
Epoch 26/40
Epoch 27/40
Epoch 28/40
Epoch 29/40
Epoch 30/40
Epoch 31/40
Epoch 32/40
Epoch 33/40
Epoch 34/40
Epoch 35/40
Epoch 36/40
Epoch 37/40
Epoch 38/40
Epoch 39/40
Epoch 40/40


In [89]:
model.save('./models/drake_song_generator')



INFO:tensorflow:Assets written to: ./models/drake_song_generator\assets


INFO:tensorflow:Assets written to: ./models/drake_song_generator\assets


### Song Generation

In [90]:
# save them as separate variables for later
ids_from_chars = char_processor.ids_from_chars
chars_from_ids = char_processor.chars_from_ids

In [91]:
class OneStep(tf.keras.Model):


    def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
        
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars

        # Create a mask to prevent "[UNK]" from being generated.
        skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
        sparse_mask = tf.SparseTensor(
            # Put a -inf at each bad index.
            values=[-float('inf')]*len(skip_ids),
            indices=skip_ids,
            # Match the shape to the vocabulary
            dense_shape=[len(self.ids_from_chars.get_vocabulary())])
        self.prediction_mask = tf.sparse.to_dense(sparse_mask)


    @tf.function
    def generate_one_step(self, inputs, states=None):
        # Convert strings to token IDs.
        input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
        input_ids = self.ids_from_chars(input_chars).to_tensor()

        # Run the model.
        # predicted_logits.shape is [batch, char, next_char_logits]
        predicted_logits, states = self.model(inputs=input_ids, states=states,
                                            return_state=True)
        # Only use the last prediction.
        predicted_logits = predicted_logits[:, -1, :]
        predicted_logits = predicted_logits/self.temperature
        # Apply the prediction mask: prevent "[UNK]" from being generated.
        predicted_logits = predicted_logits + self.prediction_mask

        # Sample the output logits to generate token IDs.
        predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
        predicted_ids = tf.squeeze(predicted_ids, axis=-1)

        # Convert from token ids to characters
        predicted_chars = self.chars_from_ids(predicted_ids)

        # Return the characters and model state.
        return predicted_chars, states

In [92]:
# create one step model
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

In [95]:
start = time.time()
states = None
verse='I tried to be nice to you, but you pushed me away'
next_char = tf.constant(verse, shape=(BATCH_SIZE))
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

I tried to be nice to you, but you pushed me away that i real why you heard or who is it imagine and im convinced i made sacrifices ive been starin at the mosto drug it right now thats me on fuck it i understand im not alone up the whole city stucked it right now that i didnt just keep it right man i play seem like im said i seen fuck the deales yeah they aint swimmen to a wife dj you think were home now cause me one my face with me when i was on a compondo eat and we takin it all bottles in houstatlantavegas ayy houstatlantavegas ayy houstatlantavegas ayy houstatlantavegas ayy houstatlanta hand ima happen that still quick wes always was a supportin man what make it friends take you the best with everybody of right thrd much on my nw flexh and im the intriggas ask hes chas to calm the things that i am over im in that bitch aint lovers again party and baby waight a floot for my last for no falls tell me hill she wanna hin whay up was good im winning to chuck with your chick wanna pol we