In [1]:
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn import svm, linear_model
from sklearn.model_selection import train_test_split

import os
import string
import random
from subprocess import Popen, PIPE

import re

In [2]:
# Lyrics corpus. Later cells read its `Line`, `NextLine` and `Vowels` columns.
# NOTE(review): presumably pre-sorted by the `Vowels` string so that rhyming
# lines sit next to each other -- confirm against how the CSV was produced.
lyrics = pd.read_csv('lyrics_sorted.csv')

In [3]:
# IPA vowel symbols written as a regex alternation; get_phonemes uses it to
# pull the vowel sounds out of espeak's IPA transcription.
VOWELS = r'i|ɪ|e|ɛ|æ|a|ə|ɑ|ɒ|ɔ|ʌ|o|ʊ|u|y|ʏ|ø|œ|ɐ|ɜ|ɞ|ɘ|ɵ|ʉ|ɨ|ɤ|ɯ'

In [4]:
def get_phonemes(line):
    """Transcribe `line` with espeak and keep only its vowel sounds.

    Parameters
    ----------
    line : str
        Text to transcribe.

    Returns
    -------
    phon_words : list of str
        One entry per whitespace-separated chunk of espeak's IPA output,
        holding only the vowel symbols of that chunk, in order.
    phon_concat : str
        Every vowel symbol of the whole output, concatenated and then
        reversed so the last vowel of the line comes first.

    Raises
    ------
    OSError
        If the `espeak` executable cannot be started.
    """
    # Double quotes have always been stripped before transcription; keep that.
    line = re.sub(r'"', '', line)

    # Pass an argument list instead of a shell command string: the lyric text
    # is then handed to espeak verbatim, so characters such as `$`, backticks
    # or backslashes can neither be expanded nor executed by a shell.
    process = Popen(['espeak', '--ipa', '-q', line], stdout=PIPE)
    output, _ = process.communicate()
    output = str(output, encoding='utf-8')

    # Match the vowel alternation directly. Wrapping it in `[^...]` would
    # also treat the literal `|` separators as vowels.
    phon_words = [''.join(re.findall(VOWELS, word)) for word in output.split()]
    phon_concat = ''.join(re.findall(VOWELS, output))[::-1]

    return phon_words, phon_concat

In [5]:
def get_slice(index, lyrics, k):
    """Return a window of up to `k` consecutive rows of `lyrics` around `index`.

    The window starts `k // 2` rows before `index`. When it would run past
    either end of the frame it is shifted back inside, so `k` rows are
    returned whenever the frame has at least `k` rows; a shorter frame is
    returned whole.

    Parameters
    ----------
    index : int
        Row position the window is centred on.
    lyrics : pandas.DataFrame
        Frame to slice.
    k : int
        Requested number of rows.

    Returns
    -------
    pandas.DataFrame
        The selected rows, in their original order.
    """
    n_rows = lyrics.shape[0]
    k = int(k)

    # Integer arithmetic throughout: `k / 2` would produce float bounds.
    lower = index - k // 2
    # Clamp the start so the whole window fits; `n_rows - k` is the last start
    # position that still leaves room for `k` rows.
    lower = max(0, min(lower, n_rows - k))
    upper = min(n_rows, lower + k)

    # `index` is a row position (callers obtain it from enumerate), so slice
    # by position rather than by label.
    return lyrics.iloc[lower:upper]

In [6]:
def select_candidates(input_phon, lyrics, k):
    """Pick the `k`-row neighbourhood of the best vowel match for `input_phon`.

    Every entry of `lyrics.Vowels` is compared with `input_phon` by the length
    of their common prefix. The first row with the longest prefix (row 0 when
    nothing matches) becomes the centre handed to `get_slice`.
    """
    prefix_lengths = [
        len(os.path.commonprefix([input_phon, vowels]))
        for vowels in lyrics.Vowels
    ]

    # max() keeps the first of several equally long matches, and the default
    # covers an empty frame.
    best_index = max(
        range(len(prefix_lengths)),
        key=prefix_lengths.__getitem__,
        default=0,
    )

    return get_slice(best_index, lyrics, k)

