In [None]:
# Markov with Syllable Neural Network
# Same rhyme ranker as MikesVersion1
# Changelog:
# Uses hyperparameter tuning using the Keras Tuner to find optimum number of hidden layer nodes, learning rate & epoch.

In [1]:
#@title Import Statements
!pip install PyGithub
!pip install -q -U keras-tuner

# Package Imports
import random
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt 
from urllib.request import urlopen # The default requests package
import requests # For making GitHub requests
from pprint import pprint # For pretty printing
from pathlib import Path # The Path class

# For the more advanced requests
import base64
import os
import sys
sys.path.append("./PyGithub");
from github import Github
from getpass import getpass

# For the Neural Network
import tensorflow as tf
from tensorflow import keras
import kerastuner as kt

# For importing the training, testing and validation data
from google.colab import drive
drive.mount('/content/drive')

Collecting PyGithub
[?25l  Downloading https://files.pythonhosted.org/packages/18/44/df78514f2b5f5abaec330596e0fa3273824238399a964d1a7e82fd39990d/PyGithub-1.54.1-py3-none-any.whl (289kB)
[K     |█▏                              | 10kB 10.8MB/s eta 0:00:01[K     |██▎                             | 20kB 12.8MB/s eta 0:00:01[K     |███▍                            | 30kB 8.9MB/s eta 0:00:01[K     |████▌                           | 40kB 7.9MB/s eta 0:00:01[K     |█████▊                          | 51kB 4.6MB/s eta 0:00:01[K     |██████▉                         | 61kB 5.0MB/s eta 0:00:01[K     |████████                        | 71kB 5.0MB/s eta 0:00:01[K     |█████████                       | 81kB 5.1MB/s eta 0:00:01[K     |██████████▏                     | 92kB 5.6MB/s eta 0:00:01[K     |███████████▍                    | 102kB 5.9MB/s eta 0:00:01[K     |████████████▌                   | 112kB 5.9MB/s eta 0:00:01[K     |█████████████▋                  | 122kB 5.9MB/s et

In [2]:
#@title Function Definitions
# Recursively Import the Data (AUTOMATIC)

def _decode_and_write(file__, path_):
    data = file__.decoded_content
    data = data.decode('utf-8')[1:]
    with open(path_, 'w') as writefile:
        writefile.write(data) 
    data = data.splitlines()
    data_rows = []
    for count, word in enumerate(data):
        if count>0:
            data_rows.append(word.split(','))
    data = pd.DataFrame(data_rows)
    data = data.to_numpy()
    return data


def import_github(path_name="AllLyrics.txt"):
    """
    Function for importing the github file
    path_name: str
    output: None
    """
    g = Github(getpass("Enter your PAT key ")) # Enter your PAT Key.
    username = "MikeMNelhams"
    main_branch_bool = input("Main Branch: Yes or No? ")
    yes_synonyms = ["yes", "y", "yh", "ye", "1", "true"]
    if main_branch_bool.lower() in yes_synonyms: 
        branch = "master" 
    else: 
        branch = "PROTOTYPE"

    user = g.get_user(username)
    r_proj_clone = 0
    for repo in g.get_user().get_repos():
        if repo.name == "ai-group-project-Team-JMJM":
            r_proj_clone = repo
            break
        # To see all the available attributes and methods
        print(dir(repo))
    if not r_proj_clone:
        print("ai-group-project-Team-JMJM not found")
        sys.exit()
    print("Importing Github cleaned text files...")
    contents = r_proj_clone.get_contents("RapLyrics/CLEAN", ref=branch)
    RAP_DATA = []
    for file_ in contents:
        path = file_.path
        path = str(path) 
        # Only choose the .txt files
        if path[-4:] == '.txt':
            # Append the Lyrics
            RAP_DATA.append(file_.decoded_content.decode("utf-8")) 
    
    temp_path = Path(path_name)
    if temp_path.is_file(): 
        if os.stat(path_name).st_size == 0:
            write_bool2 = True
        else: 
            write_bool2 = False
    else: 
        write_bool2 = True
    
    if write_bool2: 
        for lyric in RAP_DATA: 
            try:
                with open(path_name, 'w') as writefile: 
                    writefile.write(lyric)
            except: 
                print("Error, file moved/deleted during write")
        print("{} is now up to date!".format(path_name))
    else: 
        print("{} is already up to date!".format(path_name))
    
    contents = r_proj_clone.get_contents("RapLyrics/Other", ref=branch)
    for counter, file_ in enumerate(contents):
        path = file_.path
        path = str(path) 

        title_start = path.find('Other')
        title_len = path[title_start:].find('.')
        path = path[title_start + 6:title_start + title_len + 4]

        print("Writing file {} {}".format(counter, path))
        temp_path = Path(path)
        if temp_path.is_file():
            with open(path,'w'): pass # Cheeky way to clear the file if it exists
        
        # Split the long string into a list of lines, then split by words, then put into a csv, then to numpy array 
        data = file_.decoded_content
        data = data.decode('utf-8')[1:]

        with open(path, 'w') as writefile:
            writefile.write(data) 
    print("All files now up to date!")


def update_github(write_bool=False, path_name="AllLyrics.txt"):
    """
    Function for updating the github file, by cleaning the lyrics, optional write to txt file. 
    write_bool: bool
    path_name: str
    output: None
    """
    g = Github(getpass("Enter your PAT key ")) # Enter your PAT Key.
    username = "MikeMNelhams"
    main_branch_bool = input("Main Branch: Yes or No? ")
    yes_synonyms = ["yes", "y", "yh", "1", "true"]
    if main_branch_bool.lower() in yes_synonyms: 
        branch = "master" 
    else: 
        branch = "PROTOTYPE"

    user = g.get_user(username)
    r_proj_clone = 0
    for repo in g.get_user().get_repos():
        if repo.name == "ai-group-project-Team-JMJM":
            r_proj_clone = repo
            break
        # To see all the available attributes and methods
        print(dir(repo))
    
    if not r_proj_clone:
        print("ai-group-project-Team-JMJM not found")
        sys.exit()

    print("Importing editing csv files...")

    # Split the long string into a list of lines, then split by words, then put into a csv, then to numpy arr
    contents = r_proj_clone.get_contents("RapLyrics/Other", ref=branch)
    for counter, file_ in enumerate(contents):
        path = file_.path 
        path = str(path)
        title_start = path.find('Other')
        title_len = path[title_start:].find('.')
        name = path[title_start + 6:title_start + title_len + 4]
        print("Writing file {} {}".format(counter, name))
        if name.lower() == "censors.csv":
            censors = _decode_and_write(file_, path)
        elif name.lower() == "capitals.csv":
            capitals = _decode_and_write(file_, path)
        else: 
            _decode_and_write(file_, path)
    print("All editing csv files are up to date!")

    print("Importing Github uncleaned text files...")
    contents = r_proj_clone.get_contents("RapLyrics/UNCLEAN", ref=branch)

    RAP_DATA = []
    rap_lyric_names = []

    for file_ in contents:
        path = file_.path
        path = str(path) 
        # Only choose the .txt files
        if path[-4:] == '.txt':
            # Append the name
            title_start = path.find('UNCLEAN')
            title_len = path[title_start:].find('.')
            name = path[title_start + 8:title_start + title_len]
            if name[-2:] == 'UC':
                name = name[:-2]
            rap_lyric_names.append(name) 

        # Append the Lyrics
        RAP_DATA.append(file_.decoded_content.decode("utf-8")) 
        
    # Remove the \ufeff at the beginning O(n)
    for count, lyric in enumerate(RAP_DATA): 
        RAP_DATA[count] = lyric[1:]

    # Censor the profanities O(n*m + n*m2) m > m2 xor m2 > m
    for count in range(len(RAP_DATA)): 
        for i in range(len(censors[0:])):
            RAP_DATA[count] = RAP_DATA[count].replace(str(censors[i, 0]), str(censors[i, 1]))
        for i in range(len(capitals[0:])):
            RAP_DATA[count] = RAP_DATA[count].replace(str(capitals[i, 0]), str(capitals[i, 1]))

    contents = r_proj_clone.get_contents("RapLyrics/CLEAN", ref=branch)
    cleaned_names = []
    for counter, file_ in enumerate(contents):
        path = file_.path
        path = str(path) 
        print("File {} ".format(counter + 1) + path)
        # Only choose the .txt files
        if path[-4:] == '.txt':
            # Append the name
            title_start = path.find('CLEAN')
            title_len = path[title_start:].find('.')
        name = path[title_start + 6:title_start + title_len]
        if name[-2:] == 'CL':
            name = name[:-2]
        cleaned_names.append(name) 

    # ALL OF THE EDITING IS DONE IN THE 'PROTOTYPE BRANCH' to avoid overwriting import changes
    # If the (now cleaned) rap_lyrics name is new (not in cleaned_names), then we want to create that as a new file 
    # If the (now cleaned) rap_lyrics name is NOT new (not in cleaned_names), then we want to update the file
    # print(rap_lyric_names)
    # print(cleaned_names)
    print("Committing files to github...")
    for counter, new_name in enumerate(rap_lyric_names): 
        if new_name in cleaned_names: 
            duplicate = r_proj_clone.get_contents("RapLyrics/CLEAN/{}CL.txt".format(new_name), ref=branch)
            r_proj_clone.update_file("RapLyrics/CLEAN/{}CL.txt".format(new_name), "This was uploaded automatically via pipeline", RAP_DATA[counter], duplicate.sha, branch=branch)
        else:
            r_proj_clone.create_file("RapLyrics/CLEAN/{}CL.txt".format(new_name), "This was uploaded automatically via pipeline", RAP_DATA[counter], branch=branch)

    if write_bool: 
        print("Writing text file to: {}".format(path_name))
        with open(path_name, 'w') as writefile:
            for lyric in RAP_DATA:
                writefile.write(lyric)


In [3]:
# Import all of Mike's lyrics.
import_github()

Enter your PAT key ··········
Main Branch: Yes or No? y
Importing Github cleaned text files...
AllLyrics.txt is now up to date!
Writing file 0 capitals.csv
Writing file 1 censors.csv
Writing file 2 censors2.csv
All files now up to date!


In [4]:
train = pd.read_csv('/content/drive/MyDrive/training_data.csv')
test = pd.read_csv('/content/drive/MyDrive/testing_data.csv')
validation = pd.read_csv('/content/drive/MyDrive/validation_data.csv')

train_in = []
test_in = []
train_out = []
test_out = []
validation_in = []
validation_out = []

for row in train.itertuples():
    # train_in.append(row.Word)
    train_out.append(row.Number_of_Syllables)
    if row.Word == '                ':  # an empty word was getting in for some reason
        pass
    else:
        temp = list(row.Word)
        for i in range(len(temp)):
            temp[i] = ord(temp[i])
        train_in.append(temp)

for row in test.itertuples():
    # test_in.append(row.Word)
    test_out.append(row.Number_of_Syllables)
    if row.Word == '                ':
        pass
    else:
        temp = list(row.Word)
        for i in range(len(temp)):
            temp[i] = ord(temp[i])
        test_in.append(temp)

for row in validation.itertuples():
    # test_in.append(row.Word)
    validation_out.append(row.Number_of_Syllables)
    if row.Word == '                ':
        pass
    else:
        temp = list(row.Word)
        for i in range(len(temp)):
            temp[i] = ord(temp[i])
        validation_in.append(temp)

test_in = np.array(test_in)
test_out = np.array(test_out)
train_in = np.array(train_in)
train_out = np.array(train_out)
validation_in = np.array(validation_in)
validation_out = np.array(validation_out)

max_word = 143

callback = keras.callbacks.EarlyStopping(monitor='val_loss', patience=20)

def model_builder(hp):
  model = keras.Sequential()
  model.add(keras.layers.Embedding(max_word, 100))
  model.add(keras.layers.GlobalAveragePooling1D())
  hp_units = hp.Int('units', min_value=32, max_value=512, step=32)
  model.add(keras.layers.Dense(100, activation=tf.nn.relu))
  model.add(keras.layers.Dense(8, activation=tf.nn.softmax))
  model.summary()

  hp_learning_rate = hp.Choice('learning_rate', values=[0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75])

  model.compile(optimizer=tf.compat.v1.train.GradientDescentOptimizer(learning_rate=hp_learning_rate, use_locking=False),
              loss='sparse_categorical_crossentropy', metrics=['accuracy'])
  return model

tuner = kt.Hyperband(model_builder, objective='val_accuracy', max_epochs=1000)

tuner.search(train_in, train_out, epochs=1000, batch_size=100, validation_data=(test_in, test_out), verbose=2, callbacks=[callback])
best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]

print(f"""
The hyperparameter search is complete. The optimal number of units in the first densely-connected
layer is {best_hps.get('units')} and the optimal learning rate for the optimizer
is {best_hps.get('learning_rate')}.
""")

Trial 224 Complete [00h 00m 02s]
val_accuracy: 0.6178795695304871

Best val_accuracy So Far: 0.6868169903755188
Total elapsed time: 00h 08m 55s
INFO:tensorflow:Oracle triggered exit

The hyperparameter search is complete. The optimal number of units in the first densely-connected
layer is 256 and the optimal learning rate for the optimizer
is 0.4.



In [5]:
# Build model using optimal hyperparameters and test for optimal number of epochs

model = tuner.hypermodel.build(best_hps)
history = model.fit(train_in, train_out, epochs=1000, batch_size=100, validation_data=(test_in, test_out))

val_acc_per_epoch = history.history['val_accuracy']
best_epoch = val_acc_per_epoch.index(max(val_acc_per_epoch)) + 1
print('Best epoch: ', best_epoch)


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, None, 100)         14300     
_________________________________________________________________
global_average_pooling1d (Gl (None, 100)               0         
_________________________________________________________________
dense (Dense)                (None, 100)               10100     
_________________________________________________________________
dense_1 (Dense)              (None, 8)                 808       
Total params: 25,208
Trainable params: 25,208
Non-trainable params: 0
_________________________________________________________________
Epoch 1/1000
Epoch 2/1000
Epoch 3/1000
Epoch 4/1000
Epoch 5/1000
Epoch 6/1000
Epoch 7/1000
Epoch 8/1000
Epoch 9/1000
Epoch 10/1000
Epoch 11/1000
Epoch 12/1000
Epoch 13/1000
Epoch 14/1000
Epoch 15/1000
Epoch 16/1000
Epoch 17/1000
Epoch 18/1000
Epoch 1

In [6]:
# Create and test hypermodel using optimal parameters

hypermodel = tuner.hypermodel.build(best_hps)
hypermodel.fit(train_in, train_out, epochs=best_epoch, batch_size=100, validation_data=(test_in, test_out))
result = hypermodel.evaluate(validation_in, validation_out)
print('Test loss:', result[0])
print('Test accuracy:', result[1])

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (None, None, 100)         14300     
_________________________________________________________________
global_average_pooling1d (Gl (None, 100)               0         
_________________________________________________________________
dense (Dense)                (None, 100)               10100     
_________________________________________________________________
dense_1 (Dense)              (None, 8)                 808       
Total params: 25,208
Trainable params: 25,208
Non-trainable params: 0
_________________________________________________________________
Epoch 1/423
Epoch 2/423
Epoch 3/423
Epoch 4/423
Epoch 5/423
Epoch 6/423
Epoch 7/423
Epoch 8/423
Epoch 9/423
Epoch 10/423
Epoch 11/423
Epoch 12/423
Epoch 13/423
Epoch 14/423
Epoch 15/423
Epoch 16/423
Epoch 17/423
Epoch 18/423
Epoch 19/423
Epoch 20/423

In [None]:
def rap():
    num_lines = int(input('How many lines would you like the rap to be? '))
    num_generated_lines = int(input('How many lines should be generated to choose from? '))
    count = int(input("How many syllables per line? "))

    # Extract all of Mike's lyrics.
    text = open("/content/drive/MyDrive/AllLyrics_uncleanOLD.txt", "r").read()
    vocabulary = ''.join([i for i in text if not i.isdigit()]).replace("\n", " ").split(' ')

    # Generate text
    def line_generator(vocab):
        index = 1
        chain = {}
        # count = 16 # https://colemizestudios.com/rap-lyrics-syllables/, apparently rappers usually use semiquavers
        line_count = 0
        number_of_tries = 0

        for word in vocab[index:]:
            key = vocab[index - 1]
            if key in chain:
                chain[key].append(word)
            else:
                chain[key] = [word]
            index += 1

        word1 = random.choice(list(chain.keys()))
        line = word1.capitalize()
        word1_with_spaces = word1
        while len(word1_with_spaces) < 16:
            word1_with_spaces += ' '
        temp_word = list(word1_with_spaces)
        for i in range(len(temp_word)):
            temp_word[i] = ord(temp_word[i])
        word_syllables = np.argmax(model.predict([temp_word]), axis=-1)
        word_count = word_syllables
        line_count += word_count

        while line_count < count:
            number_of_tries += 1
            word2 = random.choice(chain[word1])
            word2_with_spaces = word2
            while len(word2_with_spaces) < 16:
                word2_with_spaces += ' '
            temp_word = list(word2_with_spaces)
            for i in range(len(temp_word)):
                temp_word[i] = ord(temp_word[i])
            word_syllables = np.argmax(model.predict([temp_word]), axis=-1)
            word_count = word_syllables
            line_count += word_count
            # print(n)
            if line_count > count:  # don't include word if it makes line go over syllable count
                line_count -= word_count
            else:
                word1 = word2
                line += ' ' + word2.lower()
            if number_of_tries > 99:  # if not finding a word with right number of syllables, stop trying
                line += ' ERROR FINDING CORRECT SYLLABLE WORD'
                line_count = count
        return line

    # Rhyme Functions
    def reverse_syllable_extract(text):
        sy_form = []
        characters = [char for char in text]
        sylls = ['a', 'e', 'i', 'o', 'u', 'y']
        for x in characters:
            if x in sylls:
                sy_form.append(x)
        sy_form.reverse()
        return sy_form

    def rev_syllable_stop_count(text1, text2):
        counter = True
        i = 0
        counter = 0
        syll1 = reverse_syllable_extract(text1)
        syll2 = reverse_syllable_extract(text2)
        while counter:
            if i < min(len(syll1), len(syll2)) and syll1[i] == syll2[i]:
                counter += 1
                i += 1
            else:
                counter = False
        return counter

    def next_line_stop_count(start_line, lines):
        sy_lines = []
        for i in lines:
            sy_lines.append(rev_syllable_stop_count(start_line, i))
        choice = sy_lines[0]
        count = 0
        for i in range(len(sy_lines)):
            if sy_lines[i] > choice:
                choice = sy_lines[i]
        return lines[sy_lines.index(choice)]

    start_line = line_generator(vocabulary)
    done = False
    while not done:
        if 'ERROR FINDING CORRECT SYLLABLE WORD' in start_line:
            start_line = line_generator(vocabulary)
        else:
            done = True

    all_other_lines = []
    for i in range(num_generated_lines - 1):
      all_other_lines.append(line_generator(vocabulary))
    rap = [start_line]

    for n, line in enumerate(all_other_lines):
        done = False
        while not done:
            if 'ERROR FINDING CORRECT SYLLABLE WORD' in line:
                line = line_generator(vocabulary)
                all_other_lines[n] = line
            else:
                done = True

    for i in range(num_lines):
        if i % 2 == 1:
            next_line = next_line_stop_count(rap[len(rap) - 1], all_other_lines)
        else:
            next_line = random.choice(all_other_lines)
        all_other_lines.remove(next_line)
        rap.append(next_line)
    censors = pd.read_csv('censors2.csv')
    for i, line in enumerate(rap):
      for j, word in enumerate(line):
        for row in censors.itertuples():
          if word == row.word:
            line[j] = row.replacement
      rap[i] = line
    
    return rap

In [None]:
rap()

How many lines would you like the rap to be? 10
How many lines should be generated to choose from? 500
How many syllables per line? 16


['Heineken  dress up the blunts was the high now im paid uh cmon now',
 'Bon appetit  asked my wall and recognize the head right one',
 'Welcome to choose the hun chorusrepeat  went to die ',
 'Highness  had to speak to packin gats and taking her nose even',
 'In the ladies tonight that i get they money baby damn a',
 'Shootin  then im  im fuckin an  people at the third',
 'Changed but attack  and dangerous we buckin at my rings',
 'Burgular alarm systems  flippin on the sl the rim with',
 'From gym class to the goldie sound  hey ill die  to breathe',
 'Brat  i get suspicious  run up like his name to old',
 'Dot on  aint an itchy for my man n tip  see me ']

In [None]:
# Takes longer to load due to iterating through words until finding word with the right number of syllables
# Also takes longer due to replacing lines which contain errors
# Takes much much longer due to use of the neural network to predict syllables