# Imports

In [None]:
import tensorflow as tf
import pretty_midi
import glob
import time
import random
from sklearn.utils import class_weight
from sklearn.utils.class_weight import compute_class_weight


In [None]:
import os
import pickle
import spacy
import pandas as pd
import gensim.downloader as api
import numpy as np
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
import re
import os
import numpy as np
import pandas as pd
from tensorflow.keras import Input, Sequential, Model, initializers
from tensorflow.keras.layers import Flatten, Dense, Lambda, InputLayer, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam, Adagrad, Adadelta , RMSprop
from tensorflow.keras.layers import LSTM, Dropout, Activation
from tensorflow.keras.callbacks import EarlyStopping,LambdaCallback, TensorBoard
from tensorflow.keras.models import load_model
from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize 
import nltk
nltk.download('stopwords')

#from similarity.normalized_levenshtein import NormalizedLevenshtein

In [None]:
# Root working directory: the lyrics CSVs, the midi_files folder, cached pickles,
# saved models and generated song files are all addressed relative to it.
env = r'C:\Users\idan\Desktop\ass3'

#  Process songs from csv

In [None]:
def read_pickle(path):
    """Load and return the object pickled at ``path``.

    Returns the integer ``0`` as a "not available" sentinel (after printing
    the error) when the file cannot be opened or read.

    NOTE: unpickling can execute arbitrary code; only use this on cache files
    this project wrote itself.
    """
    try:
        # The context manager closes the handle even if unpickling fails.
        with open(path, "rb") as pickle_file:
            pick = pickle.load(pickle_file)
    except (OSError, IOError) as e:
        print(e)
        return 0
    print("loaded pickle successfully")
    return pick
def dump_pickle(path, output):
    """Pickle ``output`` to the file at ``path``, overwriting any existing file."""
    print("dumping to pickle, path {}".format(path))
    # `with` guarantees the file is flushed and closed even if pickling raises.
    with open(path, "wb") as pickle_file:
        pickle.dump(output, pickle_file)

In [None]:
def extract_language(path):
    """Return the set of distinct cleaned words found in all lyrics of a CSV.

    The CSV is read without a header row and the lyrics are taken from the
    third column (index 2). Each lyric is split on single spaces and every
    character that is neither a word character nor ``&`` is stripped from each
    token. Empty tokens are not part of the result.
    """
    # Raw string: '\w' in a normal literal is an invalid escape sequence.
    # Compiled once instead of once per song.
    pattern = re.compile(r'[^\w&]+')
    df = pd.read_csv(path, header=None)
    vocabulary = set()
    for song_words in df[2]:
        vocabulary.update(pattern.sub('', word) for word in song_words.split(' '))
    # discard() never raises, so both clean-ups always run.
    vocabulary.discard('')
    vocabulary.discard(' ')
    return vocabulary



In [None]:
def get_songs_words(path):
    """Return a dict mapping each song name to its list of cleaned words.

    The CSV is read without a header row; column 1 holds the song name and
    column 2 the lyrics. Lyrics are split on single spaces, every character
    that is neither a word character nor ``&`` is stripped from each token,
    and empty tokens are dropped. Keys are the song names with surrounding
    whitespace removed.
    """
    # Raw string: '\w' in a normal literal is an invalid escape sequence.
    pattern = re.compile(r'[^\w&]+')
    df = pd.read_csv(path, header=None)
    first_lyrics = {}
    songs_dict = {}
    # One pass over the rows instead of re-filtering the whole frame per song.
    for song_name, lyrics in zip(df[1], df[2]):
        # A repeated title keeps using the lyrics of its first row, which is
        # what looking the title up in the frame used to return.
        lyrics = first_lyrics.setdefault(song_name, lyrics)
        cleaned = (pattern.sub('', word) for word in lyrics.split(' '))
        songs_dict[song_name.strip()] = [word for word in cleaned if word not in ('', ' ')]
    return songs_dict

In [None]:
def download_embeddings(apiwords):
    """Load the resource named ``apiwords`` through the gensim downloader and return it."""
    return api.load(apiwords)

