<a href="https://colab.research.google.com/github/annamaartensson/dd2424project/blob/issue%2F14/models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import tensorflow as tf
import numpy as np
import pathlib
import os
import platform
import re

# Record the Python and TensorFlow versions used for this run.
print(platform.python_version())
print(tf.__version__)

3.10.12
2.15.0


In [None]:
!pip install -qq -U wandb

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.7/6.7 MB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.3/207.3 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m277.3/277.3 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.7/62.7 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import wandb
# Authenticate with Weights & Biases (in the recorded run this prompted for an API key).
wandb.login()

<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

 ··········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


True

In [2]:
def fetch_data():
  """Download the Project Gutenberg Jane Austen collection and split it into books.

  The file is fetched (and cached under ``./tmp``) with
  ``tf.keras.utils.get_file`` and then cut at hard-coded character offsets.

  Returns:
    A list of eight strings, in this order: persuasion, northanger_abbey,
    mansfield_park, emma, lady_susan, love_and_friendship,
    pride_and_prejudice, sense_and_sensibility.
  """
  cache_dir = './tmp'
  dataset_file_name = 'pg31100.txt'
  dataset_file_origin = 'https://www.gutenberg.org/cache/epub/31100/pg31100.txt'
  dataset_file_path = tf.keras.utils.get_file(fname = dataset_file_name, origin = dataset_file_origin, cache_dir=pathlib.Path(cache_dir).absolute())
  # The slice offsets below are character offsets, so the decoding must not
  # depend on the platform's default locale; the context manager also makes
  # sure the file handle is closed.
  # NOTE(review): assumes the file is UTF-8 encoded - confirm.
  with open(dataset_file_path, mode='r', encoding='utf-8') as dataset_file:
    text = dataset_file.read()
  persuasion = text[1437:468297]
  northanger_abbey = text[468297:901707]
  mansfield_park = text[901707:1784972]
  emma = text[1784972:2668012]
  lady_susan = text[2668012:2795312]
  love_and_friendship = text[2795312:2980261]
  pride_and_prejudice = text[2980261:3665048]
  # NOTE(review): this slice starts at 3682008 rather than at 3665048 where
  # the previous one ends - confirm that skipping this span is intentional.
  sense_and_sensibility = text[3682008:4355100]
  books = [persuasion, northanger_abbey, mansfield_park, emma, lady_susan, love_and_friendship, pride_and_prejudice, sense_and_sensibility]
  return books

In [3]:
def get_vocabulary(text):
  """Build the character vocabulary of `text` together with its lookup layers.

  Returns:
    A tuple ``(vocabulary, char_to_ind, ind_to_char)``: the sorted list of the
    distinct characters in `text`, a ``StringLookup`` layer built from that
    list, and an inverted ``StringLookup`` layer built from the first layer's
    own vocabulary.
  """
  unique_chars = set(text)
  vocabulary = sorted(unique_chars)
  char_to_ind = tf.keras.layers.StringLookup(
      mask_token = None,
      vocabulary = list(vocabulary))
  ind_to_char = tf.keras.layers.StringLookup(
      mask_token = None,
      invert = True,
      vocabulary = char_to_ind.get_vocabulary())
  return vocabulary, char_to_ind, ind_to_char

In [None]:
def batch_data(text, seq_length, char_to_ind, batch_size = 1, buffer_size = 0):
  """Turn `text` into a prefetched dataset of (input, target) index sequences.

  The text is split into characters, mapped to indices with `char_to_ind` and
  cut into chunks of ``seq_length + 1`` indices (an incomplete last chunk is
  dropped). Each chunk yields an input (its first `seq_length` indices) and a
  target (the chunk shifted by one position).

  Args:
    text: the string to encode.
    seq_length: number of indices per input/target sequence.
    char_to_ind: callable mapping a tensor of characters to indices.
    batch_size: sequences per batch; incomplete batches are dropped.
    buffer_size: shuffle buffer size; the sequences are shuffled only when
      this is greater than 0.

  Returns:
    The batched ``tf.data.Dataset``.
  """
  def split_input_target(chunk):
    # Input is the chunk without its last index, target the chunk shifted by one.
    return chunk[:seq_length], chunk[1:]

  char_ids = char_to_ind(tf.strings.unicode_split(text, 'UTF-8'))
  id_stream = tf.data.Dataset.from_tensor_slices(char_ids)
  chunks = id_stream.batch(seq_length+1, drop_remainder = True)
  sequences = chunks.map(split_input_target)
  if buffer_size > 0:
    sequences = sequences.shuffle(buffer_size)
  batched = sequences.batch(batch_size, drop_remainder = True)
  return batched.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
