In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import re
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [None]:
# Load the haiku corpus, surround every "/" with spaces so that it splits off
# as its own whitespace-delimited token, then drop rows with missing values.
data = (
    pd.read_csv('../data/haiku.csv')
    .replace("/", " / ", regex=True)
    .dropna()
)
data.head()

In [None]:
def tokenize(sentence: str):
    """Lower-case `sentence`, split it on whitespace and clean each piece.

    Every piece is passed through `process_token`; the cleaned pieces are
    returned as a list in their original order.
    """
    pieces = sentence.lower().split()
    return list(map(process_token, pieces))

def process_token(token: str) -> str:
    """Normalize a single token.

    Surrounding whitespace is always removed. The "/" separator token is kept
    as is; for any other token every character that is neither a word
    character nor whitespace is deleted.

    Args:
        token: one raw token.

    Returns:
        The cleaned token. It may be an empty string when the token consisted
        only of punctuation.
    """
    stripped = token.strip()
    if stripped == "/":
        # Return the stripped form so both branches agree on whitespace handling.
        return stripped
    return re.sub(r'[^\w\s]', '', stripped)

def vectorize(tokens):
    """Build a token -> integer id vocabulary.

    '<pad>' always receives id 0. Every other distinct token (after stripping
    surrounding whitespace) receives the next unused id, in order of first
    appearance.
    """
    vocab = {'<pad>': 0}
    for raw in tokens:
        # Ids are handed out densely from 0, so the current size is the next free id.
        vocab.setdefault(raw.strip(), len(vocab))
    return vocab

def find_max_length(vectorized_poems):
    """Return the length of the longest poem, or 0 when there are no poems."""
    return max((len(poem) for poem in vectorized_poems), default=0)

In [None]:
# Build the vocabulary from the whole corpus: join every poem into one string,
# tokenize it once, then assign an integer id to each distinct token.
all_text = " ".join(data["processed_title"].to_list())
tokens = tokenize(all_text)
vocab_map = vectorize(tokens)

In [None]:
# Show the vocabulary size and a short preview instead of dumping every entry
# into the cell output.
print(f"{len(vocab_map)} tokens in vocabulary")
dict(list(vocab_map.items())[:20])

In [None]:
# Encode every poem as a list of vocabulary ids.
data["vectorized"] = data["processed_title"].apply(
    lambda text: [vocab_map[token] for token in tokenize(text)]
)

# Keep poems with at most two "/" separators. The separator id is looked up in
# vocab_map because it depends on where "/" first appears in the corpus; a
# hard-coded id silently filters on a different token when the corpus changes.
# If "/" never occurs, the lookup yields None and no poem is removed.
separator_id = vocab_map.get("/")
has_few_separators = data["vectorized"].apply(lambda ids: ids.count(separator_id) <= 2)
data = data[has_few_separators.astype(bool)]

# Longest remaining poem, measured before the length cap below is applied.
max_length = find_max_length(data["vectorized"])

# Drop poems longer than 19 tokens.
data = data[data['vectorized'].apply(lambda x: len(x) <= 19)]
data.head()

In [None]:
# Pad every id sequence at its end (padding='post') so that all poems share
# one length and can be stacked into a single 2-D array.
data_vectorized_list = data["vectorized"].to_list()
padded_data = pad_sequences(data_vectorized_list, padding='post')
haikus = np.array(padded_data)
haikus

In [None]:
# Tokens per training window: window_size - 1 input tokens followed by 1 target token.
window_size = 4

In [None]:
# Slide a window of `window_size` ids across every padded haiku: the first
# window_size - 1 ids are the model input and the last id is the target.
training_examples = [
    (row[start:start + window_size - 1], row[start + window_size - 1])
    for row in haikus
    for start in range(len(row) - window_size + 1)
]
training_examples[:20]

In [None]:
from keras.models import Sequential
from keras.layers import LSTM, Embedding, Dense, SimpleRNN, GRU
from sklearn.model_selection import train_test_split
# Reset Keras' global state before the model is defined in the next cell.
tf.keras.backend.clear_session()

In [None]:
# Next-token model: embed the window_size - 1 context ids, summarize them with
# a GRU, then classify over the whole vocabulary with a softmax output.
vocab_size = len(vocab_map)
embedding_size = 128
input_length = window_size - 1

model = Sequential()
model.add(Embedding(input_dim=vocab_size, output_dim=embedding_size, input_length=input_length))
model.add(GRU(32))
model.add(Dense(512, activation='relu'))
model.add(Dense(128, activation='relu'))
model.add(Dense(vocab_size, activation='softmax'))


