## Basic Ngram Model

The code in this notebook is inspired by this source: https://towardsdatascience.com/text-generation-using-n-gram-model-8d12d9802aa0. I changed quite a few things relative to the source code; these changes are noted in comments throughout my code. The model is currently implemented as a trigram model, but it can be implemented as any n-gram model.

In [None]:
import numpy as np
import pandas as pd
import nltk
import string
import random
import torch
import torch.nn as nn
import random
import time
from typing import List

# Fixed seed shared by every random number generator used in this notebook.
SEED = 42

# Seed torch, numpy and the stdlib RNG, in that order.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(SEED)

# When cuDNN is enabled, turn off its benchmark mode and seed all CUDA devices.
if torch.backends.cudnn.enabled:
    torch.backends.cudnn.benchmark = False
    torch.cuda.manual_seed_all(SEED)

In [None]:
# Load the lyrics dataset; later cells read its 'genre', 'lyrics' and
# 'sentiment' columns. The path below is the Kaggle input location.
df_train = pd.read_csv("../input/lyrics-with-sentiment/lyrics_sentiments.csv") #in kaggle
# Alternative path for a local copy of the data:
#df_train = pd.read_csv("data/lyrics.csv") 

In [None]:
# Show the distinct genre labels present in the dataset.
df_train['genre'].unique()

In [None]:
# Collect the lyrics of every 'Pop' song, skipping rows whose lyrics are not
# strings. Iterating the two columns in lockstep avoids the label-based
# df[col][i] lookups, which assume a default 0..n-1 index and are slow.
pop_lyrics = [
    lyric
    for genre, lyric in zip(df_train['genre'], df_train['lyrics'])
    if genre == 'Pop' and isinstance(lyric, str)
]

In [None]:
def remove_multiple_strings(cur_string, replace_list):
    for cur_word in replace_list:
        cur_string = cur_string.replace(cur_word, '')
    return cur_string

In [None]:
#lemma = nltk.stem.wordnet.WordNetLemmatizer()
# Punctuation characters stripped from the text before tokenizing.
rempunc = '(),.:[]'
from nltk.tokenize import wordpunct_tokenize
def clean(sentence):
    """Tokenize ``sentence`` and return a list of 'word-TAG' strings.

    The text has the characters in ``rempunc`` removed, is lower-cased, and
    every line break is replaced by the placeholder word 'nnnnnn' so it
    survives tokenization. Each token is then paired with its ``nltk.pos_tag``
    tag as 'word-TAG'; the placeholder gets the tag 'NEWLINE' instead.
    """
    lowered = remove_multiple_strings(sentence, rempunc).lower()
    tokens = wordpunct_tokenize(lowered.replace('\n', ' nnnnnn '))
    return [
        word + '-' + ('NEWLINE' if word == 'nnnnnn' else tag)
        for word, tag in nltk.pos_tag(tokens)
    ]

In [None]:
# Small sanity check of clean(): mixed case, a comma and two line breaks.
example = 'hello my good siR, \n mY Name Is mister Blue \n'
clean_ex = clean(example)
# Prints the list of 'word-TAG' tokens produced for the example.
print(clean_ex)

Note that in the code below I removed the tokenize function from the original code, because the sentence is already lemmatized in the same way as we did for the neural network. I used "start" and "stop" to generate the n-grams, which is a bit different from the neural network. Furthermore, I changed the word generation so that it starts generating from two given words instead of from randomly initialized words. Finally, I did not split the sentences at periods, since this doesn't really make sense for songs; instead, I split the input at every new song.

In [None]:
def get_ngrams(n: int, tokens: list) -> list:
    """
    Build every n-gram of a token sequence.

    :param n: n-gram size (must be at least 1)
    :param tokens: tokenized sentence
    :return: list of ngrams, each of tuple form
        ((previous n-1 tokens as a tuple), target token); the list is empty
        when ``tokens`` holds fewer than ``n`` items
    :raises ValueError: if ``n`` is smaller than 1
    """
    if n < 1:
        # n == 0 used to emit a spurious leading pair built from tokens[-1]
        # (or an IndexError on empty input) instead of failing clearly.
        raise ValueError(f"n must be at least 1, got {n}")
    context_size = n - 1
    return [
        (tuple(tokens[i - context_size:i]), tokens[i])
        for i in range(context_size, len(tokens))
    ]