def generate_onehot_for_language(L):
    """Return a dict mapping every word of ``L`` to its one-hot vector.

    Words are numbered by their position in the sorted set of distinct words
    (the numbering a label encoder produces), and each word's vector has a
    single 1.0 at that position. The vector length equals the number of
    distinct words.

    Built with NumPy only, so it does not depend on the ``sparse`` /
    ``sparse_output`` keyword of scikit-learn's ``OneHotEncoder``, which
    differs between library versions.
    """
    words = np.asarray(L)
    # `codes[i]` is the index of words[i] in the sorted distinct vocabulary.
    distinct_words, codes = np.unique(words, return_inverse=True)
    codes = codes.reshape(-1)
    # Row k of the identity matrix is the one-hot vector for class k.
    onehot_transformed = np.eye(len(distinct_words))[codes]
    onehot_dict = {}
    for word, onehot in zip(L, onehot_transformed):
        onehot_dict[word] = onehot
    return onehot_dict

def generate_one_hot_embedding(path):
    """Return a dict mapping each vocabulary index to a 300-d word vector.

    The index of a word is the position of the 1 in its one-hot vector.
    Words missing from the pretrained embeddings map to a zero vector.
    The result is cached in ``one_hot_embedded.pickle`` under ``env``; when
    that cache can be read it is returned without recomputing anything.
    """
    output = r"{}\one_hot_embedded.pickle".format(env)
    embedding = read_pickle(output)
    # read_pickle returns 0 when the cache file is unavailable.
    if embedding == 0:
        language = extract_language(path)
        # Named so it does not shadow the module-level gensim `api` import.
        embeddings_name = r'word2vec-google-news-300'
        E = download_embeddings(embeddings_name)
        L = np.array(list(language))
        one_hot = generate_onehot_for_language(L)
        embedding = {}
        for word, onehotarray in one_hot.items():
            index = np.argmax(onehotarray)
            try:
                embedding[index] = E[word]
            except KeyError:
                # No pretrained vector for this word.
                embedding[index] = np.zeros(300)
        dump_pickle(output, embedding)
    return embedding


def generate_one_hot_words(path):
    """Return a dict mapping each vocabulary index to its word.

    Example: ``embedding[704] == 'blues'``. The index of a word is the
    position of the 1 in its one-hot vector. The result is cached in
    ``one_hot_to_word.pickle`` under ``env``.

    The pretrained embeddings are not needed for this mapping, so they are
    no longer downloaded here.
    """
    output = r"{}\one_hot_to_word.pickle".format(env)
    embedding = read_pickle(output)
    # read_pickle returns 0 when the cache file is unavailable.
    if embedding == 0:
        language = extract_language(path)
        L = np.array(list(language))
        one_hot = generate_onehot_for_language(L)
        embedding = {np.argmax(onehotarray): word for word, onehotarray in one_hot.items()}
        dump_pickle(output, embedding)
    return embedding



In [None]:
# `env` is assigned before it is used so this cell also runs on its own.
env = r'C:\Users\idan\Desktop\ass3'
path = r'{}\lyrics_train_set.csv'.format(env)

# Candidate pretrained vectors: glove-wiki-gigaword-300 & word2vec-google-news-300
print('generate one hot embedding')
one_hot_embedding = generate_one_hot_embedding(path)
print('generate one hot words')
one_hot_words = generate_one_hot_words(path)

# Process MIDI 

In [None]:

def get_midi_vector(midi_info, song_length, strategy):
    """
    Return a normalized vector representing the melody of one song.

    strategy 1: for every word position, take the next 10 note pitches of
    each of the first 10 instruments, zero-padded, and divide by 150. Every
    word always contributes exactly 10 * 10 = 100 values, including when the
    file has fewer than 10 instruments, so that word ``i`` occupies the slice
    ``[i * 100:(i + 1) * 100]`` that ``add_melody_for_word`` reads.

    strategy 2: the transposed result of ``midi_info.get_chroma`` sampled at
    ``song_length`` evenly spaced times, divided by 500.

    Any other strategy returns None.
    """
    if strategy == 1:
        notes_per_word = 10
        pitch_norm = 150
        instruments_number = 10
        instruments_list = midi_info.instruments[:instruments_number]
        # Slots for instruments the file does not have are filled with zeros
        # to keep the per-word width fixed.
        missing_instruments = instruments_number - len(instruments_list)
        song = []
        for word in range(song_length):
            for instrument in instruments_list:
                notes_interval = instrument.notes[word * notes_per_word:(word + 1) * notes_per_word]
                song.extend(note.pitch for note in notes_interval)
                # Pad instruments that ran out of notes for this word.
                song.extend([0] * (notes_per_word - len(notes_interval)))
            song.extend([0] * (missing_instruments * notes_per_word))
        return np.array(song) / pitch_norm
    elif strategy == 2:
        chroma_norm = 500
        et = midi_info.get_end_time()
        # NOTE(review): `fs` receives seconds-per-word (et / song_length), not a
        # rate in Hz - confirm this is the intended resolution.
        chromas = midi_info.get_chroma(fs=et/song_length, times=np.arange(0, et,et/song_length)).T
        chromas = chromas / chroma_norm
        return chromas


