In [4]:
!pip install tensorflow_addons

Collecting tensorflow_addons
  Downloading tensorflow_addons-0.23.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (611 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m611.8/611.8 kB[0m [31m8.6 MB/s[0m eta [36m0:00:00[0m
Collecting typeguard<3.0.0,>=2.7 (from tensorflow_addons)
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, tensorflow_addons
Successfully installed tensorflow_addons-0.23.0 typeguard-2.13.3


In [5]:
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import numpy as np
import secrets
import time
import datetime
import re


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



In [6]:
from google.colab import drive
drive.mount("/content/gdrive")
drive_path = "gdrive/MyDrive/MachineLearning/HandsOnMachineLearning/chapter16"

Mounted at /content/gdrive


# 8.

In [None]:
class Node():
    def __init__(self, letter, next_nodes=None):
        self.letter = letter
        self.next_nodes = next_nodes

    def pick_next_node(self):
        return secrets.choice(self.next_nodes)

    def set_next_nodes(self, next_nodes):
        self.next_nodes = next_nodes


class ReberString():
    def __init__(self, postlayers):
        self.end = Node("E")
        layer3_V = Node("V")
        layer3_S = Node("S")
        layer3_X = Node("X")
        layer3_P = Node("P")
        layer2_V = Node("V")
        layer2_T = Node("T")
        layer2_X = Node("X")
        layer2_S = Node("S")
        layer1_P = Node("P")
        layer1_T = Node("T")
        self.start = Node("B")

        self.end.set_next_nodes(postlayers)
        layer3_V.set_next_nodes([end])
        layer3_S.set_next_nodes([end])
        layer3_X.set_next_nodes([layer2_T, layer2_V])
        layer3_P.set_next_nodes([layer3_X, layer3_S])

        layer2_V.set_next_nodes([layer3_P, layer3_V])
        layer2_T.set_next_nodes([layer2_T, layer2_V])
        layer2_X.set_next_nodes([layer3_X, layer3_S])
        layer2_S.set_next_nodes([layer2_S, layer2_X])

        layer1_P.set_next_nodes([layer2_T, layer2_V])
        layer1_T.set_next_nodes([layer2_S, layer2_X])
        self.start.set_next_nodes([layer1_P, layer1_T])


end = Node("E")
postlayer_P = Node("P", [end])
postlayer_T = Node("T", [end])
string_P = ReberString(postlayer_P)
string_T = ReberString(postlayer_T)
prelayer_P = Node("P", [string_P.start])
prelayer_T = Node("T", [string_T.start])
start = Node("B", [prelayer_T, prelayer_P])

unique_letters = ["B", "E", "P", "S", "T", "V", "X"]

In [None]:
def generate_valid_sequence(start, join=True):
    letters = []
    cur_node = start

    while cur_node.next_nodes is not None:
        letters.append(cur_node.letter)
        cur_node = cur_node.pick_next_node()

    if join:
        valid_string = "".join(letters)
        return valid_string
    return letters


def one_hot_encoding(string, unique_letters):
    return [unique_letters.index(c) for c in string]


def create_valid_data(length, start, unique_letters):
    valid_strings = []
    for _ in range(length//2):
        valid_string = generate_valid_sequence(start)
        valid_strings.append(one_hot_encoding(valid_string, unique_letters))
    return valid_strings


def create_invalid_data(length, start, unique_letters, n_invalid_letters=1):
    invalid_strings = []
    for _ in range(length//2):
        letters = generate_valid_sequence(start, False)
        indices = secrets.SystemRandom().sample(range(len(letters)), n_invalid_letters)
        for i in indices:
            cur_letter = letters[i]
            new_letter = secrets.choice([letter for letter in unique_letters
                                        if letter != cur_letter])
            letters[i] = new_letter
        new_string = "".join(letters)
        invalid_strings.append(one_hot_encoding(new_string, unique_letters))
    return invalid_strings



def create_dataset(length, start, unique_letters, training=False):
    valid_data = create_valid_data(length, start, unique_letters)
    invalid_data = create_invalid_data(length, start, unique_letters, 1)
    data = [*valid_data, *invalid_data]
    X = tf.ragged.constant(data, ragged_rank=1)
    y = np.array([[1.] for _ in range(len(valid_data))] +
                 [[0.] for _ in range(len(invalid_data))])
    return X, y


X_train, y_train = create_dataset(7500, start, unique_letters, True)
X_valid, y_valid = create_dataset(1500, start, unique_letters)
X_test, y_test = create_dataset(1000, start, unique_letters)

In [None]:
filepath = time.strftime(f"{drive_path}/models/reberstring_%Y_%m_%d-%H_%M_%S")
checkpoint_cb = keras.callbacks.ModelCheckpoint(
    filepath,
    monitor='val_loss',
    verbose=0,
    save_best_only=True,
    save_weights_only=False,
    mode='auto',
    save_freq='epoch',
    initial_value_threshold=None
)
embedding_size = 5

model = keras.models.Sequential([
    layers.InputLayer(input_shape=[None], dtype=tf.int32, ragged=True),
    layers.Embedding(input_dim=len(unique_letters), output_dim=embedding_size),
    layers.GRU(30),
    layers.Dense(1, activation="sigmoid")
])
optimizer = keras.optimizers.SGD(learning_rate=0.02, momentum=0.95, nesterov=True)
model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"])
model.fit(X_train, y_train, epochs=20, validation_data=(X_valid, y_valid),
          callbacks=[checkpoint_cb])

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x78700f0a76a0>

# 9.

In [7]:
YEARS = list(range(1990, 2031))
MONTHS = list(range(1, 13))
DAYS = list(range(1, 32))



def create_dates(length):
    contexts = []
    targets = []

    i = 0
    while i < length:
        try:
            year = secrets.choice(YEARS)
            month = secrets.choice(MONTHS)
            day = secrets.choice(DAYS)
            date = datetime.datetime(year, month, day)
            context = date.strftime("%B %d, %Y").replace(' 0', ' ')
            target = date.strftime("%Y-%m-%d")
            contexts.append(context)
            targets.append(target)
            i += 1
        except ValueError:
            continue

    return contexts, targets

In [18]:
def standardize(input_data):
    lowercase = tf.strings.lower(input_data)
    stripped_html = tf.strings.regex_replace(lowercase, '<br />', ' ')
    stripped_punctuation = tf.strings.regex_replace(stripped_html, '[%s]' % re.escape('!"#$%&()*+,./:;<=>?@[\\]^_`{|}~'), '')
    seperate_dash = tf.strings.regex_replace(stripped_punctuation, "-", " - ")
    return seperate_dash


MONTHS_TEXT = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December"
]
DAYS_WITH_ZERO = ["01", "02", "03", "04", "05", "06", "07", "08", "09"]

vocab_size = 100
embed_size = 5

encoder_vectorize_layer = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=embed_size
)
encoder_vectorize_layer.adapt([
    *MONTHS_TEXT, *map(str, DAYS), *map(str, YEARS)
])

decoder_vectorize_layer = layers.TextVectorization(
    max_tokens=vocab_size,
    standardize=standardize,
    output_mode="int",
    output_sequence_length=embed_size
)
decoder_vectorize_layer.adapt([
    *map(str, list(range(10, 32))), *map(str, YEARS), "-", *DAYS_WITH_ZERO
])



In [19]:
print(encoder_vectorize_layer("April 22, 2019"))
print(decoder_vectorize_layer("2019-22-04"))

tf.Tensor([13 30 43  0  0], shape=(5,), dtype=int64)
tf.Tensor([24 74 11 74 70], shape=(5,), dtype=int64)


In [20]:
X_train, y_train = create_dates(7500)
X_valid, y_valid = create_dates(1500)
X_test, y_test = create_dates(1000)

In [21]:
X_train = encoder_vectorize_layer(X_train)
y_train = decoder_vectorize_layer(y_train)
X_valid = encoder_vectorize_layer(X_valid)
y_valid = decoder_vectorize_layer(y_valid)
X_test = encoder_vectorize_layer(X_test)
#y_test = decoder_vectorize_layer(y_test)

In [22]:
encoder_inputs = layers.Input(shape=[None], dtype=np.int32)
decoder_inputs = layers.Input(shape=[None], dtype=np.int32)
sequence_lengths = layers.Input(shape=[], dtype=np.int32)

embeddings = keras.layers.Embedding(vocab_size, embed_size)
encoder_embeddings = embeddings(encoder_inputs)
decoder_embeddings = embeddings(decoder_inputs)

encoder = layers.LSTM(512, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_embeddings)
encoder_state = [state_h, state_c]

sampler = tfa.seq2seq.sampler.TrainingSampler()
decoder_cell = layers.LSTMCell(512)
output_layer = layers.Dense(vocab_size)

decoder = tfa.seq2seq.basic_decoder.BasicDecoder(decoder_cell, sampler,
                                                 output_layer=output_layer)
final_outputs, final_state, final_sequence_lengths = decoder(
    decoder_embeddings, initial_state=encoder_state,
    sequence_length=sequence_lengths
)
Y_proba = tf.nn.softmax(final_outputs.rnn_output)
model = keras.Model(inputs=[encoder_inputs, decoder_inputs, sequence_lengths],
                    outputs=[Y_proba])
model.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

In [23]:
sequence_lengths = np.array([len(seq) for seq in X_train])
model.fit([X_train, y_train, sequence_lengths], y_train, epochs=15, validation_data=(X_valid, y_valid))

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.src.callbacks.History at 0x7e992c6ec340>

In [33]:
inference_sampler = tfa.seq2seq.sampler.GreedyEmbeddingSampler(
    embedding_fn=embeddings)
inference_decoder = tfa.seq2seq.basic_decoder.BasicDecoder(
    decoder_cell, inference_sampler, output_layer=output_layer,
    maximum_iterations=embed_size)
batch_size = tf.shape(encoder_inputs)[:1]
start_tokens = tf.fill(dims=batch_size, value=101)
final_outputs, final_state, final_sequence_lengths = inference_decoder(
    start_tokens,
    initial_state=encoder_state,
    start_tokens=start_tokens,
    end_token=0
)

inference_model = keras.models.Model(inputs=[encoder_inputs],
                                     outputs=[final_outputs.sample_id])

In [39]:
predictions = inference_model.predict(X_test)[0]
vocab = decoder_vectorize_layer.get_vocabulary()
print(" ".join([vocab[int(each)] for each in tf.squeeze(predictions)]))
print(y_test[0])

2014 - - - 09
2014-02-08