def clean_text(text):
  """Split `text` into lower-cased words with punctuation and digits removed.

  The characters ``& [ ] _ ! ? * . , ( ) ; : " '`` and all digits are deleted,
  newlines and hyphens become spaces, and the result is split on whitespace.

  Returns:
    A list of words (empty when `text` contains no word characters).
  """
  lower = text.lower()
  # Raw-string character classes: same characters as before, but without the
  # invalid string escapes (such as "\&" and "\_") that Python warns about.
  no_spec = re.sub(r"[&\[\]_!?*.,();:0-9\"']", "", lower)
  no_enter = re.sub(r"[\n-]", " ", no_spec)
  return no_enter.split()

In [None]:
def get_dictionary(text):
  """Return the set of distinct words that `clean_text` extracts from `text`."""
  return set(clean_text(text))

In [None]:
def correctly_spelled(text, dictionary):
  """Return the fraction of the words in `text` that occur in `dictionary`.

  Args:
    text: the string to score; it is tokenised with `clean_text`.
    dictionary: a container of known words (membership is tested with `in`).

  Returns:
    A float in [0, 1]; 0.0 when `text` contains no words at all.
  """
  words = clean_text(text)  # tokenise once instead of twice
  if not words:
    # Avoid a ZeroDivisionError for empty or punctuation-only text.
    return 0.0
  return sum(1 for w in words if w in dictionary)/len(words)

In [None]:
class RNN(tf.keras.Model):
  """One-hot input -> SimpleRNN -> Dense model producing per-step logits.

  Args:
    K: size of the one-hot encoding and of the output layer.
    m: number of units in the recurrent layer.
  """

  def __init__(self, K, m):
    # No positional arguments: passing `self` again would hand the instance to
    # the Keras constructor as a stray argument.
    super().__init__()
    # Frozen K x K identity embedding, i.e. a one-hot encoding of the indices.
    self.onehot = tf.keras.layers.Embedding(K, K, embeddings_initializer = 'identity', trainable = False)
    self.rnn = tf.keras.layers.SimpleRNN(m, return_sequences = True, return_state = True)
    self.dense = tf.keras.layers.Dense(K)

  def call(self, inputs, states = None, return_state = False, training = False):
    """Run the model on a batch of index sequences.

    Args:
      inputs: tensor of indices fed to the one-hot embedding.
      states: initial recurrent state; the layer's initial state is used when
        this is None.
      return_state: when True, also return the final recurrent state.
      training: forwarded to every layer.

    Returns:
      The dense layer's output, or ``(output, states)`` when `return_state`
      is True.
    """
    x = inputs
    x = self.onehot(x, training = training)
    if states is None:
      states = self.rnn.get_initial_state(x)
    x, states = self.rnn(x, initial_state = states, training = training)
    x = self.dense(x, training = training)
    if return_state:
      return x, states
    else:
      return x