def generate_melody_vecors(path,output_name, strategy):
    """
    Return a dict of song name -> melody vector for the songs listed in ``path``.

    The result is cached in ``<env>\\<output_name>.pickle``; when that cache
    can be read it is returned directly. Otherwise every ``*.mid`` file in
    ``<env>\\midi_files`` is indexed by the song title parsed from its file
    name (the part after ``_-_`` and before the first ``.``, lower-cased, with
    underscores turned into spaces), and each song from the lyrics CSV that
    has a matching file is converted with ``get_midi_vector``. Songs without a
    MIDI file, and files that fail to load, are reported and skipped.
    """
    cache_path = r'{}\{}.pickle'.format(env,output_name)
    embeddings = read_pickle(cache_path)
    # read_pickle returns 0 when the cache file is unavailable.
    if embeddings == 0:
        midi_by_song = {}
        for midi_file in glob.glob(r"{}\midi_files\*.mid".format(env)):
            # Parse the file name only, so separators in the directory path
            # cannot corrupt the title.
            name_parts = os.path.basename(midi_file).split('_-_')
            if len(name_parts) < 2:
                print('skipping midi file with unexpected name - {}'.format(midi_file))
                continue
            title = name_parts[1].split('.')[0].lower().replace("_"," ")
            # Keep the first file found for a title.
            midi_by_song.setdefault(title, midi_file)
        songs = get_songs_words(path)
        embeddings = {}
        for song_name, song_words in songs.items():
            midi_path = midi_by_song.get(song_name)
            if midi_path is None:
                print('could not find midi file for {}'.format(song_name))
                continue
            try:
                midi_info = pretty_midi.PrettyMIDI(midi_path)
                embeddings[song_name] = get_midi_vector(midi_info, len(song_words), strategy)
            except Exception as e:
                # Best effort: one unreadable MIDI file must not stop the
                # whole run, but the reason is reported.
                print('error while loading midi file - {}'.format(midi_path))
                print(e)
        dump_pickle(cache_path,embeddings)
    return embeddings
            
    
    
        

In [None]:
# Build (or load from cache) the melody vectors of the train and test songs for
# both strategies, then store the merged train+test dict per strategy.
env = 'C:\\Users\\idan\\Desktop\\ass3'

# Strategy 1
path = r'{}\lyrics_train_set.csv'.format(env)
trian_melody_embeddings_s1 = generate_melody_vecors(path, 'midi_train_set_S1', 1)
path = r'{}\lyrics_test_set.csv'.format(env)
test_melody_embeddings_s1 = generate_melody_vecors(path, 'midi_test_set_S1', 1)
# On a shared song name the test entry wins, as with two successive updates.
melody_embeddings_s1 = {**trian_melody_embeddings_s1, **test_melody_embeddings_s1}
dump_pickle(r'{}\{}.pickle'.format(env, 'all_melodies_S1'), melody_embeddings_s1)

# Strategy 2
path = r'{}\lyrics_train_set.csv'.format(env)
trian_melody_embeddings_s2 = generate_melody_vecors(path, 'midi_train_set_S2', 2)
path = r'{}\lyrics_test_set.csv'.format(env)
test_melody_embeddings_s2 = generate_melody_vecors(path, 'midi_test_set_S2', 2)
melody_embeddings_s2 = {**trian_melody_embeddings_s2, **test_melody_embeddings_s2}
dump_pickle(r'{}\{}.pickle'.format(env, 'all_melodies_S2'), melody_embeddings_s2)


# Generate datasets

In [None]:


