In [None]:
![Lyrics Genius Logo](LyricsGenius.png)

Notebook used to train Lyrics Genius given a lyrics dataset and the network specification

In [7]:
from __future__ import print_function
# Data manipulation
import pydot
import numpy as np
import pandas as pd

# Misc libraries
import json
import pickle
import sys
import io


# Deep Learning libraries
from keras.callbacks import LambdaCallback
from keras.models import Model, load_model, Sequential
from keras.layers import Dense, Activation, Dropout, Input, Masking, Embedding, InputLayer
from keras.layers import LSTM, Lambda, concatenate, Bidirectional, Concatenate, SpatialDropout1D
from keras.utils.vis_utils import plot_model
import keras
from keras.layers.merge import add
from keras.utils.data_utils import get_file
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import Tokenizer
from keras.layers import Input, Embedding, Activation, Flatten, Dense
from keras.layers import Conv1D, MaxPooling1D, Dropout

# Loading Training Data

In [8]:
print("Loading text data...")
text = io.open('data/rhcp-lyrics.txt', encoding='utf-8').read().lower()
print('corpus length:', len(text))


Loading text data...
corpus length: 417951


In [9]:
Tx = 40
chars = sorted(list(set(text)))
num_classes = len(chars)
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))
print('number of unique characters in the corpus:', len(chars))


number of unique characters in the corpus: 54


In [10]:
def build_data(text, Tx = 40, stride = 3):
    """
    Create a training set by scanning a window of size Tx over the text corpus, with stride 3.
    
    Arguments:
    text -- string, corpus of Shakespearian poem
    Tx -- sequence length, number of time-steps (or characters) in one training example
    stride -- how much the window shifts itself while scanning
    
    Returns:
    X -- list of training examples
    Y -- list of training labels
    """
    
    X = []
    Y = []

    for i in range(0, len(text) - Tx, stride):
        X.append(text[i: i + Tx])
        Y.append(text[i + Tx])

    
    print('number of training examples:', len(X))
    
    return X, Y

# Create Training Set and Vectorize Data

In [11]:
print("Creating training set...")
X, Y = build_data(text, Tx=Tx, stride = 3)


Creating training set...
number of training examples: 139304


In [15]:
tk = Tokenizer(num_words=None, char_level=True, oov_token='UNK')
tk.fit_on_texts(X)
# If we already have a character list, then replace the tk.word_index
# If not, just skip below part


# construct a new vocabulary
alphabet = chars


#Store alphabet to make predictions
with open('models/rhcp-alphabet.json', 'w+') as fp:
    json.dump(alphabet, fp)
    
char_dict = {}
for i, char in enumerate(alphabet):
    char_dict[char] = i + 1

# Use char_dict to replace the tk.word_index
tk.word_index = char_dict.copy()
# Add 'UNK' to the vocabulary
tk.word_index[tk.oov_token] = max(char_dict.values()) + 1


 
with open('models/rhcp-tokenizer.pkl', 'wb') as handle:
    pickle.dump(tk, handle, protocol=pickle.HIGHEST_PROTOCOL)

# Convert string to index
train_sequences = tk.texts_to_sequences(X)

# Padding
train_data = pad_sequences(train_sequences, maxlen=Tx, padding='post')

# Convert to numpy array
train_data = np.array(train_data, dtype='float32')

# =======================Get classes================
train_classes = [elem[0] for elem in tk.texts_to_sequences(Y)]

train_class_list = [x - 1 for x in train_classes]



from keras.utils import to_categorical

train_classes = to_categorical(train_class_list)


In [16]:
x, y = train_data, train_classes

# Building the Model

In [17]:
model_config = {
    'rnn_width': 64,
    'rnn_depth': 4,
    'rnn_dropout': 0.3,
    'bidirectional': True
}
embedding_size = 128


In [19]:
continue_learning = False
model_path = "models/rhcp_model_res.h5"

In [20]:
def new_lstm_cell(rnn_width, rnn_dropout, bidirectional=True, return_sequences=False):
    if bidirectional:
        return Bidirectional(LSTM(rnn_width, recurrent_dropout=rnn_dropout, dropout=rnn_dropout,return_sequences=return_sequences))
    else:
        return LSTM(rnn_width, recurrent_dropout=rnn_dropout, dropout=rnn_dropout,return_sequences=return_sequences)

In [21]:
def make_lstm_layers(input, rnn_width, rnn_depth, rnn_dropout, bidirectional=True):
    layer_list = []
    layer = input
    for i in range(rnn_depth):
        return_sequences = i < rnn_depth - 1
        prev_layer = input if i == 0 else layer_list[-1]
        layer = new_lstm_cell(rnn_width, rnn_dropout, bidirectional=bidirectional, return_sequences=return_sequences)
        
        layer_list.append(layer)
    return layer, layer_list
    