In [7]:
def calculate_features(input_line, line):
    """Build the feature vector describing how well `line` follows `input_line`.

    Punctuation is stripped from both strings before any feature is computed.

    Parameters
    ----------
    input_line : str
        The query line.
    line : str
        The candidate line.

    Returns
    -------
    numpy.ndarray
        `[jaccard, total, end_rhyme, total_rhyme, length]`: the two values
        from `get_semantic_features`, the two values from
        `get_rhyme_features`, and a length similarity equal to one minus the
        character-length difference divided by the longer length.
    """
    translator = str.maketrans('', '', string.punctuation)

    line = line.translate(translator)
    input_line = input_line.translate(translator)

    # Semantic features
    jaccard, total = get_semantic_features(input_line, line)

    # Rhyming features
    end_rhyme, total_rhyme = get_rhyme_features(input_line, line)

    # Length feature. Two empty strings have identical length, so score them
    # 1.0 instead of dividing by zero.
    longest = max(len(input_line), len(line))
    if longest:
        length = 1 - abs(len(input_line) - len(line)) / longest
    else:
        length = 1.0

    return np.array([jaccard, total, end_rhyme, total_rhyme, length])

In [8]:
import spacy
from spacy.symbols import ORTH, LEMMA, POS
from spacy.symbols import nsubj, VERB, dobj, NOUN

# Shared spaCy pipeline; the semantic-feature helpers below call it on each line.
# NOTE(review): the 'en' shortcut name presumably only resolves on older spaCy
# releases -- confirm against the installed version.
nlp = spacy.load('en')

In [9]:
# Teach the tokenizer two contractions. Each one is split into two tokens, and
# the first token is given an explicit verb lemma.

# "gimme" -> "gim" (lemma "give") + "me"
nlp.tokenizer.add_special_case(
    u'gimme',
    [{ORTH: u'gim', LEMMA: u'give', POS: u'VERB'}, {ORTH: u'me'}],
)

# "wanna" -> "wan" (lemma "want") + "na"
nlp.tokenizer.add_special_case(
    u'wanna',
    [{ORTH: u'wan', LEMMA: u'want', POS: u'VERB'}, {ORTH: u'na'}],
)

In [10]:
# Map three surface forms onto one noun lemma so that lemma-based comparisons
# treat them as the same word.
#
# ORTH is the token text, and spaCy's documentation requires the ORTH values
# of a special case to reproduce the registered string exactly. Each entry
# therefore keeps its own capitalisation in ORTH; the normalisation lives in
# LEMMA only.
nlp.tokenizer.add_special_case(
    u'niggas',
    [{ORTH: u'niggas', LEMMA: u'nigga', POS: u'NOUN'}],
)

nlp.tokenizer.add_special_case(
    u'Niggas',
    [{ORTH: u'Niggas', LEMMA: u'nigga', POS: u'NOUN'}],
)

nlp.tokenizer.add_special_case(
    u'Nigga',
    [{ORTH: u'Nigga', LEMMA: u'nigga', POS: u'NOUN'}],
)

In [11]:
def average_vector(word_set):
    """Average the `.vector` attribute of every item in `word_set`, skipping NaNs.

    The vectors are stacked into one array and reduced along the first axis
    with `np.nanmean`.
    """
    return np.nanmean(np.array([item.vector for item in word_set]), axis=0)

def subjects_vector(doc):
    """Average vector of the tokens in `doc` whose dependency label is `nsubj`."""
    return average_vector({token for token in doc if token.dep == nsubj})
            
def objects_vector(doc):
    """Average vector of the tokens in `doc` whose dependency label is `dobj`."""
    return average_vector({token for token in doc if token.dep == dobj})

def verbs_vector(doc):
    """Average vector of the tokens in `doc` whose part of speech is `VERB`."""
    return average_vector({token for token in doc if token.pos == VERB})