def add_melody_for_word(word_embedding, melodies,index,strategy):
    """Concatenate a word embedding with the melody features of word ``index``.

    strategy 1: the melody part is the ``index``-th run of 100 values of
    ``melodies``; a shorter run is replaced by 100 zeros, and an embedding
    shorter than 300 is replaced by 300 zeros.
    strategy 2: the melody part is ``melodies[index]``; an empty row is
    replaced by 12 zeros and an empty embedding by 300 zeros.

    If anything raises, the error is printed and the embedding is returned
    with an all-zero melody part. Any other strategy returns None.
    """
    if strategy == 1:
        notes_per_word = 10
        instruments_number = 10
        chunk = notes_per_word * instruments_number
        try:
            start = index * chunk
            melody_part = melodies[start:start + chunk]
            if len(melody_part) < chunk:
                melody_part = np.zeros(chunk)
            if len(word_embedding) < 300:
                word_embedding = np.zeros(300)
            return np.concatenate([word_embedding, melody_part])
        except Exception as e:
            print(e)
            return np.concatenate([word_embedding, np.zeros(chunk)])
    if strategy == 2:
        chroma_size = 12
        try:
            melody_part = melodies[index]
            if len(melody_part) == 0:
                melody_part = np.zeros(chroma_size)
            if len(word_embedding) == 0:
                word_embedding = np.zeros(300)
            return np.concatenate([word_embedding, melody_part])
        except Exception as e:
            # Typically an index past the last chroma row.
            print(e)
            return np.concatenate([word_embedding, np.zeros(chroma_size)])


def generate_dataset(melodies, E, songsL, L, L_one_hot, strategy):
    """
    Build the training pairs for next-word prediction.

    For every pair of consecutive words in a song, X gets the embedding of the
    first word joined with its melody features (see ``add_melody_for_word``)
    and Y gets the one-hot vector of the second word. Words without an
    embedding in ``E`` use 300 zeros. A song that raises while being processed
    (for example because it has no entry in ``melodies``) is reported and
    contributes nothing.

    Returns ``(X, Y)`` where X is an array reshaped to (samples, 1, features)
    and Y is a list of one-hot vectors. ``L`` is not used.
    """
    all_inputs = []
    all_targets = []
    for song in songsL.keys():
        print('processing {}'.format(song))
        try:
            melody = melodies[song]
            lyrics = songsL[song]
            inputs = []
            targets = []
            # Pair each word with the word that follows it.
            for position, (word, following_word) in enumerate(zip(lyrics, lyrics[1:])):
                try:
                    vector = E[word]
                except Exception as e:
                    vector = np.zeros(300)
                features = add_melody_for_word(vector, melody, position, strategy)
                target = L_one_hot[following_word]
                inputs.append([features])
                targets.append(target)
            # Only reached when the whole song was processed without error.
            all_inputs.extend(inputs)
            all_targets.extend(targets)
        except Exception as e:
            print(e)
    stacked = np.vstack(all_inputs)
    stacked = stacked.reshape((stacked.shape[0], 1, stacked.shape[1]))
    return stacked, all_targets
            
                


# DL MODELS

In [None]:

def generate_sampling(preds, temperature=1.0):
    """Sample an index from the distribution described by ``preds``.

    Adapted from the book "Deep Learning for Natural Language Processing"
    (ISBN 1838550291), page 183.

    ``preds`` holds non-negative weights (they do not need to sum to 1).
    ``temperature`` reshapes the distribution before sampling: values below 1
    sharpen it, values above 1 flatten it, and the default 1.0 samples from
    ``preds`` as given.

    Raises ValueError when ``temperature`` is not positive or when ``preds``
    carries no probability mass.
    """
    if temperature <= 0:
        raise ValueError('temperature must be positive')
    weights = np.asarray(preds).astype('float64')
    # p ** (1 / T) equals exp(log(p) / T) but maps zero weights to zero
    # without going through log(0) and its runtime warning.
    weights = np.power(weights, 1.0 / temperature)
    total = np.sum(weights)
    if not np.isfinite(total) or total <= 0:
        raise ValueError('preds must contain at least one positive finite weight')
    probas = np.random.multinomial(1, weights / total, 1)
    chosen = np.argmax(probas)
    return chosen




In [None]:
def choose_random_word(language, E):
    """Return a random word of ``language`` that has an embedding in ``E``.

    Words are tried in random order, so each word with an embedding is equally
    likely to be returned (per occurrence in ``language``).

    Raises ValueError when no word has an embedding, instead of recursing
    until the interpreter's recursion limit is hit.
    """
    candidates = list(language)
    random.shuffle(candidates)
    for word in candidates:
        try:
            if E[word] is not None:
                return word
        except KeyError:
            # No embedding for this word; try the next candidate.
            continue
    raise ValueError('no word in the language has an embedding')