class LSTM(tf.keras.Model):
  """One-hot input -> single LSTM layer -> Dense model producing per-step logits.

  Args:
    K: size of the one-hot encoding and of the output layer.
    m: number of units in the LSTM layer.
  """

  def __init__(self, K, m):
    # No positional arguments: passing `self` again would hand the instance to
    # the Keras constructor as a stray argument.
    super().__init__()
    # Frozen K x K identity embedding, i.e. a one-hot encoding of the indices.
    self.onehot = tf.keras.layers.Embedding(K, K, embeddings_initializer = 'identity', trainable = False)
    self.lstm = tf.keras.layers.LSTM(m, return_sequences = True, return_state = True)
    self.dense = tf.keras.layers.Dense(K)

  def call(self, inputs, states = None, return_state = False, training = False):
    """Run the model on a batch of index sequences.

    Args:
      inputs: tensor of indices fed to the one-hot embedding.
      states: initial LSTM state; the layer's initial state is used when this
        is None.
      return_state: when True, also return the final LSTM state.
      training: forwarded to every layer.

    Returns:
      The dense layer's output, or ``(output, states)`` when `return_state`
      is True, where `states` is the list of state tensors returned by the
      LSTM layer after its output.
    """
    x = inputs
    x = self.onehot(x, training = training)
    if states is None:
      states = self.lstm.get_initial_state(x)
    # The layer returns its output followed by its state tensors.
    x, *states = self.lstm(x, initial_state = states, training = training)
    x = self.dense(x, training = training)
    if return_state:
      return x, states
    else:
      return x

class LSTM2(tf.keras.Model):
  """One-hot input -> two stacked LSTM layers -> Dense model producing per-step logits.

  Args:
    K: size of the one-hot encoding and of the output layer.
    m: number of units in each of the two LSTM layers.
  """

  def __init__(self, K, m):
    # No positional arguments: passing `self` again would hand the instance to
    # the Keras constructor as a stray argument.
    super().__init__()
    # Frozen K x K identity embedding, i.e. a one-hot encoding of the indices.
    self.onehot = tf.keras.layers.Embedding(K, K, embeddings_initializer = 'identity', trainable = False)
    self.lstm1 = tf.keras.layers.LSTM(m, return_sequences = True, return_state = True)
    self.lstm2 = tf.keras.layers.LSTM(m, return_sequences = True, return_state = True)
    self.dense = tf.keras.layers.Dense(K)

  def call(self, inputs, states = None, return_state = False, training = False):
    """Run the model on a batch of index sequences.

    Args:
      inputs: tensor of indices fed to the one-hot embedding.
      states: either None or a two-element sequence holding the initial state
        of the first and of the second LSTM layer.
      return_state: when True, also return the final states of both layers.
      training: forwarded to every layer.

    Returns:
      The dense layer's output, or ``(output, [states_1, states_2])`` when
      `return_state` is True.
    """
    x = inputs
    x = self.onehot(x, training = training)
    if states is None:
      states_1 = self.lstm1.get_initial_state(x)
      # Both layers have m units, so the second layer starts from the same
      # initial state as the first.
      states_2 = states_1
    else:
      states_1 = states[0]
      states_2 = states[1]
    # Each layer returns its output followed by its state tensors.
    x, *states_1 = self.lstm1(x, initial_state = states_1, training = training)
    x, *states_2 = self.lstm2(x, initial_state = states_2, training = training)
    x = self.dense(x, training = training)
    if return_state:
      return x, [states_1, states_2]
    else:
      return x

In [None]:
def generate_text_temperature(start, length, model, char_to_ind, ind_to_char, T = 1.0):
  """Generate `length` characters by temperature sampling from `model`.

  Args:
    start: string tensor holding the seed text, e.g. ``tf.constant(['.'])``.
      Only the first element of each prediction is recorded, so a batch of
      one is assumed.
    length: number of characters to generate.
    model: callable accepting ``inputs``, ``states`` and ``return_state`` and
      returning ``(logits, states)`` with per-step logits.
    char_to_ind: lookup layer mapping characters to indices.
    ind_to_char: lookup layer mapping indices back to characters.
    T: temperature; the logits of the last step are divided by it.

  Returns:
    The generated characters as one string (the seed is not included).
  """
  # The [UNK] mask depends only on the lookup layer, so it is built once
  # instead of on every step. Adding -inf to the [UNK] logit makes sure that
  # token is never sampled.
  unk_inds = char_to_ind(['[UNK]'])[:, None]
  sparse_unk_mask = tf.SparseTensor(values = [-float('inf')]*len(unk_inds), indices = unk_inds, dense_shape=[len(char_to_ind.get_vocabulary())])
  unk_mask = tf.sparse.to_dense(sparse_unk_mask)

  text = []
  states = None
  for _ in range(length):
    chars = tf.strings.unicode_split(start, 'UTF-8')
    logits, states = model(inputs = char_to_ind(chars).to_tensor(), states = states, return_state = True)
    # Only the prediction for the last position is used.
    logits = logits[:, -1, :]/T
    logits = logits + unk_mask
    pred = tf.random.categorical(logits, num_samples = 1)
    # The sampled character is the input of the next step; the recurrent
    # state carries the history.
    start = ind_to_char(tf.squeeze(pred, axis = -1))
    text.append(start[0].numpy().decode('utf-8'))
  # `text` is a list of Python strings, so a plain join suffices (and also
  # works when length is 0).
  return "".join(text)