def top_k_sampling(conditional_probability, target_words, k):
    """
    Sample one word from the ``k`` most probable candidates.

    The ``k`` largest probabilities are renormalized so they sum to 1 and one
    candidate is drawn from that distribution with ``np.random.choice``.
    Implemented with numpy only: the previous version called ``tf`` although
    tensorflow is not imported in this notebook.

    :param conditional_probability: sequence of probabilities, aligned with
        ``target_words``
    :param target_words: candidate words, indexable by position
    :param k: maximum number of candidates to sample from; capped at the
        number of probabilities supplied
    :return: the sampled element of ``target_words``
    :raises ValueError: if no probabilities are given or ``k`` is below 1
    """
    # float64 keeps the renormalized weights within np.random.choice's
    # sum-to-one tolerance.
    probabilities = np.asarray(conditional_probability, dtype=np.float64)
    n_conds = len(probabilities)
    if n_conds == 0:
        raise ValueError("conditional_probability must not be empty")
    if k < 1:
        raise ValueError(f"k must be at least 1, got {k}")
    k = min(n_conds, k)
    # Stable sort on the negated values: descending order, ties broken in
    # favour of the lower index.
    top_k_indices = np.argsort(-probabilities, kind="stable")[:k]
    top_k_probabilities = probabilities[top_k_indices]
    top_k_redistributed_probability = top_k_probabilities / top_k_probabilities.sum()
    sampled_index = np.random.choice(top_k_indices, p=top_k_redistributed_probability)
    sampled_token = target_words[sampled_index]
    return sampled_token


