# Split the words

In [1]:
import nltk
from nltk.tokenize import word_tokenize
from collections import Counter
import numpy as np
import re

In [2]:
lyric_path = "The Weeknd.txt"
with open(lyric_path, 'r', encoding='utf-8') as f:
    lyrics = f.read()

In [3]:
# Turn all letter into lower letter and split lines by \n
lyrics_list = lyrics.lower().split("\n")
lyrics_list = np.unique(lyrics_list)[1:].tolist()
print(lyrics_list[:3])
print('\n')

# Split the sentences into words
split_lyrics_list = []
for lyric in lyrics_list:
    # There are some special cases I can't figure out how to match
    # So I've to catch them indivudually
    pattern = r'fa-la-la-la-la|fa-fallin|-fy|fa-|li-|\b[\w,.-]+\b'
    split_lyrics_list.append(re.findall(pattern, lyric))
print(split_lyrics_list[:3])

['20, get the bar rollin fake i', '20, get the bar rolling fake i', '20, keep the bar rolling fake i']


[['20', 'get', 'the', 'bar', 'rollin', 'fake', 'i'], ['20', 'get', 'the', 'bar', 'rolling', 'fake', 'i'], ['20', 'keep', 'the', 'bar', 'rolling', 'fake', 'i']]


# Tokenize all the words

In [4]:
all_lyrics_text = ' '.join(lyrics_list)
tmp_tokens = word_tokenize(all_lyrics_text)

tokens = []
# word_tokenize can't recognize abbreviation like 'gonna', it will split them into 'gon' 'na'
# So I have to merge them manualy
for i in range(len(tmp_tokens)):
    if tmp_tokens[i] == 'gon' and tmp_tokens[i + 1] == 'na':
        tokens.append('gonna')
        i += 1
        continue
    elif tmp_tokens[i] == 'w-' and tmp_tokens[i + 1] == 'wan' and tmp_tokens[i + 2] == 'na':
        tokens.append('w-wanna')
        i += 2
        continue
    elif tmp_tokens[i] == 'wan' and tmp_tokens[i + 1] == 'na':
        tokens.append('wanna')
        i += 1
        continue
    elif tmp_tokens[i] == 'got' and tmp_tokens[i + 1] == 'ta':
        tokens.append('gotta')
        i += 1
        continue
    elif tmp_tokens[i] == 'gim' and tmp_tokens[i + 1] == 'me':
        tokens.append('gimme')
        i += 1
        continue
    elif tmp_tokens[i] == 'lem' and tmp_tokens[i + 1] == 'me':
        tokens.append('lemme')
        i += 1
        continue
    else:
        tokens.append(tmp_tokens[i])
        
print(tokens[:5])

['20', ',', 'get', 'the', 'bar']


# Build word_to_idx and idx_to_word

In [5]:
word_counts = Counter(tokens)
vocab = sorted(word_counts, key=word_counts.get, reverse=True)
vocab = ["<pad>"] + vocab
vocab_size = len(vocab)
word_to_idx = {word: idx for idx, word in enumerate(vocab)}
idx_to_word = {idx: word for word, idx in word_to_idx.items()}

print(word_to_idx)



# Build Dataset, DataLoader

In [6]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

In [7]:
seq_len = 5
train_lyrics_list = []

# Model takes five input words, and output five words out as each input word's next words prediction
for lyric in split_lyrics_list:
    if len(lyric) <= seq_len:
        train_lyrics_list.append(lyric)
        continue
    
    for i in range(seq_len, len(lyric)):
        train_lyrics_list.append(lyric[i - seq_len:i + 1])
        
train_lyrics_list[:5]

[['20', 'get', 'the', 'bar', 'rollin', 'fake'],
 ['get', 'the', 'bar', 'rollin', 'fake', 'i'],
 ['20', 'get', 'the', 'bar', 'rolling', 'fake'],
 ['get', 'the', 'bar', 'rolling', 'fake', 'i'],
 ['20', 'keep', 'the', 'bar', 'rolling', 'fake']]

In [8]:
class Lyrics_Dataset(Dataset):
    def __init__(self, data):
        self.data = data
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        text = self.data[index]
        input_text = text[:-1]
        target_text = text[1:]
        
        return input_text, target_text

In [9]:
lyrics_dataset = Lyrics_Dataset(train_lyrics_list)
batch_size = 8

dataloader = DataLoader(lyrics_dataset, batch_size=batch_size, shuffle=True, drop_last=True)

# Rewrite collate function to pad the sequences which length shorter than seq_len
def collate_fn(batch):
    inputs, targets = zip(*batch)
#     print(inputs)
    inputs = [torch.tensor([word_to_idx[word] for word in input_seq]) for input_seq in inputs]
    targets = [torch.tensor([word_to_idx[word] for word in target_seq]) for target_seq in targets]
    padded_inputs = pad_sequence(inputs, batch_first=True, padding_value=0)
    padded_targets = pad_sequence(targets, batch_first=True, padding_value=0)
    return padded_inputs, padded_targets

dataloader.collate_fn = collate_fn

# i = 0
for inputs, labels in dataloader:
#     i += 1
#     print(i)
    print(inputs)
    print(labels)
    break

tensor([[  23,   31,   63,    6,  868],
        [ 285,   27, 1740,  403,  387],
        [ 981,   18,   51,  168,    3],
        [  29,   54,    6,   49,   15],
        [  20,   18,   20,   18,   20],
        [ 356,  459,    7,   21,    4],
        [ 223,   27, 1557,   16,    4],
        [ 152,  110,    9,  346,   17]])