def generate_text_nucleus(start, length, model, char_to_ind, ind_to_char, theta = 1.0):
  """Generate `length` characters by nucleus (top-p style) sampling from `model`.

  At every step the probabilities are sorted in descending order and the last
  sorted position whose cumulative probability is still <= `theta` is
  located (position 0 when there is none). Every character at least as
  probable as the one at that position is kept, the rest get probability 0.

  Args:
    start: string tensor holding the seed text, e.g. ``tf.constant(['.'])``.
      The batch axis of the logits is squeezed away, so a batch of one is
      required.
    length: number of characters to generate.
    model: callable accepting ``inputs``, ``states`` and ``return_state`` and
      returning ``(logits, states)`` with per-step logits.
    char_to_ind: lookup layer mapping characters to indices.
    ind_to_char: lookup layer mapping indices back to characters.
    theta: cumulative-probability threshold of the nucleus.

  Returns:
    The generated characters as one string (the seed is not included).
  """
  # The [UNK] mask depends only on the lookup layer, so it is built once
  # instead of on every step. Adding -inf to the [UNK] logit gives that token
  # probability 0.
  unk_inds = char_to_ind(['[UNK]'])[:, None]
  sparse_unk_mask = tf.SparseTensor(values = [-float('inf')]*len(unk_inds), indices = unk_inds, dense_shape=[len(char_to_ind.get_vocabulary())])
  unk_mask = tf.sparse.to_dense(sparse_unk_mask)

  text = []
  states = None
  for _ in range(length):
    chars = tf.strings.unicode_split(start, 'UTF-8')
    logits, states = model(inputs = char_to_ind(chars).to_tensor(), states = states, return_state = True)
    # Only the prediction for the last position is used.
    logits = logits[:, -1, :]
    logits = logits + unk_mask
    logits = tf.squeeze(logits, axis = 0)
    probs = tf.nn.softmax(logits)
    sorted_probs = tf.sort(probs, direction = 'DESCENDING')
    sorted_probs_sum = tf.math.cumsum(sorted_probs)
    thresh_inds = tf.where(sorted_probs_sum <= theta)
    if len(thresh_inds) > 0:
      thresh_ind = thresh_inds[-1, 0].numpy()
    else:
      # Even the most probable character alone exceeds theta: keep only it.
      thresh_ind = 0
    # Zero out everything below the cut-off and rescale by the retained mass.
    top_probs = tf.multiply(probs, tf.cast(probs >= sorted_probs[thresh_ind], 'float32'))/sorted_probs_sum[thresh_ind]
    # log(0) = -inf, so removed characters can never be drawn.
    pred = tf.random.categorical([tf.math.log(top_probs)], num_samples = 1)
    # The sampled character is the input of the next step; the recurrent
    # state carries the history.
    start = ind_to_char(tf.squeeze(pred, axis = -1))
    text.append(start[0].numpy().decode('utf-8'))
  # `text` is a list of Python strings, so a plain join suffices (and also
  # works when length is 0).
  return "".join(text)

In [6]:
# Download the collection; `books` holds one string per book.
books = fetch_data()