In [None]:
def generate_lstm_net(one_hot_embedding,one_hot_words,L,L_size, trian_melody_embeddings,E,songsL,L_one_hot, strategy):
    """Load the saved model for ``strategy`` or build a fresh, untrained one.

    Returns ``(model, 1)`` when ``<env>\\strategy<N>_model_sparse.h5`` could be
    loaded and ``(model, 0)`` when a new network was created: one LSTM layer
    (128 units), dropout 0.2 and a softmax over ``L_size`` words, compiled
    with Adam and categorical cross-entropy. Each input is a single time step
    of 300 embedding values plus the melody features of the strategy
    (100 for strategy 1, 12 for strategy 2). Any other strategy returns None.

    Only ``L_size`` and ``strategy`` are used; the remaining parameters are
    accepted for call compatibility.

    NOTE(review): the training cell saves to ``strategy<N>_model_random_words.h5``
    while this loads ``strategy<N>_model_sparse.h5`` - confirm which file name
    is intended.
    """
    melody_widths = {1: 100, 2: 12}
    if strategy not in melody_widths:
        return None
    print("working on strategy {}".format(strategy))
    try:
        # Raw string: '\s' in a normal literal is an invalid escape sequence.
        model = load_model(r'{}\strategy{}_model_sparse.h5'.format(env, strategy))
        print('loaded model')
        return model, 1
    except Exception:
        # Loading can fail in several ways (missing file, incompatible
        # format); in every case fall back to a new model.
        print("creating model for strategy {}".format(strategy))
    model = Sequential()
    model.add(LSTM(128, input_shape=(1, 300 + melody_widths[strategy])))
    model.add(Dropout(0.2))
    model.add(Dense(L_size))
    model.add(Activation('softmax'))
    # `learning_rate` is the supported keyword; `lr` is a deprecated alias.
    optimizer = Adam(learning_rate=0.0006)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['acc'])
    return model, 0


In [None]:
def create_songs(model,language,E,one_hot_words,melody_embeddings,strategy,random_words):
    """Generate lyrics for every test song and write them to text files.

    For each song of ``lyrics_test_set.csv`` four songs are generated, seeded
    with the song's real first word and with ``random_words[0..2]``. Each
    generated song has as many words as the original. Every step feeds the
    previous word's embedding joined with the melody features of that position
    to ``model`` and samples the next word from the prediction; the index
    sampled in the previous step is excluded. The result and three similarity
    scores against the original lyrics are written to
    ``<env>\\<real|random>_<song>_S<strategy>-<seed word>.txt`` with ``&``
    turned into line breaks.

    Songs without a melody or without lyrics are reported and skipped, and a
    seed word without an embedding uses a zero vector, matching how generated
    words are handled. ``language`` is not used.
    """
    print("\n\n\n\n==================DONE TRAINING - validation=====================\n")
    songs_words = get_songs_words(r'{}\lyrics_test_set.csv'.format(env))

    for song_name, original_words in songs_words.items():
        if song_name not in melody_embeddings or not original_words:
            print('skipping {} - no melody or no lyrics available'.format(song_name))
            continue
        song_melody = melody_embeddings[song_name]
        chosen_words = [original_words[0], random_words[0], random_words[1], random_words[2]]
        for real_or_random_index, word in enumerate(chosen_words):
            song_created = word
            print('working on word {} at song {}'.format(word, song_name))
            try:
                word_vector = E[word]
            except KeyError:
                # The real first word may have no pretrained embedding.
                word_vector = np.zeros(300)
            # Same helper as for later words, so a short melody is zero-padded
            # to the width the model expects.
            current_input = add_melody_for_word(word_vector, song_melody, 0, strategy)
            current_input = current_input.reshape((1, 1, current_input.shape[0]))

            counter = 1
            # NOTE(review): starting at 0 bans vocabulary index 0 in the first
            # step rather than the seed word - confirm this is intended.
            last_index = 0
            # generate song with the same length as the original song
            while counter < len(original_words):
                # Only the latest input is needed for the next prediction.
                preds = model.predict(current_input, verbose=0)[0]
                preds[last_index] = 0
                next_index = generate_sampling(preds)
                last_index = next_index
                next_word = one_hot_words[next_index]
                song_created += ' ' + next_word
                try:
                    next_vector = E[next_word]
                except KeyError:
                    next_vector = np.zeros(300)
                next_input = add_melody_for_word(next_vector, song_melody, counter, strategy)
                current_input = next_input.reshape((1, 1, next_input.shape[0]))
                counter = counter + 1
            docB = song_created
            docA = original_words
            similarity = get_similarity(docA,docB)
            similarity2 = get_similarity2(docA,docB)
            similarity3 = jaccard_similarity(docA, docB)
            if real_or_random_index == 0:
                selected = 'real'
            else:
                selected = 'random'
            # Raw string: '\{' in a normal literal is an invalid escape sequence.
            file_name = r'{}\{}_{}_{}-{}.txt'.format(env,selected,song_name,'S'+str(strategy),str(word))
            with open(file_name, 'w+',encoding='utf-8') as res_file:
                res = str(song_created).replace('&', '\n')
                res_file.write("similarity score : {} \n".format(similarity))
                res_file.write("similarity_2 score : {} \n".format(similarity2))
                res_file.write("similarity_jaccard score : {} \n".format(similarity3))
                res_file.write(res)
            
            
            
            