In [12]:
def get_semantic_features(input_line, line):
    """Cosine similarities between subjects, objects, verbs and whole lines.

    Both arguments are converted with `str` and parsed with `nlp`. Returns the
    4-tuple `(sub_cos, obj_cos, verb_cos, tot_cos)`.

    Note: a later cell defines `get_semantic_features` again with a 2-tuple
    return value. Once that cell has run, this version is no longer reachable
    under this name.
    """
    input_doc, doc = nlp(str(input_line)), nlp(str(line))

    # Each similarity compares the same kind of vector from both documents,
    # query first.
    sub_cos = compute_similarity(subjects_vector(input_doc), subjects_vector(doc))
    obj_cos = compute_similarity(objects_vector(input_doc), objects_vector(doc))
    verb_cos = compute_similarity(verbs_vector(input_doc), verbs_vector(doc))
    tot_cos = compute_similarity(input_doc.vector, doc.vector)

    return sub_cos, obj_cos, verb_cos, tot_cos

In [13]:
def get_semantic_features(input_line, line):
    """Lemma overlap and whole-line vector similarity of two lines.

    Both arguments are converted with `str` and parsed with `nlp`. Returns
    `(jaccard, tot_cos)`: the result of `compute_jaccard` on the two parsed
    documents and the result of `compute_similarity` on their document vectors.
    """
    parsed_input = nlp(str(input_line))
    parsed_line = nlp(str(line))

    tot_cos = compute_similarity(parsed_input.vector, parsed_line.vector)
    jaccard = compute_jaccard(parsed_input, parsed_line)

    return jaccard, tot_cos

In [14]:
def compute_jaccard(input_doc, doc):
    """Jaccard similarity between the lemma sets of two documents.

    Parameters
    ----------
    input_doc, doc : iterable
        Iterables of tokens exposing a `lemma_` attribute.

    Returns
    -------
    float
        Size of the intersection of the two lemma sets divided by the size of
        their union. Two documents with no tokens at all score 1.0, matching
        the "both missing -> 1" convention of `compute_similarity`, instead of
        raising ZeroDivisionError.
    """
    input_lemmas = {word.lemma_ for word in input_doc}
    ref_lemmas = {word.lemma_ for word in doc}

    union = input_lemmas | ref_lemmas
    if not union:
        # Both documents are empty, hence identical.
        return 1.0

    return len(input_lemmas & ref_lemmas) / len(union)

In [15]:
def compute_similarity(input_vector, ref_vector):
    """Cosine similarity of two vectors, with fallbacks for non-array inputs.

    An argument counts as present only when its type is exactly `np.ndarray`.
    Returns the cosine similarity when both are present, 0 when exactly one
    is present, and 1 when neither is.
    """
    input_present = type(input_vector) is np.ndarray
    ref_present = type(ref_vector) is np.ndarray

    if input_present and ref_present:
        # cosine_similarity works on 2-D inputs, so view each vector as a
        # single row and read the only cell of the resulting 1x1 matrix.
        return cosine_similarity(input_vector.reshape(1, -1), ref_vector.reshape(1, -1))[0][0]
    if input_present or ref_present:
        return 0
    return 1

In [16]:
def get_rhyme_features(input_line, line):
    """Measure how well `line` rhymes with `input_line`.

    Both lines are transcribed with `get_phonemes`.

    Parameters
    ----------
    input_line : str
        The query line.
    line : str
        The candidate line.

    Returns
    -------
    end_rhyme : int
        Length of the common prefix of the two reversed vowel strings, i.e.
        how many trailing vowel sounds the lines share.
    total_rhyme : float
        For each word of `input_line`, the longest vowel prefix it shares
        with any word of `line`, averaged over the words of `input_line`.
        It is 0.0 when `input_line` yields no words, instead of raising
        ZeroDivisionError.
    """
    phon_words, phon_concat = get_phonemes(line)
    input_phon_words, input_phon_concat = get_phonemes(input_line)

    end_rhyme = len(os.path.commonprefix([input_phon_concat, phon_concat]))

    if not input_phon_words:
        # Nothing to average over.
        return end_rhyme, 0.0

    best_matches = [
        # default=0 covers a candidate line with no words.
        max((len(os.path.commonprefix([input_word, word])) for word in phon_words), default=0)
        for input_word in input_phon_words
    ]
    total_rhyme = sum(best_matches) / len(input_phon_words)

    return end_rhyme, total_rhyme

