# 0. Importations 

In [270]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [271]:
import os
print(f"Chemin vers le dossier actuel: \n {os.getcwd()}")
path_main_folder = '/content/drive/My Drive/lyrics_generation/data/'

os.chdir(path_main_folder)
print(f"Chemin vers le dossier actuel: \n {os.getcwd()}")

Chemin vers le dossier actuel: 
 /content/drive/My Drive/lyrics_generation/data
Chemin vers le dossier actuel: 
 /content/drive/My Drive/lyrics_generation/data


In [0]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data as data
from torch.utils.data import DataLoader, TensorDataset

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
import re
import torch.nn.functional as F

from argparse import Namespace
import random
import string
import io
import sys, os

Define device

In [0]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 1. Text pre-processing 

In [0]:
def preprocess(text):
    text = text.replace("'ll",' will')
    text = text.replace("'m",' am')
    text = text.replace("'re",' are')
    text = text.replace("'s",' is')
    text = text.replace("\n"," \n ")
    text = text.lower()
    text = re.sub(' {2,}', ' ', text)
    #text = "".join(v for v in text if v not in string.punctuation).lower()
    #text = text.replace("\n"," \SAUT ")
    return text

def extract_characters(text):
    return sorted(list(set(text)))

def get_chars_index_dicts(chars):
    return dict((c, i) for i, c in enumerate(chars)), dict((i, c) for i, c in enumerate(chars))

# 2. Vectorization

In [0]:
#Encode characters in numbers
def vectorize(sentence, char_to_index):
  return np.array([char_to_index[char] for char in sentence])

#Project a batch of int_sequences in dim chars space
def one_hot_encode(sequences, chars):
    X = np.zeros((len(sequences), len(sequences[0]), len(chars)), dtype=int)
    for i, sentence in enumerate(sequences):
        for t, integer in enumerate(sentence):
            X[i, t, integer] = 1
    return X

# def one_hot_encode(sequences, sequence_length, chars, char_to_index, next_chars):
#     X = np.zeros((len(sequences), sequence_length, len(chars)), dtype=int)
#     y = np.zeros((len(sequences), len(chars)), dtype=int)
#     for i, sentence in enumerate(sequences):
#         for t, char in enumerate(sentence):
#             X[i, t, char_to_index[char]] = 1
#         y[i, char_to_index[next_chars[i]]] = 1

#     return X, y

In [0]:
def show_batch(train_loader):
    # obtain one batch of training data
    dataiter = iter(train_loader)
    sample_x, sample_y = dataiter.next()
    print('Sample input size: ', sample_x.size()) # batch_size, seq_length
    print('Sample input: \n', sample_x)
    print()
    print('Sample output size: ', sample_y.size()) # batch_size
    print('Sample output: \n', sample_y)

# Data extraction

In [0]:
def get_data_from_file(train_file,artist_name=None):
    
    df = pd.read_csv(train_file)
    
    #Select artist
    if artist_name :
      df = df[df.artist==artist_name]
    
    #Get text
    text = ' '.join([x for x in df.text])

    #Transform data
    text = preprocess(text)
    
    #Get characters
    chars = extract_characters(text)

    #Get dictionnaries
    char_to_index, indices_char = get_chars_index_dicts(chars)

    #Encode text
    encoded_text = vectorize(text, char_to_index)

    return chars, encoded_text

# Create Batches