# Similarity 

In [None]:
def get_similarity(textA, textB):
    """Return the spaCy document similarity between two lyrics.

    ``textA`` is a list of words (joined with spaces) and ``textB`` a string;
    in both, ``&`` is replaced by a line break before processing. Both
    documents and the score are printed.

    The ``en_core_web_lg`` pipeline is loaded on the first call only and
    reused afterwards, because loading it for every comparison is very slow.
    """
    NLP = getattr(get_similarity, '_nlp', None)
    if NLP is None:
        NLP = spacy.load('en_core_web_lg')
        get_similarity._nlp = NLP
    textA = ' '.join(textA)
    textA = textA.replace('&', '\n')
    textB = textB.replace('&', '\n')
    docA = NLP(u'{}'.format(textA))
    docB = NLP(u'{}'.format(textB))
    print(docA)
    print('-------------------')
    print(docB)
    similarity = docA.similarity(docB)
    print('generated and original song similarity is: {}'.format(similarity))
    return similarity


def jaccard_similarity(textA, textB):
    """Return the Jaccard similarity of the word sets of two lyrics.

    ``textA`` is a list of words and ``textB`` a string; ``&`` is treated as a
    separator in both. The score is ``|A & B| / |A | B|`` over the distinct
    words, and 0.0 when neither text contains a word.

    Splitting on any run of whitespace keeps empty strings out of the sets;
    they used to count as a shared "word" and inflate the score.
    """
    words_a = set(' '.join(textA).replace('&', ' ').split())
    words_b = set(textB.replace('&', ' ').split())
    union = words_a | words_b
    if not union:
        return 0.0
    return len(words_a & words_b) / len(union)



def get_similarity2(t1,t2):
    """Return the cosine similarity of two lyrics over their keyword sets.

    ``t1`` is a list of words (joined with spaces) and ``t2`` a string; ``&``
    is removed from both. Both texts are tokenized with NLTK, English
    stopwords are dropped, and each text becomes a binary vector over the
    union of the remaining tokens. The cosine of two binary vectors is
    ``|A & B| / sqrt(|A| * |B|)``.

    Returns 0.0 when either text has no non-stopword token (this used to
    raise ZeroDivisionError). Both texts and the score are printed.
    """
    t1 = ' '.join(t1)
    t1 = t1.replace('&', '')
    t2 = t2.replace('&', '')
    print(t1)
    print('-----')
    print(t2)

    # A set makes every stopword test O(1).
    sw = set(stopwords.words('english'))

    # tokenization and stopword removal
    t1_set = {w for w in word_tokenize(t1) if w not in sw}
    t2_set = {w for w in word_tokenize(t2) if w not in sw}

    if not t1_set or not t2_set:
        cosine = 0.0
    else:
        # cosine formula for binary vectors
        cosine = len(t1_set & t2_set) / float((len(t1_set) * len(t2_set)) ** 0.5)
    print("similarity_2: ", cosine)
    return cosine


# Execution

In [None]:
# Build (or load from cache) the melody vectors used for training and
# generation: 100 values per word for strategy 1 and 12 per word for
# strategy 2, then store the merged train+test dict per strategy.
env = 'C:\\Users\\idan\\Desktop\\ass3'