In [17]:
def get_best_line(input_line, lyrics, coef, k=100, random_candidates=False):
    """Return the highest-scoring candidate line for `input_line`.

    Candidates are either `k` rows sampled from `lyrics` or the `k`-row
    neighbourhood chosen by `select_candidates`. A candidate's score is the
    dot product of `coef` with its raw feature vector. A candidate replaces
    the current best only if it scores strictly higher and its first feature
    is below 0.5. The empty string is returned when no candidate scores
    above 0.

    Note: a later cell defines `get_best_line` again with a different
    signature. Once that cell has run, this version is no longer reachable
    under this name.
    """
    _, input_phonemes = get_phonemes(input_line)

    if random_candidates:
        candidates = lyrics.sample(n=k)
    else:
        candidates = select_candidates(input_phonemes, lyrics, k)

    top_score, top_line = 0, ''
    for _, row in candidates.iterrows():
        row_features = calculate_features(input_line, row.Line)
        row_score = np.dot(coef, row_features)
        if row_score > top_score and row_features[0] < 0.5:
            top_score, top_line = row_score, row.Line

    return top_line

In [41]:
def get_best_line(input_line, coef=[0.68, 0.12, -0.67, 0.08, 0.18], k=100, random_candidates=False):
    """Return the line of the global `lyrics` frame that best follows `input_line`.

    Parameters
    ----------
    input_line : str
        The query line.
    coef : sequence of 5 floats
        Weights applied to the standardized feature vector
        `[jaccard, total, end_rhyme, total_rhyme, length]`. The default list
        is only read, never mutated.
    k : int
        Number of candidate rows to score.
    random_candidates : bool
        When true, score `k` randomly sampled rows instead of the
        neighbourhood chosen by `select_candidates`.

    Returns
    -------
    str or None
        The highest-scoring candidate whose text differs from `input_line`,
        or None when there is no such candidate.
    """
    # Mean and standard deviation of the training data, used to standardize
    # features the same way as during training.
    mean = [0.05618665, 0.72029447, 1.11975, 0.71374178, 0.7401983]
    std = [0.10824952, 0.15384646, 1.13375921, 0.27429371, 0.1920263]

    _, input_phonemes = get_phonemes(input_line)

    if not random_candidates:
        candidates = select_candidates(input_phonemes, lyrics, k)
    else:
        candidates = lyrics.sample(n=k)

    # Start from -inf: standardized scores can fall below any fixed floor,
    # and a floor such as -1 would discard every candidate in that case.
    best = (float('-inf'), None)
    for _, candidate in candidates.iterrows():
        features = calculate_features(input_line, candidate.Line)
        features = (features - mean) / std
        score = np.dot(coef, features)
        if score > best[0] and candidate.Line != input_line:
            best = (score, candidate.Line)

    return best[1]

In [24]:
# Training material: each line paired with the line that actually follows it.
X = lyrics.Line.loc[:5000]
y = lyrics.NextLine.loc[:5000]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
truth_feat, other_feat = [], []

# Seeded generator for the negative examples, so a re-run draws the same
# distractor lines, in line with the seeded train/test split above. One
# shared generator is advanced on every draw; passing a fixed integer seed to
# each `sample` call would return the same row every time.
negative_rng = np.random.RandomState(42)

for line, next_line in zip(X_train, y_train):
    # Draw a random distractor that is neither the true next line nor the
    # line itself.
    other_line = next_line
    while other_line == next_line or other_line == line:
        other_line = X.sample(n=1, random_state=negative_rng).iloc[0]

    truth = calculate_features(line, next_line)
    other = calculate_features(line, other_line)

    truth_feat.append(truth)
    other_feat.append(other)