In [0]:
def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.
       
       Arguments
       ---------
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    '''
    
    batch_size_total = batch_size * seq_length
    # total number of batches we can make
    n_batches = len(arr)//batch_size_total
    
    # Keep only enough characters to make full batches
    arr = arr[:n_batches * batch_size_total]
    # Reshape into batch_size rows
    arr = arr.reshape((batch_size, -1))
    
    # iterate through the array, one sequence at a time
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:, n:n+seq_length]
        # The targets, shifted by one
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [279]:
x, y = next(get_batches(encoded_text,10,40))
print(x.shape)
one_hot_encode(x,chars).shape

(10, 40)


(10, 40, 50)

In [0]:
# def create_sequences(text, sequence_length, step):
#     sequences = []
#     next_chars = []
#     for i in range(0, len(text) - sequence_length, step):
#         sequences.append(text[i: i + sequence_length])
#         next_chars.append(text[i + sequence_length])
#     return sequences, next_chars

# def create_sequences(text, sequence_length, step):
#     text = text[:sequence_length * (len(text)//sequence_length)+2]
#     sequences = []
#     next_sequences = []
#     for i in range(0, len(text) - sequence_length-1, step):
#         sequences.append(text[i: i + sequence_length])
#         next_sequences.append(text[i+1: i + sequence_length+1])
#     return sequences, next_sequences

# 3. Create Model class and train function

## 3. A) LSTM

In [0]:
class LSTM(nn.Module):
    def __init__(self, chars, hidden_size=128, num_layers=2, drop_prob=0.5, lr=0.001):
        super().__init__()
        self.chars = chars
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.drop_prob = drop_prob
        self.lr = lr
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}

        # Define layers
        self.lstm = nn.LSTM(input_size=len(chars), hidden_size=hidden_size, num_layers=num_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(hidden_size, len(chars))
        self.softmax = nn.Softmax(dim=1)


    def forward(self, x, hidden):
      # Hidden = h et c les états internes
      
      output, hidden = self.lstm(x, hidden)
  
      # Use dropout
      out = self.dropout(output)

      out = out.contiguous().view(-1, self.hidden_size)
      #x = x[:,-1,:]  # Keep only the output of the last iteration. Before shape (6,3,128), after shape (6,128)
      
      out = self.fc(out)
      #out = self.softmax(out)
      
      return out, hidden
    
      
    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes nb_layers x batch_size x hidden_size,
        # initialized to zero, for hidden state and cell state of LSTM
        
        hidden = (torch.zeros(self.num_layers, batch_size, self.hidden_size, dtype=int).to(device),
                torch.zeros(self.num_layers, batch_size, self.hidden_size,dtype=int).to(device))
        
        return hidden

## 3. B) Train function

In [0]:
def train(model, data, batch_size=10, seq_length=40, nb_epochs=8000, lr=0.01, clip=5, val_frac=0.1, print_every=1):
  
  model = model.to(device)
  model.train()
  
  #criterion = nn.MSELoss()
  #criterion = nn.CategoricalCrossEntropy()
  criterion = nn.CrossEntropyLoss()
 
  #optimizer = torch.optim.RMSprop(model.parameters(), lr = lr)
  optimizer = torch.optim.Adam(model.parameters(), lr=lr)
  
  # Create training and validation data
  #TODO: cross-validation
  val_index = int(len(data)*(1-val_frac))
  train_data, val_data = data[:val_index], data[val_index:]

  counter = 0
  n_chars = len(model.chars)

  for epoch in range(nb_epochs):
    # initialize hidden state
    h = model.init_hidden(batch_size)

    #train_data = TensorDataset(torch.from_numpy(X).to(device), torch.from_numpy(y).to(device))
    #train_loader = DataLoader(train_data, shuffle=False, batch_size=batch_size, drop_last=True)

    for x, y in get_batches(data, batch_size, seq_length):
      
      counter+=1

      # One-hot encode our data and make them Torch tensors (only x data!!)  
      x = one_hot_encode(x, model.chars)


      inputs, targets = torch.from_numpy(x).float(), torch.from_numpy(y).float()
      
      inputs, targets = inputs.to(device), targets.to(device)
      
      # Creating new variables for the hidden state, otherwise
      # we'd backprop through the entire training history
      h = tuple([each.data.float() for each in h])
 
      model.zero_grad()

      output, h = model(inputs, h)
      
      loss = criterion(output, targets.view(batch_size*seq_length).long())
      #loss = criterion(output, targets)
      
      loss.backward()
      
      # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
      nn.utils.clip_grad_norm_(model.parameters(), clip)

      optimizer.step()

      # loss stats
      if counter % print_every == 0:
          # Get validation loss
          val_h = model.init_hidden(batch_size)
          val_losses = []
          model.eval()
          for x, y in get_batches(val_data, batch_size, seq_length):
              # One-hot encode our data and make them Torch tensors
              x = one_hot_encode(x, model.chars)
              inputs, targets = torch.from_numpy(x).float(), torch.from_numpy(y).float()
              
              # Creating new variables for the hidden state, otherwise
              # we'd backprop through the entire training history
              val_h = tuple([each.data.float() for each in val_h])
            
              inputs, targets = inputs.to(device), targets.to(device)

              output, val_h = model(inputs, val_h)
              val_loss = criterion(output, targets.view(batch_size*seq_length).long())
          
              val_losses.append(val_loss.item())
          
          model.train() # reset to train mode after iterationg through validation data
                
          print("Epoch: [{}/{}]".format(epoch+1, nb_epochs),
                "Step: [{}]".format(counter),
                "Loss: {:.4f}".format(loss.item()),
                "Val Loss: {:.4f}".format(np.mean(val_losses)))

    #print(f"Epoch {epoch+1}/{n_epochs}, loss = {loss.item()}")

# 4. Instantiate model

## 4. A) Pipeline : from text

In [0]:
flags = Namespace(
    train_file='reducedsongdata.csv',
    artist_name='Coldplay',
    batch_size=8,
    SEQUENCE_LENGTH=40,
    embedding_size=1,
    hidden_size=32,
    gradients_norm=5,
    initial_words=['i', 'am'],
    predict_top_k=5,
    checkpoint_path='checkpoint',
    n_layers=1,
    SEQUENCE_STEP = 1,
    EPOCHS10 = 10,
    DIVERSITY = 1.0
)

In [0]:
chars, encoded_text = get_data_from_file(flags.train_file)

In [0]:
model = LSTM(chars,hidden_size=256)

In [0]:
model =model.float()

In [330]:
#Train the model

batch_size = 128
seq_length = 40
n_epochs = 1000

train(model, encoded_text, batch_size=batch_size, seq_length=seq_length, nb_epochs=n_epochs, lr=0.01, print_every=20)
#train(model, X, y, batch_size=128, n_epochs=EPOCHS10)

Epoch: [1/1000] Step: [20] Loss: 1.2420 Val Loss: 1.1570
Epoch: [1/1000] Step: [40] Loss: 1.2128 Val Loss: 1.1543
Epoch: [2/1000] Step: [60] Loss: 1.2201 Val Loss: 1.1483
Epoch: [2/1000] Step: [80] Loss: 1.2579 Val Loss: 1.1431
Epoch: [2/1000] Step: [100] Loss: 1.2446 Val Loss: 1.1382
Epoch: [3/1000] Step: [120] Loss: 1.2294 Val Loss: 1.1402
Epoch: [3/1000] Step: [140] Loss: 1.2071 Val Loss: 1.1322
Epoch: [4/1000] Step: [160] Loss: 1.1704 Val Loss: 1.1385
Epoch: [4/1000] Step: [180] Loss: 1.2027 Val Loss: 1.1252
Epoch: [4/1000] Step: [200] Loss: 1.1810 Val Loss: 1.1278
Epoch: [5/1000] Step: [220] Loss: 1.2219 Val Loss: 1.1226
Epoch: [5/1000] Step: [240] Loss: 1.2147 Val Loss: 1.1157
Epoch: [6/1000] Step: [260] Loss: 1.1710 Val Loss: 1.1126
Epoch: [6/1000] Step: [280] Loss: 1.1955 Val Loss: 1.1158
Epoch: [6/1000] Step: [300] Loss: 1.2153 Val Loss: 1.1165
Epoch: [7/1000] Step: [320] Loss: 1.2062 Val Loss: 1.1158
Epoch: [7/1000] Step: [340] Loss: 1.1625 Val Loss: 1.1025
Epoch: [8/1000] St

KeyboardInterrupt: ignored

# 5. Save Model

In [0]:
model_name = 'rnn_20_epoch.net'

checkpoint = {'n_hidden': model.hidden_size,
              'n_layers': model.num_layers,
              'state_dict': model.state_dict(),
              'tokens': model.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

# 5. Generate samples

In [0]:
def predict(model, char, h=None, top_k=None):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        '''
        # tensor inputs
        x = np.array([[model.char2int[char]]])
        x = one_hot_encode(x, model.chars)
        inputs = torch.from_numpy(x).float()
        
        inputs = inputs.to(device)
        
        # detach hidden state from history
        h = tuple([each.data.float() for each in h])
        
        # get the output of the model
        out, h = model(inputs, h)

        # get the character probabilities
        p = F.softmax(out, dim=1).data
        
        p = p.cpu() # move to cpu
        
        # get top characters
        if top_k is None:
            top_ch = np.arange(len(model.chars))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
        
        # return the encoded value of the predicted char and the hidden state
        return model.int2char[char], h