class NgramModel(object):
    """Count-based n-gram language model with sampling-based text generation.

    Counts are collected with :meth:`update`, turned into conditional
    probability tables with :meth:`create_probabilities`, and used for
    generation by :meth:`generate_text` / :meth:`generate_text_withtopk`.

    Contexts are stored either as '_'-joined strings (``asstr=True``, the
    default) or as tuples of tokens (``asstr=False``). The lookup helpers
    accept both representations.
    """

    def __init__(self, n, k = 5):
        """
        :param n: n-gram size (context length is ``n - 1``)
        :param k: number of candidates used by top-k sampling
        """
        self.k = k
        self.n = n
        # dictionary that keeps the set of candidate words given a context
        self.context = {}
        # same keys as above, but tracks how often each context was seen
        self.context_count = {}
        # keeps track of how many times each ngram has appeared in the text
        self.ngram_counter = {}
        # context -> {'target_words': [...], 'probabilities': [...]}
        self.conditional_probs = {}

    @staticmethod
    def _tag_of(token):
        """Return the part of a 'word-TAG' token after its last '-'."""
        # rsplit keeps this correct when the word itself is made of hyphens
        # (e.g. '--'), where split('-')[1] would return an empty string.
        return token.rsplit('-', 1)[-1]

    @staticmethod
    def _resolve_context(table, context_queue):
        """Return the key of ``table`` that represents ``context_queue``.

        Tries the tuple form first, then the '_'-joined string form.

        :raises KeyError: if neither form is a key of ``table``
        """
        as_tuple = tuple(context_queue)
        if as_tuple in table:
            return as_tuple
        try:
            as_string = '_'.join(context_queue)
        except TypeError:
            raise KeyError(as_tuple) from None
        if as_string in table:
            return as_string
        raise KeyError(as_tuple)

    def update(self, sentence: str, asstr = True, types = True) -> None:
        """
        Updates Language Model with the n-grams of one text.

        :param sentence: input text; it is passed through ``clean`` and
            ``get_ngrams``
        :param asstr: store contexts/ngrams as '_'-joined strings when True,
            as tuples of tokens when False
        :param types: only used when ``asstr`` is True; when True, counts are
            built over the part after the '-' of each cleaned token (the tag
            that ``clean`` appends) rather than over the full token
        """
        for context_tokens, target_token in get_ngrams(self.n, clean(sentence)):
            if asstr:
                if types:
                    # Use every context token, not just the first two, so the
                    # model is correct for any n.
                    context = '_'.join(self._tag_of(tok) for tok in context_tokens)
                    target_word = self._tag_of(target_token)
                else:
                    context = '_'.join(context_tokens)
                    target_word = target_token
                ngram = '_'.join([context, target_word])
            else:
                context, target_word = context_tokens, target_token
                ngram = (context, target_word)

            self.ngram_counter[ngram] = self.ngram_counter.get(ngram, 0.0) + 1.0
            self.context.setdefault(context, set()).add(target_word)
            self.context_count[context] = self.context_count.get(context, 0.0) + 1.0

    def prob(self, context, token):
        """
        Calculates probability of a candidate token to be generated given a context

        :param context: context key, either a tuple of tokens or a '_'-joined
            string, matching how the model was updated
        :param token: candidate next token
        :return: count(context, token) / count(context), or 0.0 when the
            ngram or the context has never been seen
        """
        key = (context, token)
        if key not in self.ngram_counter and isinstance(context, str):
            # String-keyed models store the ngram as 'context_token'.
            key = '_'.join([context, token])
        try:
            return self.ngram_counter[key] / self.context_count[context]
        except KeyError:
            return 0.0

    def random_token(self, context):
        """
        Given a context we "semi-randomly" select the next word to append in a sequence

        :param context: context key as stored in ``self.context``
        :return: a token drawn according to :meth:`prob`
        :raises KeyError: if the context has never been seen
        """
        r = random.random()
        candidates = sorted(self.context[context])
        cumulative = 0.0
        for token in candidates:
            cumulative += self.prob(context, token)
            if cumulative > r:
                return token
        # Float rounding can leave the cumulative sum marginally below r;
        # fall back to the last candidate instead of returning None.
        return candidates[-1] if candidates else None

    def generate_text(self, context_queue, token_count: int):
        """
        Generate text by repeatedly sampling with :meth:`random_token`.

        :param context_queue: list holding the ``n - 1`` seed tokens; it is
            advanced in place as tokens are generated
        :param token_count: number of words to be produced
        :return: generated text ('NEWLINE' tokens are rendered as line breaks)
        :raises KeyError: if the current context has never been seen
        """
        result = []
        for _ in range(token_count):
            context = self._resolve_context(self.context, context_queue)
            obj = self.random_token(context)
            result.append('\n' if obj == 'NEWLINE' else obj)
            # Keep the raw token in the window so the next lookup still
            # matches the stored contexts.
            context_queue.pop(0)
            context_queue.append(obj)
        return ' '.join(result)

    def create_probabilities(self, asstr = True):
        """
        Rebuild ``self.conditional_probs`` from the collected counts.

        :param asstr: must match the ``asstr`` value used in :meth:`update`
        """
        self.conditional_probs = {}
        for context, candidates in self.context.items():
            targets = list(candidates)
            if asstr:
                targ_count = np.array([self.ngram_counter['_'.join([context, target])] for target in targets])
            else:
                targ_count = np.array([self.ngram_counter[(context, target)] for target in targets])
            cond_prop = targ_count / self.context_count[context]
            self.conditional_probs[context] = {
                'target_words': targets,
                'probabilities': cond_prop.tolist(),
            }

    def generate_text_withtopk(self, context_queue, token_count: int):
        """
        Generate text by top-k sampling from the conditional probability tables.

        :param context_queue: list holding the ``n - 1`` seed tokens; it is
            advanced in place as tokens are generated
        :param token_count: number of words to be produced
        :return: generated text ('NEWLINE' tokens are rendered as line breaks)
        :raises KeyError: if the current context has no probability table
        """
        result = []
        for _ in range(token_count):
            key = self._resolve_context(self.conditional_probs, context_queue)
            conditional_prob = self.conditional_probs[key]
            probs = conditional_prob['probabilities']
            words = conditional_prob['target_words']
            obj = top_k_sampling(probs, words, self.k)
            result.append('\n' if obj == 'NEWLINE' else obj)
            context_queue.pop(0)
            context_queue.append(obj)
        return ' '.join(result).replace(' \' ', '\'')