tensor([[  31,   63,    6,  868,   34],
        [  27, 1740,  403,  387,   27],
        [  18,   51,  168,    3,  457],
        [  54,    6,   49,   15,  293],
        [  18,   20,   18,   20,   18],
        [ 459,    7,   21,    4,  731],
        [  27, 1557,   16,    4,  730],
        [ 110,    9,  346,   17,  352]])


# Build the LSTM model

In [10]:
class Weekend_LSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_size, num_layers):
        super(Weekend_LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.lstm1 = nn.LSTM(embed_dim, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.attn = nn.Linear(hidden_size * 2, hidden_size * 2)
        self.lstm2 = nn.LSTM(hidden_size * 2, hidden_size * 2, num_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(0.2)
        self.fc = nn.Linear(hidden_size * 4, vocab_size)
        
    def forward(self, x):
        batch_size = x.size(0)
        hidden1 = self.init_hidden(self.num_layers * 2, batch_size, self.hidden_size)
        hidden2 = self.init_hidden(self.num_layers * 2, batch_size, self.hidden_size*2)
        
        out = self.embed(x)
        out, hidden1 = self.lstm1(out, hidden1)
        attn = self.attn(out)
        out = torch.mul(out, attn)
        out, hidden2 = self.lstm2(out, hidden2)
        out = self.dropout(out).reshape(-1, self.hidden_size * 4) 
        out = self.fc(out)
        
        return out, hidden2
    
    def init_hidden(self, num_layers, batch_size, hidden_size):
        return (torch.zeros(num_layers, batch_size, hidden_size).to(device),
                torch.zeros(num_layers, batch_size, hidden_size).to(device))

In [11]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
hidden_size = 128
num_layers = 2
embed_dim = 256
model = Weekend_LSTM(vocab_size, embed_dim, hidden_size, num_layers).to(device)

In [12]:
num_param = sum([param.nelement() for param in model.parameters()])
print("Number of parameter: %.2fM" % (num_param/1e6))

Number of parameter: 7.39M


# Train

In [13]:
def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group['lr']

In [14]:
import torch.optim as optim

epochs = 30
lr = 0.003
optimizer = optim.Adam(model.parameters(), lr=lr)
# lr_scheduler = optim.lr_scheduler.MultiStepLR(optimizer, [30, 45], gamma=0.3, last_epoch=-1)
criterion = nn.CrossEntropyLoss()
interval = 1200

model.train()
for epoch in range(epochs):
    print(f"{'='*20} Epoch: {epoch+1} {'='*20}\n")
    
    running_loss = 0.0
    for i, data in enumerate(dataloader):
        inputs, targets = data
        
        outputs, hidden = model(inputs.to(device))
        loss = criterion(outputs, targets.view(-1).to(device))
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        
        running_loss += loss.item()
        if i % interval == interval - 1:
            print(f'step: {i + 1} loss: {running_loss / i:.3f}, lr: {get_lr(optimizer)}')

#     lr_scheduler.step()

model_path = "model.pth"
torch.save(model, model_path)
print("model saved")


step: 1200 loss: 4.088, lr: 0.003
step: 2400 loss: 3.336, lr: 0.003
step: 3600 loss: 2.944, lr: 0.003

step: 1200 loss: 1.660, lr: 0.003
step: 2400 loss: 1.641, lr: 0.003
step: 3600 loss: 1.617, lr: 0.003

step: 1200 loss: 1.217, lr: 0.003
step: 2400 loss: 1.239, lr: 0.003
step: 3600 loss: 1.243, lr: 0.003

step: 1200 loss: 1.010, lr: 0.003
step: 2400 loss: 1.044, lr: 0.003
step: 3600 loss: 1.063, lr: 0.003

step: 1200 loss: 0.900, lr: 0.003
step: 2400 loss: 0.944, lr: 0.003
step: 3600 loss: 0.975, lr: 0.003

step: 1200 loss: 0.846, lr: 0.003
step: 2400 loss: 0.882, lr: 0.003
step: 3600 loss: 0.907, lr: 0.003

step: 1200 loss: 0.790, lr: 0.003
step: 2400 loss: 0.827, lr: 0.003
step: 3600 loss: 0.857, lr: 0.003

step: 1200 loss: 0.755, lr: 0.003
step: 2400 loss: 0.784, lr: 0.003
step: 3600 loss: 0.812, lr: 0.003

step: 1200 loss: 0.719, lr: 0.003
step: 2400 loss: 0.762, lr: 0.003
step: 3600 loss: 0.790, lr: 0.003

step: 1200 loss: 0.693, lr: 0.003
step: 2400 loss: 0.727, lr: 0.003
step

# Generate

In [30]:
import torch.nn.functional as F
import random

def predict(model, words, k):
    words_idx = [word_to_idx[word] for word in words]
    with torch.no_grad():
        inputs = torch.tensor([words_idx]).to(device)
        hidden_layers = model.init_hidden(num_layers, 1, hidden_size)
        # Detach
        hidden = tuple([layer.data for layer in hidden_layers])
        
        out, hidden = model(inputs)
        prob = F.softmax(out, dim=1).data
        top_value, top_idx = torch.topk(prob, k=k)
        choose = [sublist[random.randint(0, k-1)] for sublist in top_idx.tolist()]
        
        seq = []
        for idx in choose:
            seq.append(idx_to_word[idx])
    return seq

# You can choose how long you want the model to output
def gen(model, words, k, length):
    input_length = len(words)
    seq = words
    for i in range(length - input_length):
        out = predict(model, seq, k)
        seq.append(out[-1])
    return seq

['i', 'heard', 'you', 'know', 'the', 'dick', 'that', 'you']


In [34]:
inputs = "i"
input_words = inputs.split()
print(gen(model, input_words, 2, 10))

['i', 'might', 'get', 'violent', 'in', 'the', 'club', 'like', 'its', 'half']