In [0]:
def sample(model, size, sentence='The', top_k=None):
    
    model = model.to(device)
    
    model.eval() # eval mode
    
    sentence = sentence.lower()

    chars = [ch for ch in sentence]
    
    h = model.init_hidden(1)
    
    for ch in sentence:
        char, h = predict(model, ch, h, top_k=top_k)
    
    #add last one
    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for i in range(size):
        char, h = predict(model, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

In [332]:
print(sample(model, 1000, prime='hey you', top_k=5))

hey you can't stard 
 
 somewhere through the tame 
 we see who you are 
 it is my heart 
 and if it is true 
 and we are so cald, they wanna stay, i've come down with love, 
 i've come down with love, 
 i am gonna be 
 into your wings on us 
 you will call me when i am an idiot 
 they are gonna give up tired tonight 
 i am a long bollablit still 
 i am a stuck on your wines 
 i am taking up to me 
 
 i can't she can we see why it is mederar tight, 
 and the sang shake the surface to find the clace, though 
 and all the stars all there we could be my home 
 i wanna be a chill far 
 i've been broken and singen away 
 that won't get it and see 
 and i will see you something that we are on 
 i am a some cause i can't go again 
 and you are always meant to be 
 your stally saved to be if your time 
 and if i ever cancked and see 
 and i see it is a second open open, but take all a sound 
 the time and all the world all your life 
 the same 
 
 if you are starting away 
 and you could be me

In [290]:
for diversity in [0.2, 0.5, 1.0, 1.2]:
    print()
    print('----- diversity:', diversity)

    generated = ''
    # insert your 40-chars long string. OBS it needs to be exactly 40 chars!
    sentence = "Hey my lovely dear, i missed you so much"
    sentence = sentence.lower() #stop here-------------^"
    generated += sentence

    print('----- Generating with seed: "' + sentence + '"')
    sys.stdout.write(generated)

    for i in range(400):
        x = np.zeros((1, SEQUENCE_LENGTH, len(chars)))
        for t, char in enumerate(sentence):
            x[0, t, char_to_index[char]] = 1
        x = torch.from_numpy(x)
        x = x.float()
        x = x.to(device)
        model.eval()
        #x_input = torch.tensor([70, 80, 90]).float().reshape((1, 3, 1))
        #yhat = model(x_input)
        #print(yhat)
        predictions = model(x)[0]
        predictions = predictions.cpu().detach()
        predictions = predictions.numpy()
        #next_index = sample(predictions, diversity)
        next_index = np.argmax(predictions)
        next_char = indices_char[next_index]

        generated += next_char
        sentence = sentence[1:] + next_char

        sys.stdout.write(next_char)
        sys.stdout.flush()
    print()


----- diversity: 0.2
----- Generating with seed: "hey my lovely dear, i missed you so much"
hey my lovely dear, i missed you so much

NameError: ignored

In [0]:
def sample(preds, temperature=1.0):
    preds =preds.cpu()
    preds = preds.detach()
    print(preds.shape)
    print(preds)
    if temperature == 0:
        temperature = 1

    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)