def create_ngram_model(n, intext, k = 5):
    """Build an ``NgramModel`` of order ``n`` from an iterable of texts.

    Each item of ``intext`` is fed to the model with its default ``update``
    settings, after which the conditional probability tables are created.
    ``k`` is forwarded to the model as its top-k sampling size.
    """
    model = NgramModel(n, k)
    for text in intext:
        model.update(text)
    model.create_probabilities()
    return model

# Disabled demo: the `if False` guard means nothing below ever runs. Flip it
# to True to time the trigram model build on the pop lyrics and print a
# generated sample.
if False:
    if __name__ == "__main__":
        start = time.time()
        m = create_ngram_model(3, pop_lyrics) 
        print (f'Language Model creating time: {time.time() - start}')
        start = time.time()
        # NOTE(review): this seeds the stdlib RNG only; top_k_sampling draws
        # with np.random.choice, so this call does not affect the sample below.
        random.seed(7)
        print(f'{"="*50}\nGenerated text:')
        print(m.generate_text_withtopk(["i", "am"], 200)) #change this if you want different input words/a different length
        print(f'{"="*50}')

In [None]:
# Build a 5-gram model from the lyrics of negative-sentiment Pop songs.
gendf = df_train[df_train['genre'] == 'Pop']
gensentdf = gendf[gendf['sentiment'] == 'Negative']
# Keep string lyrics only: missing values would crash the text cleaning step.
lyrics = [text for text in gensentdf['lyrics'] if isinstance(text, str)]
# NOTE: despite its name, n_10 is built with n=5.
n_10 = create_ngram_model(5, lyrics)

In [None]:
# Inspect the probability table of one arbitrary context (the sixth key).
probs = n_10.conditional_probs
sixth_context = list(probs)[5]
probs[sixth_context]

In [None]:
# Inspect the tables of the most recently built `ngram_model`.
# NOTE(review): in this file `ngram_model` is only assigned by the
# per-genre/sentiment loop cell further down, so that cell must run first.
lastconds = ngram_model.conditional_probs
# Eighth context key, kept for manual inspection (not used below).
ex1 = list(lastconds.keys())[7]
# NOTE(review): clean() strips ']' and update() defaults to tag-based keys, so
# the key 'chorus_]' looks unreachable with the current code and this lookup
# would raise KeyError - presumably left over from an earlier version; confirm.
ex2 = lastconds['chorus_]']
print(ex2)

In [None]:
# Build one trigram model per (genre, sentiment) pair and keep its
# conditional probability tables as
# conditional_probs_per_input[genre][sentiment].
conditional_probs_per_input = {}
# The sentiment labels do not depend on the genre, so compute them once.
sentiments = df_train['sentiment'].unique()
for genre in df_train['genre'].unique():
    start = time.time()
    gen_dict = {}
    gendf = df_train[df_train['genre'] == genre]
    for sent in sentiments:
        gensentdf = gendf[gendf['sentiment'] == sent]
        # Keep string lyrics only: missing values would crash the text
        # cleaning step.
        lyrics = [text for text in gensentdf['lyrics'] if isinstance(text, str)]
        print(len(lyrics))
        ngram_model = create_ngram_model(3, lyrics)
        gen_dict[sent] = ngram_model.conditional_probs

    conditional_probs_per_input[genre] = gen_dict
    print ('Finished for {} in {}'.format(genre,time.time() - start))
        