In [22]:
def make_residual_lstm_layers(input, rnn_width, rnn_depth, rnn_dropout, bidirectional=True):
    """
    The intermediate LSTM layers return sequences, while the last returns a single element.
    The input is also a sequence. In order to match the shape of input and output of the LSTM
    to sum them we can do it only for all layers but the last.
    """
    x = input
    layer_list = []
    for i in range(rnn_depth):
        return_sequences = i < rnn_depth - 1
        x_rnn = Bidirectional(LSTM(rnn_width, recurrent_dropout=rnn_dropout, dropout=rnn_dropout, return_sequences=return_sequences))(x)
        if return_sequences:
            # Intermediate layers return sequences, input is also a sequence.
            if i > 0 or input.shape[-1] == rnn_width:
                x = add([x, x_rnn])
            else:
                # Note that the input size and RNN output has to match, due to the sum operation.
                # If we want different rnn_width, we'd have to perform the sum from layer 2 on.
                x = x_rnn
        else:
            # Last layer does not return sequences, just the last element
            # so we select only the last element of the previous output.
            def slice_last(x):
                return x[..., -1, :]
            x = add([Lambda(slice_last)(x), x_rnn])
        layer_list.append(x_rnn)
    return x, layer_list

In [23]:
def create_model_residual(model_config):
    inputs = Input(shape=(Tx, ), name='sent_input', dtype='int64')
    
    embeddings = keras.layers.Embedding(len(chars) + 1, embedding_size, input_length=Tx)(inputs)
    embeddings = SpatialDropout1D(model_config['rnn_dropout'], name='spatial-dropout')(embeddings)
    lstm_layer, layer_list = make_residual_lstm_layers(embeddings, **model_config)
    
    dense_layer = keras.layers.Dense(len(chars), activation='softmax')(lstm_layer)
    model = keras.Model(inputs=inputs, outputs=dense_layer)
    optimizer = keras.optimizers.Adam(learning_rate=4e-3)
    model.compile( loss='categorical_crossentropy', optimizer=optimizer)
    return model

In [24]:
# Simple Deep LSTM Model without Residual Units
def create_model():
    model = Sequential()
    model.add(InputLayer(input_shape=(Tx, len(chars))))
    model.add(LSTM(128, input_shape=(Tx, len(chars)), return_sequences=True))
    model.add(Dropout(0.5))
    model.add(LSTM(128))
    model.add(Dropout(0.5))
    model.add(Dense(len(chars), activation='softmax'))

    optimizer = keras.optimizers.Adam(learning_rate=4e-3)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer)
    return model

In [25]:
model = None
if continue_learning:
    model = load_model(model_path)
else:
    model = create_model_residual(model_config)

In [26]:
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
sent_input (InputLayer)         (None, 40)           0                                            
__________________________________________________________________________________________________
embedding_1 (Embedding)         (None, 40, 128)      7040        sent_input[0][0]                 
__________________________________________________________________________________________________
spatial-dropout (SpatialDropout (None, 40, 128)      0           embedding_1[0][0]                
__________________________________________________________________________________________________
bidirectional_1 (Bidirectional) (None, 40, 128)      98816       spatial-dropout[0][0]            
____________________________________________________________________________________________

In [27]:
def sample(preds, temperature=1.0):
    # helper function to sample an index from a probability array
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    out = np.random.choice(range(len(chars)), p = probas.ravel())
    return out

In [None]:
history = model.fit(x, y, batch_size=128, epochs=30, verbose=True)

  "Converting sparse IndexedSlices to a dense Tensor of unknown shape. "


Epoch 1/30
Epoch 2/30
Epoch 3/30

In [None]:
import matplotlib.pyplot as plt
plt.style.use('ggplot')

def plot_history(history):

    loss = history.history['loss']
    x = range(1, len(loss) + 1)


    plt.plot(x, loss, 'b', label='Training loss')
    plt.title('Training loss')
    plt.legend()

In [None]:
plot_history(history)

In [None]:
# serialize weights to HDF5
model.save("models/rhcp_model_res.h5", overwrite=True)
print("Saved model to disk")

In [None]:
def generate_output(temperature=1.0):
    generated = ''
    usr_input = input("Start typing the beginning of your lyrics. Lyric-genius will complete it.\n Your input is: ")
    # zero pad the sentence to Tx characters.
    sentence = ('{0:0>' + str(Tx) + '}').format(usr_input).lower()
    generated += usr_input 

    sys.stdout.write("\n\nHere is your lyric: \n\n") 
    sys.stdout.write(usr_input)
    for i in range(300):

        predict_sequence = tk.texts_to_sequences([sentence])

        # Padding
        predict_data = pad_sequences(predict_sequence, maxlen=Tx, padding='post')

        # Convert to numpy array
        x_pred = np.array(predict_data, dtype='float32')

        preds = model.predict(x_pred, verbose=0)[0]
        next_index = sample(preds, temperature = temperature)
        next_char = indices_char[next_index]

        generated += next_char
        sentence = sentence[1:] + next_char

        sys.stdout.write(next_char)
        sys.stdout.flush()

        if next_char == '\n':
            continue

In [None]:
generate_output(temperature=0.5)