In [33]:
# Standardize every feature using statistics over positives and negatives.
#
# Stack with np.vstack rather than `truth_feat + other_feat`: `+` concatenates
# only while both are still lists. This cell rebinds them to arrays, so on a
# second run `+` would add them element-wise and yield a wrong mean and std.
# vstack gives the same (2n, n_features) matrix for lists and arrays alike.
all_feat = np.vstack([truth_feat, other_feat])
mean = all_feat.mean(axis=0)
std = all_feat.std(axis=0)

truth_feat = (np.asarray(truth_feat) - mean) / std
other_feat = (np.asarray(other_feat) - mean) / std

In [34]:
# Turn each (true next line, distractor) pair into one pairwise-ranking
# example: the difference of the two feature vectors, labelled by which of
# the two came first.
X_pairs,  y_pairs = [], []

# Dedicated seeded generator so the label flips, and therefore the fitted
# coefficients, are the same on every run.
pair_rng = random.Random(42)

for truth, other in zip(truth_feat, other_feat):
    # Flip the order at random so both classes are represented.
    if not pair_rng.getrandbits(1):
        X_pairs.append(truth - other)
        y_pairs.append(1)
    else:
        X_pairs.append(other - truth)
        y_pairs.append(-1)

X_pairs, y_pairs = map(np.asanyarray, (X_pairs, y_pairs))

In [35]:
# Fit a linear SVM on the pairwise examples. Its weight vector, scaled to
# unit norm, provides the feature coefficients used for scoring.
clf = svm.SVC(kernel='linear', C=.1)
clf.fit(X_pairs, y_pairs)
coef = clf.coef_.ravel() / np.linalg.norm(clf.coef_)

# Persist the coefficients, one value per line.
with open('coefficients.txt', 'w') as f:
    f.writelines(str(value) + '\n' for value in coef)

In [None]:
# Recall@k on the first 50 held-out lines: for each query, rank the candidate
# lines by score and check whether the true next line appears within the top
# 1 / 10 / 50 / 100.
recall_1, recall_10, recall_50, recall_100 = 0, 0, 0, 0
n_queries = 0

for query_number, (line, next_line) in enumerate(zip(X_test[:50], y_test[:50]), start=1):
    print('{0}: Processing: {1}'.format(query_number, line))
    _, input_phonemes = get_phonemes(line)
    candidates = select_candidates(input_phonemes, lyrics, 110)

    scores = []
    for _, candidate in candidates.iterrows():
        # Never offer the query line itself as its own continuation; compare
        # case-insensitively on both sides.
        if candidate.Line.lower() == line.lower():
            continue
        # Standardize with the training statistics, since `coef` was fitted
        # on standardized features.
        features = (calculate_features(line, candidate.Line) - mean) / std
        scores.append((np.dot(coef, features), candidate.Line))

    # Best score first, so position 0 is the model's top suggestion.
    ranking = sorted(scores, key=lambda x: x[0], reverse=True)

    # Position of the true next line in the ranking, or None if it is not
    # among the candidates.
    rank = next(
        (position for position, (_, text) in enumerate(ranking) if text == next_line),
        None,
    )
    if rank is not None:
        # Recall@k counts a hit anywhere within the first k positions.
        if rank < 1:
            recall_1 += 1
        if rank < 10:
            recall_10 += 1
        if rank < 50:
            recall_50 += 1
        if rank < 100:
            recall_100 += 1

    n_queries = query_number

# Avoid dividing by zero when there were no test queries at all.
denominator = max(n_queries, 1)
print(recall_1 / denominator, recall_10 / denominator, recall_50 / denominator, recall_100 / denominator)

In [48]:
# Demo: ask for a follow-up line using the default coefficients and candidate count.
get_best_line("If I told you that a flower bloomed in a dark room, would you trust it?")

'I heard the bitch got hit with three zebras and a monkey'