In [None]:
import json

# Load the precomputed n-gram probability tables, indexed below as
# ngrams_probs[genre][sentiment]. The encoding is given explicitly because
# open() otherwise falls back to a locale-dependent default.
with open('../input/ngram-lyric-probabilities/ngrams_probabilities.json', encoding='utf-8') as json_file:
    ngrams_probs = json.load(json_file)

In [None]:
def top_k_sampling(conditional_probability, target_words, k):
    """
    Top_k sampling made for Ngram word generation. Takes a probability distribution for a 
    ngram and returns one out of the top k most probable words.

    The ``k`` largest probabilities are renormalized so they sum to 1 and one
    candidate is drawn from that distribution with ``np.random.choice``.
    Implemented with numpy only: the previous version called ``tf`` although
    tensorflow is not imported in this notebook.

    :param conditional_probability: sequence of probabilities, aligned with
        ``target_words``
    :param target_words: candidate words, indexable by position
    :param k: maximum number of candidates to sample from; capped at the
        number of probabilities supplied
    :return: the sampled element of ``target_words``
    :raises ValueError: if no probabilities are given or ``k`` is below 1
    """
    # float64 keeps the renormalized weights within np.random.choice's
    # sum-to-one tolerance.
    probabilities = np.asarray(conditional_probability, dtype=np.float64)
    n_conds = len(probabilities)
    if n_conds == 0:
        raise ValueError("conditional_probability must not be empty")
    if k < 1:
        raise ValueError(f"k must be at least 1, got {k}")
    k = min(n_conds, k)
    # Stable sort on the negated values: descending order, ties broken in
    # favour of the lower index.
    top_k_indices = np.argsort(-probabilities, kind="stable")[:k]
    top_k_probabilities = probabilities[top_k_indices]
    top_k_redistributed_probability = top_k_probabilities / top_k_probabilities.sum()
    sampled_index = np.random.choice(top_k_indices, p=top_k_redistributed_probability)
    sampled_token = target_words[sampled_index]
    return sampled_token

In [None]:
def generate_text(conditional_dict, context_queue, token_count: int, k = 5):
    """
    Generate text from precomputed conditional probability tables.

    :param conditional_dict: maps a '_'-joined context string to a dict with
        'probabilities' and 'target_words' entries
    :param context_queue: list of seed words; it is advanced in place as
        words are generated
    :param token_count: number of words to be produced
    :param k: number of candidates considered by top-k sampling
    :return: the seed words followed by the generated text
    """
    # Start the output with a copy of the seed words.
    pieces = [] + context_queue
    produced = 0
    while produced < token_count:
        table = conditional_dict['_'.join(context_queue)]
        candidate_probs = table['probabilities']
        candidate_words = table['target_words']
        next_word = top_k_sampling(candidate_probs, candidate_words, k)
        # 'NEWLINE' is rendered as a line break in the output only; the
        # context window keeps the raw token.
        pieces.append('\n' if next_word == 'NEWLINE' else next_word)
        context_queue.pop(0)
        context_queue.append(next_word)
        produced += 1
    joined = ' '.join(pieces)
    return joined.replace(' \' ', '\'')

In [None]:
def create_song(all_conds,genre, sentiment, context_queue, token_count, k = 5):
    """Generate a song from the tables stored under ``all_conds[genre][sentiment]``.

    The remaining arguments are forwarded unchanged to ``generate_text``.
    """
    return generate_text(all_conds[genre][sentiment], context_queue, token_count, k)

In [None]:
# Show the available options, then print an example song generated from the
# seed words "i want" (200 words, top-15 sampling).
genre_choices = ngrams_probs.keys()
print('Please pick a genre from : \n{}'.format(genre_choices))
print('Please pick a sentiment from: [Positive, Negative]')
print('Example song: \n')
example_song = create_song(ngrams_probs, 'Pop','Positive',['i', 'want'], 200, 15)
print(example_song)