# Strategy 1
path = r'{}\lyrics_train_set.csv'.format(env)
trian_melody_embeddings_s1 = generate_melody_vecors(path, 'midi_train_set_S1_100', 1)
path = r'{}\lyrics_test_set.csv'.format(env)
test_melody_embeddings_s1 = generate_melody_vecors(path, 'midi_test_set_S1_100', 1)
# On a shared song name the test entry wins, as with two successive updates.
melody_embeddings_s1 = {**trian_melody_embeddings_s1, **test_melody_embeddings_s1}
dump_pickle(r'{}\{}.pickle'.format(env, 'all_melodies_S1_100'), melody_embeddings_s1)

# Strategy 2
path = r'{}\lyrics_train_set.csv'.format(env)
trian_melody_embeddings_s2 = generate_melody_vecors(path, 'midi_train_set_S2_12', 2)
path = r'{}\lyrics_test_set.csv'.format(env)
test_melody_embeddings_s2 = generate_melody_vecors(path, 'midi_test_set_S2_12', 2)
melody_embeddings_s2 = {**trian_melody_embeddings_s2, **test_melody_embeddings_s2}
dump_pickle(r'{}\{}.pickle'.format(env, 'all_melodies_S2_12'), melody_embeddings_s2)


In [None]:
# `env` is assigned before it is used so this cell also runs on its own.
env = r'C:\Users\idan\Desktop\ass3'
path = r'{}\lyrics_train_set.csv'.format(env)

# Candidate pretrained vectors: glove-wiki-gigaword-300 & word2vec-google-news-300
print('generate one hot embedding')
one_hot_embedding = generate_one_hot_embedding(path)
print('generate one hot words')
# Index -> word mapping; computed once (a second identical call was dropped).
one_hot_words = generate_one_hot_words(path)
a = r'word2vec-google-news-300'
E = download_embeddings(a)
songsL = get_songs_words(path)
language = extract_language(path)
L_size = len(language)
language = list(language)
L = np.array(language)
L_one_hot = generate_onehot_for_language(L)




In [None]:
# Three random seed words (each with an embedding), shared by both strategies.
random_words = [choose_random_word(language, E) for _ in range(3)]
for strategy in [1,2]:
    print('working on strategy {}'.format(strategy))
    model_status = 0
    if strategy == 1:
        model, model_status= generate_lstm_net(one_hot_embedding,one_hot_words,L,L_size, trian_melody_embeddings_s1,E,songsL,L_one_hot,strategy)
        X_train,y_train =  generate_dataset(trian_melody_embeddings_s1, E, songsL, L, L_one_hot, strategy)
        melody_embeddings = melody_embeddings_s1
    if strategy == 2:
        model, model_status = generate_lstm_net(one_hot_embedding,one_hot_words,L,L_size, trian_melody_embeddings_s2,E,songsL,L_one_hot, strategy)
        X_train,y_train =  generate_dataset(trian_melody_embeddings_s2, E, songsL, L, L_one_hot, strategy)
        melody_embeddings = melody_embeddings_s2
    print('model status is: {}'.format(model_status))
    X_train,y_train = np.array(X_train), np.array(y_train)
    tensorboard = TensorBoard(log_dir=r'{}\tensorboard_strategy_random_words{}\{}'.format(env,strategy,time.time()))
    early_stopping = EarlyStopping(monitor='val_loss', patience=35)
    print("creating balanced weights")
    num_labels = np.argmax(y_train, axis=1)
    present_classes = np.unique(num_labels)
    # scikit-learn takes these arguments by keyword.
    balanced_weights = compute_class_weight(class_weight='balanced', classes=present_classes, y=num_labels)
    # Keras takes class_weight as a dict keyed by class index. Every output
    # index gets an entry: the balanced weight for words that occur as a
    # target and a neutral 1.0 for words that never do.
    class_weights = {index: 1.0 for index in range(y_train.shape[1])}
    class_weights.update(zip(present_classes.tolist(), balanced_weights.tolist()))
    if model_status == 0 :
        model.fit(X_train, y_train, epochs=50, verbose=1, validation_split=0.2,
              callbacks=[tensorboard,early_stopping], class_weight=class_weights)
        print('SAVING MODEL - version {}'.format(strategy))
        # Raw string: '\s' in a normal literal is an invalid escape sequence.
        # NOTE(review): generate_lstm_net loads `strategy<N>_model_sparse.h5`,
        # not this file name - confirm which one is intended.
        model.save(r'{}\strategy{}_model_random_words.h5'.format(env,strategy))
    else:
        print('model alreay loaded')

    create_songs(model,language,E,one_hot_words,melody_embeddings,strategy,random_words)