In [None]:
import keras.backend as K
def masked_loss(y_true, y_pred):
    """Sparse categorical cross-entropy averaged over non-padding targets.

    Targets equal to 0 (the id `vectorize` assigns to '<pad>') contribute
    nothing to the loss and are excluded from the average.

    Args:
        y_true: integer class ids, one per example.
        y_pred: predicted class probabilities; the last axis indexes classes.

    Returns:
        A scalar tensor: the summed loss of the non-padding examples divided
        by their count, or 0 when every target in the batch is padding.
    """
    loss = K.sparse_categorical_crossentropy(y_true, y_pred)
    mask = K.cast(K.not_equal(y_true, 0), K.floatx())
    # Give the mask exactly the per-example shape of the loss. If Keras hands
    # over y_true as (batch, 1) while the loss is (batch,), a plain multiply
    # would broadcast to (batch, batch) and the masking would cancel out.
    mask = K.reshape(mask, K.shape(loss))
    masked = loss * mask
    # A batch made only of padding has a zero mask sum; avoid dividing by zero.
    return K.sum(masked) / K.maximum(K.sum(mask), K.epsilon())

In [None]:
# Train with Adam at an explicit learning rate and the custom masked_loss
# defined above; accuracy is tracked as an additional metric.
optimizer = tf.keras.optimizers.Adam(learning_rate = 0.001)
model.compile(loss=masked_loss, optimizer=optimizer, metrics=['accuracy'])

In [None]:
# Only the first 15000 windows are used; 10% of them are held out for validation.
training_subset = training_examples[:15000]
x = np.array([context for context, target in training_subset])
y = np.array([target for context, target in training_subset])
# A fixed random_state keeps the train/validation split identical across
# kernel restarts, so validation numbers are comparable between runs.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.1, random_state=42)

In [None]:
# NOTE(review): clear_session() runs here after `model` has already been built
# and compiled in earlier cells — confirm that resetting Keras state at this
# point is intended.
tf.keras.backend.clear_session()
# 10 epochs with mini-batches of 128; the held-out split is passed as validation data.
model.fit(x_train, y_train, batch_size=128, epochs=10, validation_data=(x_test, y_test))

In [None]:
# Reverse lookup (integer id -> token) used to turn predicted ids back into words.
vocab_map_inv = {index: word for word, index in vocab_map.items()}

In [None]:
def generate_poem(input_words: list):
    """Greedily extend seed words into a poem and print the result.

    The seed is left-padded with "<pad>" up to window_size - 1 words. Each
    step feeds the most recent window_size - 1 token ids to `model` and
    appends the highest-scoring word. The caller's list is not modified.

    Args:
        input_words: seed words; each one must be a key of `vocab_map`.

    Returns:
        None. The seed and the finished poem are printed.

    Raises:
        KeyError: if a seed word is not in `vocab_map`.
    """
    context_size = window_size - 1

    # Work on a copy so the caller's list is never mutated.
    seed_words = list(input_words)
    # Left-pad short seeds up to one full context window (no-op for longer seeds).
    seed_words = ["<pad>"] * (context_size - len(seed_words)) + seed_words

    unknown_words = [word for word in seed_words if word not in vocab_map]
    if unknown_words:
        raise KeyError(f"Words not in the vocabulary: {unknown_words}")

    vectorized_input = [vocab_map[word] for word in seed_words]
    print(f"User specified words {seed_words} which were vectorized as {vectorized_input}")
    output_poem = list(seed_words)

    # 19 is the same token cap that is applied to the training poems.
    # NOTE(review): the number of generated words does not depend on the seed
    # length, so the final poem length varies with the seed — confirm intended.
    for _ in range(19 - window_size - 1):
        # Condition on the most recent tokens. Slicing from the front would
        # ignore the end of any seed longer than one context window.
        context = np.array(vectorized_input[-context_size:]).reshape((1, context_size))
        prediction = np.asarray(model.predict(context, verbose=0))
        new_word_vector = int(np.argmax(prediction[0]))
        vectorized_input.append(new_word_vector)
        output_poem.append(vocab_map_inv[new_word_vector])

    output = " ".join(output_poem)
    print(f"OUTPUT POEM: {output}")

In [None]:
# Two seed words: fewer than window_size - 1, so generate_poem left-pads the seed with "<pad>".
generate_poem(["once", "i"])

In [None]:
# Three seed words: exactly one context window when window_size is 4.
generate_poem(["college", "students", "are"])

In [None]:
# Five seed words: longer than one context window (window_size - 1 tokens).
generate_poem(["fun", "today", "and", "tomorrow", "together"])

In [None]:
# Another five-word seed.
generate_poem(["the", "bear", "and", "i", "will"])

In [None]:
# Four-word seed.
generate_poem(["you","are", "a", "movie"])

In [None]:
def pick_random_words(vocab_map, number):
    """Return `number` keys of `vocab_map` chosen at random, with replacement.

    Indices are drawn with np.random.randint, so the same key may appear more
    than once and any key (including '<pad>') can be picked.
    """
    candidates = list(vocab_map.keys())
    return [candidates[i] for i in np.random.randint(0, len(candidates), number)]

In [None]:
# Random seed of exactly window_size - 1 vocabulary entries.
generate_poem(pick_random_words(vocab_map, window_size-1))

In [None]:
# Random seed of five vocabulary entries.
generate_poem(pick_random_words(vocab_map, 5))

In [None]:
# Five-word seed. NOTE(review): generate_poem raises KeyError if any of these
# words is missing from vocab_map — confirm they all occur in the corpus.
generate_poem(["hi", "park", "ball", "stick", "water"])