# Case Study 6

# Imports

In [2]:
import datetime
import requests
import os
import random
import shutil
import zipfile
import time
import contextlib
import io
import re
import string
import gc
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import datasets, layers, models, Sequential
from tensorflow.keras.layers import Dense, Embedding, GlobalAveragePooling1D, TextVectorization, StringLookup
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay, f1_score, accuracy_score

# Dataset

## Functions

In [3]:
# This is for cleaning up the text a bit
def preprocess_text(text):

    text = text.replace("Project Gutenberg", "")
    text = text.replace("Gutenberg", "")

    # Remove carriage returns
    text = text.replace("\r", "")

    # fix quotes
    text = text.replace("“", "\"")
    text = text.replace("”", "\"")

    # Replace any capital letter at the start of a word with ^ followed by the lowercase letter
    text = re.sub(r"(?<![a-zA-Z])([A-Z])", lambda match: f"^{match.group(0).lower()}", text)

    # Replace all other capital letters with lowercase
    text = re.sub(r"([A-Z])", lambda match: f"{match.group(0).lower()}", text)

    # Remove duplicate whitespace
    text = re.sub(r"\s+", " ", text)
    text = re.sub(r"\n+", "\n", text)
    text = re.sub(r"\t+", "\t", text)

    # Replace whitespace characters with special words
    # text = re.sub(r"(\t)", r" zztabzz ", text)
    # text = re.sub(r"(\n)", r" zznewlinezz ", text)
    # text = re.sub(r"(\s)", r" zzspacezz ", text)

    # possibly replace all new lines with a space then deal with duplicate whitespaces
    # then find end of sentences via [., ?, !] and add return lines after those

    # Split before and after punctuation
    for punctuation in string.punctuation:
        text = text.replace(punctuation, f" {punctuation} ")

    return text

In [4]:
def postprocess_text(text):

    # Replace special words with whitespace characters
    text = text.replace("zztabzz", "\t")
    text = text.replace("zznewlinezz", "\n")
    text = text.replace("zzspacezz", " ")

    # Remake capital letters at beginning of words
    text = re.sub(r"\^([a-z])", lambda match: f"{match.group(1).upper()}", text)
    text = text.replace("^", "")

    return text

In [5]:
def getMyText():
    file_name = 'austen.txt'
    local_dir = 'Module 6'  # Directory of the file
    local_path = os.path.join(local_dir, file_name)

    try:
        # Ensure the directory exists
        if not os.path.exists(local_dir):
            os.makedirs(local_dir)

        # Check if the file exists locally
        if os.path.exists(local_path):
            print(f"File '{file_name}' found locally. Using it.")
        else:
            print(f"File '{file_name}' not found locally. Need to build it.")\

        # Read the file's contents
        with open(local_path, 'rb') as file:
            text = file.read().decode(encoding='utf-8')

        return preprocess_text(text)

    except Exception as e:
        print(f"An error occurred: {e}")
        return None

## Building Text and Vocab

In [6]:
text = getMyText()
vocab = sorted(set(text))

File 'austen.txt' found locally. Using it.


## Vectorizing