# Train on the first book only (the remaining training books are currently
# disabled); the last two books are held out for validation and testing.
training_text = books[0] #+ books[1] + books[2] + books[3] + books[4] + books[5]
validation_text, test_text = books[6], books[7]

# Character vocabulary with its lookup layers, and the word set used for the
# spelling metric - both derived from the training text only.
vocabulary, char_to_ind, ind_to_char = get_vocabulary(training_text)
dictionary = get_dictionary(training_text)

Downloading data from https://www.gutenberg.org/cache/epub/31100/pg31100.txt


NameError: name 'get_dictionary' is not defined

In [61]:
class BytePairEncoder:
  """Character-level byte-pair encoder.

  `train` starts from the distinct characters of a text and repeatedly merges
  the most frequent adjacent token pair into a new token. `text_to_inds`
  replays those merges on new text and `inds_to_text` inverts the encoding.

  Attributes:
    merges: dict mapping a pair of token indices to the index of the token
      they were merged into.
    ind_to_token: list mapping a token index to its string.
    token_to_ind: dict mapping a token string to its index.
    vocabulary: initialised to an empty tuple; no method of this class
      populates it.
  """

  def __init__(self):
    self.vocabulary = ()
    self.merges = {}
    self.ind_to_token = []
    self.token_to_ind = {}

  def __merge_pairs(self, tokens, pair, val):
    """Return a copy of `tokens` with each non-overlapping occurrence of `pair`
    (scanning left to right) replaced by `val`."""
    merged_tokens = []
    i = 0
    last = len(tokens)-1
    while i < len(tokens):
      if i < last and tokens[i] == pair[0] and tokens[i+1] == pair[1]:
        merged_tokens.append(val)
        i += 2
      else:
        merged_tokens.append(tokens[i])
        i += 1
    return merged_tokens

  def __get_pair_counts(self, tokens):
    """Count every adjacent token pair; keys are kept in first-occurrence order."""
    counts = {}
    for pair in zip(tokens, tokens[1:]):
      counts[pair] = counts.get(pair, 0) + 1
    return counts

  def train(self, text, target_size):
    """Learn merges from `text` until the vocabulary holds `target_size` tokens.

    Training stops early when no adjacent pair is left to merge (for example
    when the text has been reduced to a single token). Any previously learned
    state is discarded.

    Args:
      text: the training string.
      target_size: desired number of tokens, characters included.
    """
    self.ind_to_token = sorted(set(text))
    self.token_to_ind = {token : ind for ind, token in enumerate(self.ind_to_token)}
    # Merges from an earlier call refer to the old indices; drop them.
    self.merges = {}
    tokens = [self.token_to_ind[c] for c in text]
    while len(self.ind_to_token) < target_size:
      counts = self.__get_pair_counts(tokens)
      if not counts:
        # Nothing left to merge; max() on an empty dict would raise.
        break
      # Ties are resolved in favour of the pair that was seen first.
      best_pair = max(counts, key = counts.get)
      new_token = self.ind_to_token[best_pair[0]]+self.ind_to_token[best_pair[1]]
      new_val = len(self.ind_to_token)
      self.ind_to_token.append(new_token)
      self.token_to_ind[new_token] = new_val
      self.merges[best_pair] = new_val
      tokens = self.__merge_pairs(tokens, best_pair, new_val)

  def text_to_inds(self, text):
    """Encode `text` as a list of token indices.

    The learned merges are replayed in the order in which they were learned,
    so a text is segmented exactly as it would have been during training.
    (Merging whichever known pair comes first from the left can pick a
    later-learned merge over an earlier one and yield a different
    segmentation.)

    Raises:
      KeyError: if `text` contains a character that was not in the training
        text.
    """
    tokens = [self.token_to_ind[c] for c in text]
    # Merge values grow with every learned merge, so sorting by value
    # restores the learning order.
    for pair, val in sorted(self.merges.items(), key = lambda item: item[1]):
      if len(tokens) < 2:
        break
      tokens = self.__merge_pairs(tokens, pair, val)
    return tokens

  def inds_to_text(self, inds):
    """Decode a sequence of token indices back into a string."""
    return "".join([self.ind_to_token[i] for i in inds])

