In [1]:
import requests

# Maps each local file name (also the path the file is saved under by
# download_file) to the remote URL it is fetched from.
url_dict = {
    'shakespeare.txt': 'https://caltech-cs155.s3.us-east-2.amazonaws.com/miniprojects/project3/data/shakespeare.txt',
    'spenser.txt': 'https://caltech-cs155.s3.us-east-2.amazonaws.com/miniprojects/project3/data/spenser.txt',
    'syllable_dict.txt' : 'https://caltech-cs155.s3.us-east-2.amazonaws.com/miniprojects/project3/data/Syllable_dictionary.txt',
    'about_syllable_dict.docx' : 'https://caltech-cs155.s3.us-east-2.amazonaws.com/miniprojects/project3/data/syllable_dict_explanation.docx'
}

def download_file(file_path, chunk_size=1024 * 1024, timeout=30):
    """Download one of the project data files into the working directory.

    Args:
        file_path: Key of ``url_dict``; it is also the local path the
            response body is written to.
        chunk_size: Maximum number of bytes requested per read while
            streaming the response (1 MiB by default).
        timeout: Seconds ``requests`` waits on the server before giving up.

    Raises:
        KeyError: If ``file_path`` is not a key of ``url_dict``.
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    url = url_dict[file_path]
    print('Start downloading...')
    # Without a timeout a stalled connection would block the cell forever.
    with requests.get(url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(file_path, 'wb') as f:
            # A modest chunk size keeps memory use bounded; a 1 GiB chunk
            # would buffer an entire file before anything is written.
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
    print('Complete')

# Fetch every data file the notebook relies on, one after the other.
for data_file in ('shakespeare.txt', 'spenser.txt', 'syllable_dict.txt',
                  'about_syllable_dict.docx'):
    download_file(data_file)

Start downloading...
Complete
Start downloading...
Complete
Start downloading...
Complete
Start downloading...
Complete


In [2]:
import os
import re
import random
import urllib.request
import numpy as np
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from matplotlib import animation
from matplotlib.animation import FuncAnimation
import string

In [3]:
# parsing observations from the dataset (from PS6)
def parse_observations(text):
    """Encode a text as lists of integer word ids, one list per line.

    Each line is split on whitespace. Every word is stripped of all
    punctuation except hyphens and apostrophes, lower-cased, and relieved of
    one trailing apostrophe. Tokens that end up empty (punctuation only) or
    purely numeric are skipped. Ids are assigned in order of first
    appearance.

    Args:
        text: The raw text; lines are separated by newline characters.

    Returns:
        A tuple ``(obs, obs_map)`` where ``obs`` is a list with one list of
        word ids per non-empty line and ``obs_map`` maps each word to its id.
    """
    # Built once: removes every punctuation mark except '-' and "'".
    strip_table = str.maketrans(
        '', '', string.punctuation.replace('-', '').replace("'", ''))

    obs_counter = 0
    obs = []
    obs_map = {}

    # sequences are based on lines
    for raw_line in text.split('\n'):
        obs_elem = []

        for word in raw_line.split():
            word = word.translate(strip_table).lower()
            # A closing quote mark is not part of the word.
            if word.endswith("'"):
                word = word[:-1]
            # A token made only of punctuation is empty by now; skipping it
            # avoids indexing into an empty string and keeps '' out of the
            # vocabulary.
            if not word:
                continue
            # do not want to be adding numbers
            if word.isnumeric():
                continue
            if word not in obs_map:
                # Add unique words to the observations map.
                obs_map[word] = obs_counter
                obs_counter += 1

            # Add the encoded word.
            obs_elem.append(obs_map[word])

        # Add the encoded sequence; lines without any kept word are dropped.
        if obs_elem:
            obs.append(obs_elem)

    return obs, obs_map

def obs_map_reverser(obs_map):
    """Return the inverse of ``obs_map``: a dict from id back to its key.

    If several keys share an id, the key seen last wins.
    """
    return {index: token for token, index in obs_map.items()}

def get_shakespeare_dataset():
  """Read 'shakespeare.txt' and encode it with parse_observations.

  Returns whatever parse_observations returns for the file's text: the
  encoded observations together with the word-to-id mapping.
  """
  with open('shakespeare.txt', 'r', encoding='utf-8') as sonnets_file:
    sonnets_text = sonnets_file.read()
  return parse_observations(sonnets_text)

# `obs` contains all the observations to be used in training (one list of
# word ids per line of text); `obs_map` maps each word to its id.
obs, obs_map = get_shakespeare_dataset()
# Reverse lookup (id -> word), used to decode sequences back into text.
obs_map_reverse = obs_map_reverser(obs_map)

In [4]:
# ensuring that the parsing was correct: decode and show the first 20 lines
# (slicing, unlike indexing 0..19, also works when fewer lines were parsed)
for seq in obs[:20]:
  print([obs_map_reverse[word_id] for word_id in seq])

['from', 'fairest', 'creatures', 'we', 'desire', 'increase']
['that', 'thereby', "beauty's", 'rose', 'might', 'never', 'die']
['but', 'as', 'the', 'riper', 'should', 'by', 'time', 'decease']
['his', 'tender', 'heir', 'might', 'bear', 'his', 'memory']
['but', 'thou', 'contracted', 'to', 'thine', 'own', 'bright', 'eyes']
["feed'st", 'thy', "light's", 'flame', 'with', 'self-substantial', 'fuel']
['making', 'a', 'famine', 'where', 'abundance', 'lies']
['thy', 'self', 'thy', 'foe', 'to', 'thy', 'sweet', 'self', 'too', 'cruel']
['thou', 'that', 'art', 'now', 'the', "world's", 'fresh', 'ornament']
['and', 'only', 'herald', 'to', 'the', 'gaudy', 'spring']
['within', 'thine', 'own', 'bud', 'buriest', 'thy', 'content']
['and', 'tender', 'churl', "mak'st", 'waste', 'in', 'niggarding']
['pity', 'the', 'world', 'or', 'else', 'this', 'glutton', 'be']
['to', 'eat', 'the', "world's", 'due', 'by', 'the', 'grave', 'and', 'thee']
['when', 'forty', 'winters', 'shall', 'besiege', 'thy', 'brow']
['and', 'di

In [5]:
def get_syllable_dict(obs_map, path='syllable_dict.txt'):
  """Build syllable-count lookups keyed by word id.

  Every line of the file is read as a word followed by one or more count
  tokens. Words missing from ``obs_map`` are ignored. For each count token
  the first run of digits is taken as the count; a token starting with 'E'
  is stored in the end-of-line dictionary, any other token in the regular
  one. When a word has several tokens of the same kind, the last one wins.
  Tokens without any digit are skipped.

  Args:
    obs_map: Mapping from word to word id.
    path: Location of the syllable file.

  Returns:
    A tuple ``(syllable_dict, end_syllable_dict)``, both mapping word id to
    a syllable count; to be used when sampling.
  """
  syllable_dict = {}
  end_syllable_dict = {}
  with open(path, 'r', encoding='utf-8') as file:
    for line in file:
      elems = line.split()
      # Skip blank lines and words that never occur in the observations.
      if not elems or elems[0] not in obs_map:
        continue
      word_obs_num = obs_map[elems[0]]
      for token in elems[1:]:
        digits = re.search(r'\d+', token)
        if digits is None:
          # Malformed count token: nothing to record.
          continue
        count = int(digits.group())
        if token.startswith('E'):
          end_syllable_dict[word_obs_num] = count
        else:
          syllable_dict[word_obs_num] = count
  return syllable_dict, end_syllable_dict

# Syllable lookups (regular and end-of-line counts) for the words in obs_map.
syllable_dict, end_syllable_dict = get_syllable_dict(obs_map)

In [6]:
def generate_poem(emission, obs_map_reverse):
  """Print an encoded poem, one formatted line per emitted sequence.

  Each sequence of ids is decoded through ``obs_map_reverse``, joined with
  spaces and capitalized. Line 12 ends with a period; lines 13 and 14 are
  indented by two spaces and end with a comma and a period respectively;
  every other line ends with a comma.
  """
  # TODO: add in a check if the word is "I"

  # (indent, closing punctuation) for the lines that deviate from the default
  special_lines = {12: ('', '.'), 13: ('  ', ','), 14: ('  ', '.')}

  for line_number, encoded_line in enumerate(emission, start=1):
    indent, ending = special_lines.get(line_number, ('', ','))
    words = [obs_map_reverse[word_id] for word_id in encoded_line]
    print(indent + ' '.join(words).capitalize() + ending)

### RNN Code

In [7]:
import pandas as pd
import requests
from numpy import genfromtxt
import torch
import torch.nn as nn
import torch.optim as optim

In [8]:
def one_hot(seq, dict_len, seq_len, size):
  """One-hot encode ``size`` sequences of ``seq_len`` ids each.

  Returns a float32 array of shape (size, seq_len, dict_len) in which
  entry [i, u, seq[i][u]] is 1 and everything else is 0.
  """
  encoded = np.zeros(shape=(size, seq_len, dict_len), dtype=np.float32)
  positions = ((row, step) for row in range(size) for step in range(seq_len))
  for row, step in positions:
    # Switch on the single slot that corresponds to the id at this position.
    encoded[row, step, seq[row][step]] = 1
  return encoded

In [9]:
def generate_rnn_data(text, step=1, seq_len=40):
  """Turn raw text into character-level training windows.

  The text is reduced to letters, spaces, newlines and the characters
  , ' . ! ? _ and then lower-cased, and runs of spaces are collapsed to one.
  Every distinct character receives an id in order of first appearance.
  Windows of ``seq_len`` ids are taken every ``step`` positions; the target
  of a window is the same window shifted one character to the right.

  Args:
    text: Raw text to encode.
    step: Distance between the starts of consecutive windows.
    seq_len: Number of characters per window.

  Returns:
    A tuple ``(x, y, obs, obs_map)``: ``x`` is the one-hot encoding of the
    input windows produced by ``one_hot``, ``y`` the list of target windows
    (lists of ids), ``obs`` the whole text as a list of ids and ``obs_map``
    the character-to-id mapping.
  """
  text = re.sub(r'[^a-zA-Z ,\'.!?_\n]', '', text).lower()
  text = re.sub(' +', ' ', text)

  obs = []
  obs_map = {}

  for c in text:
    if c not in obs_map:
      # Ids are handed out in order of first appearance.
      obs_map[c] = len(obs_map)
    obs.append(obs_map[c])

  x_raw = []
  y = []

  # Stop seq_len short of the end so every target window is full length.
  for i in range(0, len(obs) - seq_len, step):
    x_raw.append(obs[i:i + seq_len])
    y.append(obs[i + 1:i + 1 + seq_len])

  x = one_hot(x_raw, len(obs_map), seq_len, len(x_raw))
  return x, y, obs, obs_map

In [10]:
# Raw sonnet text for the character-level model. The encoding is given
# explicitly so decoding does not depend on the platform default and matches
# the UTF-8 read in get_shakespeare_dataset.
with open('shakespeare.txt', 'r', encoding='utf-8') as file:
    shakespeare = file.read()

In [11]:
# Character-level training windows; a new window starts every 3 characters.
rnn_x, rnn_y, rnn_obs, rnn_obs_map = generate_rnn_data(shakespeare,3)
# Reverse lookup (id -> character) used to decode sampled ids.
rnn_obs_map_r = obs_map_reverser(rnn_obs_map)

# Tensor versions of the inputs and targets, consumed by the training loop.
torch_x = torch.from_numpy(np.array(rnn_x))
torch_y = torch.from_numpy(np.array(rnn_y))

In [12]:
class Model(nn.Module):
  """LSTM followed by a linear layer and a log-softmax.

  Args:
    input_size: Size of each input feature vector.
    output_size: Number of output classes.
    hidden_dim: Size of the LSTM hidden state.
    n_layers: Number of stacked LSTM layers.
  """

  def __init__(self, input_size, output_size, hidden_dim, n_layers):
    super(Model, self).__init__()
    self.hidden_dim = hidden_dim
    self.n_layers = n_layers
    self.rnn = nn.LSTM(input_size, hidden_dim, n_layers, batch_first=True)
    self.fc = nn.Linear(hidden_dim, output_size)
    self.sm = nn.LogSoftmax(dim=-1)

  def forward(self, x):
    """Run a batch through the network, starting from a zero state.

    Args:
      x: Batch-first input tensor; its first dimension is the batch size.

    Returns:
      A tuple ``(out, hidden)``. ``out`` holds log-probabilities with all
      batch and time positions flattened into the first dimension, and
      ``hidden`` is the final state returned by the LSTM.
    """
    batch_size = x.size(0)
    # Allocate the initial state next to the input so the model also works
    # when the data does not live on the default device/dtype.
    hidden = self.init_hidden(batch_size, device=x.device, dtype=x.dtype)
    out, hidden = self.rnn(x, hidden)
    # Flatten (batch, time) so the linear layer scores every position.
    out = out.contiguous().view(-1, self.hidden_dim)
    out = self.fc(out)
    out = self.sm(out)
    return out, hidden

  def init_hidden(self, batch_size, device=None, dtype=None):
    """Return zero-filled (hidden, cell) states for ``batch_size`` sequences.

    ``device`` and ``dtype`` are optional; when omitted, torch's defaults
    are used.
    """
    shape = (self.n_layers, batch_size, self.hidden_dim)
    hidden = (torch.zeros(shape, device=device, dtype=dtype),
              torch.zeros(shape, device=device, dtype=dtype))
    return hidden

In [13]:
def predict(model, character, temp=1):
  """Sample the character that follows a sequence of characters.

  Args:
    model: Network called on the one-hot encoded sequence; the scores of
      its last output row are used for sampling.
    character: Sequence of characters; each must be a key of rnn_obs_map.
    temp: Sampling temperature, must be positive. Values below 1 sharpen
      the distribution, values above 1 flatten it.

  Returns:
    A tuple ``(next_char, hidden)`` with the sampled character and the
    hidden state returned by the model.

  Raises:
    ValueError: If ``temp`` is not positive.
    KeyError: If a character is missing from rnn_obs_map.
  """
  if temp <= 0:
    raise ValueError('temp must be positive')

  encoded = np.array([[rnn_obs_map[c] for c in character]])
  features = one_hot(encoded, len(rnn_obs_map), encoded.shape[1], 1)

  # Sampling needs no gradients; skipping autograd avoids building a graph
  # for every generated character.
  with torch.no_grad():
    out, hidden = model(torch.from_numpy(features))
    # Dividing the scores by the temperature before the softmax equals
    # raising the probabilities to 1/temp and renormalizing, but never
    # takes the log of a probability that underflowed to zero.
    prob = nn.functional.softmax(out[-1] / temp, dim=0).detach().numpy()

  char_ind = random.choices(range(len(rnn_obs_map_r)), weights=prob)

  return rnn_obs_map_r[char_ind[0]], hidden

In [14]:
def sample(model, out_len, temp=1, start="shall i compare thee to a summer's day?\n"):
  """Generate text by repeatedly sampling the next character.

  Args:
    model: Network passed to predict(); it is put in eval mode while
      sampling and returned to its previous mode afterwards.
    out_len: Total length of the returned text, seed included. Nothing is
      generated when the seed is already at least this long.
    temp: Sampling temperature forwarded to predict().
    start: Seed text; it is lower-cased before use.

  Returns:
    The lower-cased seed followed by the generated characters.
  """
  # Remember the mode so sampling in the middle of training does not leave
  # the model stuck in eval mode.
  was_training = model.training
  model.eval()
  try:
    chars = list(start.lower())
    for _ in range(out_len - len(chars)):
      char, _hidden = predict(model, chars, temp)
      chars.append(char)
  finally:
    model.train(was_training)

  return ''.join(chars)

In [15]:
# Single-layer LSTM with 150 hidden units; input and output sizes both equal
# the number of distinct characters in rnn_obs_map.
model = Model(input_size=len(rnn_obs_map), output_size=len(rnn_obs_map), hidden_dim=150, n_layers=1)

# Training hyperparameters.
n_epochs = 100
lr=0.01

# NOTE(review): Model.forward already ends in LogSoftmax and CrossEntropyLoss
# applies a log-softmax to its input as well -- confirm this pairing is intended.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

In [None]:
# Full-batch training: every epoch is one optimizer step over all windows.
for epoch in range(1, n_epochs + 1):
  # sample() below switches the model to eval mode, so make sure every
  # optimization step runs in train mode.
  model.train()
  optimizer.zero_grad()
  output, hidden = model(torch_x)
  # The model flattens (batch, time); flatten the targets the same way.
  loss = criterion(output, torch_y.view(-1).long())
  loss.backward()
  optimizer.step()
  print('\n\n\n\nEpoch: {}/{}.............'.format(epoch, n_epochs), end=' ')
  print("Loss: {:.4f}".format(loss.item()))

  # Every 10 epochs, show generated text at several temperatures.
  if epoch%10 == 0:
    for temp in (1, 1.5, 0.75, 0.25):
      print("\n\nTemperature = {}:".format(temp))
      print(sample(model, 560, temp=temp))