In [7]:
ids_from_chars = keras.layers.StringLookup(vocabulary=list(vocab), mask_token=None)
# use the get_vocabulary() method from StringLookup so that the [UNK] tokens are set the same way ie...
# This returns a tf.RaggedTensor of characters
chars_from_ids = keras.layers.StringLookup(vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

def text_from_ids(ids):
    return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

In [8]:
vis_it = False

# Joining the chars back into strings use tf.strings.reduce_join ie...
if vis_it:
    example_texts = ['abcdefg', 'xyz']
    chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')
    ids = ids_from_chars(chars)
    chars_back = chars_from_ids(ids)
    tf.strings.reduce_join(chars_back, axis=-1).numpy()
    print(f"Chars split: {chars}")
    print(f"\nIds from chars: {ids}")
    print(f"\nChars from ids: {chars_back}")

## Creating Training Examples and Targets

In [9]:
seq_length = 128
BATCH_SIZE = 64
BUFFER_SIZE = 10000

In [10]:
def split_input_target(sequence):
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

def print_divider():
    print("\n=============================")

In [11]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

In [12]:
vis_it = False

if vis_it:
    print(f"All ids: {all_ids}")
    print_divider()
    print("Stream of character indices:")
    for ids in ids_dataset.take(10):
        print(chars_from_ids(ids).numpy().decode('utf-8'))
    print_divider()
    print("Sequence:")
    for seq in sequences.take(1):
        print(chars_from_ids(seq))
    print_divider()
    print("Joint tokens as a string:")
    for seq in sequences.take(5):
        print(text_from_ids(seq).numpy())
    print_divider()
    print(f"Splitup 'Tensorflow': {split_input_target(list("Tensorflow"))}")

In [13]:
dataset = sequences.map(split_input_target)

In [14]:
vis_it = False

if vis_it:
    for input_example, target_example in dataset.take(1):
        print("Input: ", text_from_ids(input_example).numpy())
        print("Target: ", text_from_ids(target_example).numpy())

In [15]:
dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [16]:
vis_it = True

if vis_it:
    print(f"Dataset: {dataset}")

Dataset: <_PrefetchDataset element_spec=(TensorSpec(shape=(64, 128), dtype=tf.int64, name=None), TensorSpec(shape=(64, 128), dtype=tf.int64, name=None))>


# Model

In [17]:
vocab_size = len(ids_from_chars.get_vocabulary())
embedding_dim = 256
rnn_units = 1024

In [23]:
class MyModel(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, rnn_units):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(rnn_units, return_sequences=True, return_state=True)
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        print(f"Inputs: {inputs}")
        x = inputs
        x = self.embedding(x, training=training)
        print(f"Inputs embedded: {x}")
        if states is None:
            states = self.gru.get_initial_state(x)
        
        x, states = self.gru(x, initial_state=states, training=training)
        x = self.dense(x, training=training)

        if return_state:
            return x, states
        else:
            return x

In [24]:
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [25]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

Inputs: [[ 1  1  3 ... 43 49 42]
 [29 42 32 ... 40 53  1]
 [42  1  9 ...  1 47 36]
 ...
 [32  1 42 ...  1 27  1]
 [29 49 42 ... 32  1 29]
 [ 1 43 34 ... 49 46 33]]
Inputs embedded: Tensor("embedding_1_1/GatherV2:0", shape=(64, 128, 256), dtype=float32)
Inputs: [[ 1  1  3 ... 43 49 42]
 [29 42 32 ... 40 53  1]
 [42  1  9 ...  1 47 36]
 ...
 [32  1 42 ...  1 27  1]
 [29 49 42 ... 32  1 29]
 [ 1 43 34 ... 49 46 33]]
Inputs embedded: [[[ 3.4523021e-02  3.3066604e-02  4.9687374e-02 ... -1.7727185e-02
   -1.6640473e-02  1.0234334e-02]
  [ 3.4523021e-02  3.3066604e-02  4.9687374e-02 ... -1.7727185e-02
   -1.6640473e-02  1.0234334e-02]
  [ 8.3964095e-03  2.9478099e-02 -4.1531194e-02 ... -2.9927183e-02
    4.4999074e-02  9.7370148e-03]
  ...
  [ 1.2128472e-02 -1.9998372e-02  1.5060712e-02 ...  3.9490089e-03
    1.6640872e-05 -3.8105916e-02]
  [ 1.6777005e-02 -1.3211727e-02 -2.5632013e-02 ... -2.4492312e-02
   -4.7994804e-02  1.0712635e-02]
  [ 3.2603826e-02  1.8757094e-02 -2.0467043e-03 ... -8.

InvalidArgumentError: Exception encountered when calling MyModel.call().

[1m{{function_node __wrapped__Pack_N_2_device_/job:localhost/replica:0/task:0/device:CPU:0}} Shapes of all inputs must match: values[0].shape = [64,128,256] != values[1].shape = [] [Op:Pack] name: [0m

Arguments received by MyModel.call():
  • inputs=tf.Tensor(shape=(64, 128), dtype=int64)
  • states=None
  • return_state=False
  • training=False