# Learn a byte-pair vocabulary of (at most) 150 tokens from the training text.
bpe = BytePairEncoder()
bpe.train(training_text, 150)
# Show the learned token -> index table.
print(bpe.token_to_ind)
# Encode a sample sentence, then decode the indices again for comparison.
encode = bpe.text_to_inds('and quite the gentleman in all his notions and behaviour')
print(encode)
print(bpe.inds_to_text(encode))

{'\n': 0, ' ': 1, '!': 2, '"': 3, '&': 4, "'": 5, '(': 6, ')': 7, ',': 8, '-': 9, '.': 10, '0': 11, '1': 12, '2': 13, '3': 14, '4': 15, '5': 16, '6': 17, '7': 18, '8': 19, '9': 20, ':': 21, ';': 22, '?': 23, 'A': 24, 'B': 25, 'C': 26, 'D': 27, 'E': 28, 'F': 29, 'G': 30, 'H': 31, 'I': 32, 'J': 33, 'K': 34, 'L': 35, 'M': 36, 'N': 37, 'O': 38, 'P': 39, 'Q': 40, 'R': 41, 'S': 42, 'T': 43, 'U': 44, 'V': 45, 'W': 46, 'Y': 47, 'Z': 48, 'a': 49, 'b': 50, 'c': 51, 'd': 52, 'e': 53, 'f': 54, 'g': 55, 'h': 56, 'i': 57, 'j': 58, 'k': 59, 'l': 60, 'm': 61, 'n': 62, 'o': 63, 'p': 64, 'q': 65, 'r': 66, 's': 67, 't': 68, 'u': 69, 'v': 70, 'w': 71, 'x': 72, 'y': 73, 'z': 74, 'e ': 75, 'th': 76, 'd ': 77, 'er': 78, 't ': 79, 'in': 80, 's ': 81, ', ': 82, 'an': 83, 'en': 84, 'y ': 85, 'ou': 86, 'o ': 87, 'on': 88, 'ha': 89, 'ing': 90, 'or': 91, 'of': 92, ' th': 93, 'to ': 94, 're': 95, 'and ': 96, 'll': 97, '  ': 98, 'er ': 99, 'ar': 100, 'ed ': 101, '.  ': 102, 'hi': 103, 'as ': 104, 'he ': 105, 'ing ':

In [None]:
# Hyperparameters of the run; this dict is also handed to wandb as the run config.
configs = {
    "seq_length": 100,      # characters per input/target sequence
    "batch_size": 64,       # sequences per training batch
    "buffer_size": 10000,   # shuffle buffer size
    "K": len(char_to_ind.get_vocabulary()),  # size of the lookup layer's vocabulary
    "m": 256,               # units of the recurrent layer
    "epochs": 1,
    "learning_rate": 0.001,
}

In [None]:
# Shuffled mini-batches of the training text.
training_batches = batch_data(training_text, configs["seq_length"], char_to_ind, configs["batch_size"], configs["buffer_size"])
# Validation must use the held-out validation text; building it from
# training_text would just report the training loss a second time.
# Uses batch_data's defaults: batch size 1 and no shuffling.
validation_batches = batch_data(validation_text, configs["seq_length"], char_to_ind)

In [None]:
class SpellChecker(tf.keras.callbacks.Callback):
  """Keras callback printing the spelling accuracy of generated text after every epoch.

  For each sampling setting a 1000-character sample is generated from the
  model being trained, seeded with '.', and the fraction of its words found
  in the notebook-level `dictionary` is printed. Relies on the notebook-level
  names `char_to_ind`, `ind_to_char`, `dictionary`,
  `generate_text_temperature`, `generate_text_nucleus` and `correctly_spelled`.
  """

  def on_epoch_end(self, epoch, logs = None):
    """Generate one sample per sampling setting and print its spelling accuracy."""
    start = tf.constant(['.'])
    length = 1000
    # (keyword name, generator, values to try) - evaluated in this order.
    samplers = [
        ("T", generate_text_temperature, (1.0, 0.75, 0.5)),
        ("theta", generate_text_nucleus, (1.0, 0.75, 0.5)),
    ]
    # Only the very first line starts with a newline, so the report begins
    # on a fresh line.
    prefix = "\n"
    for param_name, sampler, values in samplers:
      for value in values:
        generated = sampler(start, length, self.model, char_to_ind, ind_to_char, **{param_name : value})
        print(f"{prefix}Correctly spelled ({param_name} = {value}):", correctly_spelled(generated, dictionary))
        prefix = ""

In [None]:
# NOTE(review): K and m are not read anywhere below - the model is built from
# config.K / config.m, i.e. from `configs` - and m = 128 differs from
# configs["m"]. Confirm whether these two assignments are still needed.
K = len(char_to_ind.get_vocabulary())
m = 128

# Start a Weights & Biases run that records the hyperparameters in `configs`.
wandb.init(
        project="ProjectDD2424",
        config=configs)

config=wandb.config

# Single-layer LSTM; the loss is computed on raw logits (from_logits = True).
model = LSTM(K = config.K, m = config.m)
model.compile(optimizer = tf.optimizers.Adam(learning_rate = config.learning_rate), loss = tf.losses.SparseCategoricalCrossentropy(from_logits = True))

# Save weights only, one checkpoint file name per epoch ("ckpt_{epoch}").
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(filepath = checkpoint_prefix, save_weights_only = True)

# Train with checkpointing, the spelling report and W&B logging as callbacks.
history = model.fit(training_batches, epochs = config.epochs, callbacks = [checkpoint_callback, SpellChecker(),wandb.keras.WandbCallback()], validation_data = validation_batches)

VBox(children=(Label(value='0.001 MB of 0.001 MB uploaded\r'), FloatProgress(value=1.0, max=1.0)))

Correctly spelled (T = 1.0): 0.07142857142857142
Correctly spelled (T = 0.75): 0.1674641148325359
Correctly spelled (T = 0.5): 0.24705882352941178
Correctly spelled (theta = 1.0): 0.09333333333333334
Correctly spelled (theta = 0.75): 0.09696969696969697


  saving_api.save_model(
[34m[1mwandb[0m: [32m[41mERROR[0m Can't save model in the h5py format. The model will be saved as as an W&B Artifact in the 'tf' format.


Correctly spelled (theta = 0.5): 0.23983739837398374


[34m[1mwandb[0m: Adding directory to artifact (/content/wandb/run-20240514_131749-wnrjtgd1/files/model-best)... Done. 0.0s




In [None]:
# Sample 1000 characters from the trained model, seeded with '.', at temperature T.
start = tf.constant(['.'])
length = 1000
T = 1.0

text = generate_text_temperature(start, length, model, char_to_ind, ind_to_char, T)
# Fraction of the generated words that also occur in `dictionary`.
print("Correctly spelled:", correctly_spelled(text, dictionary))
print(text)

Correctly spelled: 0.4012345679012346
  Annes, thith ow sormence, ho nous fich hean nt asserte not had hes
inkas tabveriel of ioneto siruly shiokedingace at her foreple ferchore on
ouptains in plowent emung an
mearos. uI hadengechiod,
of
I wad have kenwerst of arl ereable; and showe spalimed to nother virischers rotselp as and inthardy nopes.' 
ould with in ertasilief of the care
to bell besplainened poorlibater, and wowens.  He werowe, al dirsuong Both hid hat has reade thead goonot worled, bewers, and had han that intanitave san which, and you aprares acoust his mastremwhad in tho hof, of tand mosed if biths ang arvingion.  I lest as tund paines.  He arded in eathers mugh uf soof forede, whethel hat anme, to guvencesppe thenkers admemfrees ulaty."

The listoreing the
have on himpanterenres lolked.  Theur, welling teat andith of her thought.
 Loued her ulluleds bot nevert.  He-lade must corestliotss ithead of touyijuth to hid sithe of whthe were, the yfforingeved you gotabreth's rellb