In [None]:
import numpy as np

In [None]:
# Seed NumPy's global RNG so the random matrices drawn below are repeatable.
np.random.seed(seed=42)

In [None]:
# Toy weights and vectors; the draw order (w_hh, w_hx, h_t_prev, x_t) is kept
# so the values for a given seed do not change.
w_hh = np.random.standard_normal(size=(3, 2))
w_hx = np.random.standard_normal(size=(3, 3))
h_t_prev = np.random.standard_normal(size=(2, 1))
x_t = np.random.standard_normal(size=(3, 1))

In [None]:
# Side-by-side weight matrices: [w_hh | w_hx].
stack_1 = np.concatenate((w_hh, w_hx), axis=1)

# Previous hidden state on top of the current input: [h_t_prev; x_t].
stack_2 = np.concatenate((h_t_prev, x_t), axis=0)

In [None]:
# Product built inline and product of the pre-built stacks, printed for comparison.
print(np.hstack((w_hh, w_hx)) @ np.vstack((h_t_prev, x_t)))
print(stack_1 @ stack_2)
# The vertical stack, and the same two vectors joined with concatenate.
print(stack_2)
print(np.concatenate((h_t_prev, x_t), axis=0))

In [None]:
import numpy as np
from numpy import random
from time import perf_counter
import tensorflow as tf
from tensorflow import keras 
 

In [None]:
def sigmoid(x):
    """Compute the logistic sigmoid ``1 / (1 + exp(-x))`` element-wise.

    The exponential is only evaluated on non-positive values, so inputs of
    large magnitude cannot overflow ``np.exp`` (the direct formula emits a
    RuntimeWarning for large negative ``x``).

    Args:
        x: Real scalar or array-like.

    Returns:
        A NumPy scalar for scalar input, otherwise an array shaped like ``x``,
        with values in [0, 1].
    """
    x = np.asarray(x)
    # exp(-|x|) lies in (0, 1], so neither branch below can overflow.
    z = np.exp(-np.abs(x))
    out = np.where(x >= 0, 1.0 / (1.0 + z), z / (1.0 + z))
    # Indexing with () unwraps a 0-d result to a scalar and is a no-op otherwise.
    return out[()]

In [None]:
# Random seed, so your results match ours
random.seed(10)

emb = 128      # Embedding size
T = 256        # Length of sequence
h_dim = 16     # Hidden state dimension
h_0 = np.zeros(shape=(h_dim, 1))

# Draw order is significant for reproducibility: w1, w2, w3, then b1, b2, b3, then X.
w1, w2, w3 = (random.standard_normal((h_dim, emb + h_dim)) for _ in range(3))
b1, b2, b3 = (random.standard_normal((h_dim, 1)) for _ in range(3))

X = random.standard_normal((T, emb, 1))

# The vanilla cell uses one weight/bias pair; the GRU list gets its own copies
# of w1 and b1 so the two weight sets do not share those arrays.
weights_vanilla = [w1, b1]
weights_GRU = [w1.copy(), w2, w3, b1.copy(), b2, b3]

In [None]:
def forward_RNN(inputs, weights):
    """Run one step of a vanilla RNN cell.

    Args:
        inputs: Pair ``(x, ht)``: the current input and the previous hidden
            state. They are stacked vertically as ``[ht; x]``.
        weights: Pair ``(wh, bh)``: the weight matrix applied to the stacked
            vector and the bias added afterwards.

    Returns:
        Tuple ``(y, ht)``; both entries are the new hidden state
        ``sigmoid(wh @ [ht; x] + bh)``.
    """
    x, h_prev = inputs
    w_h, b_h = weights

    stacked = np.vstack((h_prev, x))
    h_next = sigmoid(np.matmul(w_h, stacked) + b_h)

    # The cell's output and its carried state are the same array.
    return h_next, h_next




In [None]:
def forward_GRU_RNN(inputs, weights):
    """Run one step of a GRU cell.

    Args:
        inputs: Pair ``(x, ht)``: the current input and the previous hidden
            state.
        weights: Six items in the order ``(wu, wr, wh, bu, br, bh)``: weight
            matrices for the update gate, relevance gate and candidate state,
            followed by their biases.

    Returns:
        Tuple ``(y, ht)``; both entries are the new hidden state
        ``u * c + (1 - u) * ht``.
    """
    x, h_prev = inputs
    w_u, w_r, w_h, b_u, b_r, b_h = weights

    # Both gates read the same stacked vector [ht; x].
    stacked = np.vstack((h_prev, x))
    relevance = sigmoid(np.matmul(w_r, stacked) + b_r)
    update = sigmoid(np.matmul(w_u, stacked) + b_u)

    # The candidate state sees the previous state scaled by the relevance gate.
    gated = np.concatenate([relevance * h_prev, x])
    candidate = np.tanh(np.matmul(w_h, gated) + b_h)

    h_next = update * candidate + (1 - update) * h_prev
    return h_next, h_next


In [None]:
# Single GRU step on one element of X, starting from the zero state.
print(forward_GRU_RNN(inputs=[X[1], h_0], weights=weights_GRU))

In [None]:
def scan(function, elems, weights, initializer=None):
    """Apply a recurrent step function over a sequence, carrying state along.

    Args:
        function: Callable invoked as ``function([x, state], weights)`` that
            returns ``(y, new_state)``.
        elems: Iterable of per-step inputs.
        weights: Passed unchanged to every ``function`` call.
        initializer: State used for the first step. When omitted (``None``),
            the notebook-level ``h_0`` is looked up at call time, so
            re-running the configuration cell is picked up and this cell no
            longer needs ``h_0`` to exist when it is defined.

    Returns:
        Tuple ``(ys, final_state)``: the list of per-step outputs and the
        state after the last step (the initial state if ``elems`` is empty).
    """
    cur_value = h_0 if initializer is None else initializer
    ys = []
    for x in elems:
        y, cur_value = function([x, cur_value], weights)
        ys.append(y)
    return ys, cur_value

In [None]:
# Number of time steps held in X (its leading dimension).
print(X.shape[0])

In [None]:
# Unroll the vanilla cell over the whole sequence, starting from h_0.
ys, h_T = scan(function=forward_RNN, elems=X, weights=weights_vanilla, initializer=h_0)

print(f"Length of ys: {len(ys)}")
print(f"Shape of each y within ys: {ys[0].shape}")
print(f"Shape of h_T: {h_T.shape}")

In [None]:
# Wall-clock time of one full pass of the vanilla cell, in milliseconds.
tic = perf_counter()
ys, h_T = scan(function=forward_RNN, elems=X, weights=weights_vanilla, initializer=h_0)
toc = perf_counter()
RNN_time = 1000 * (toc - tic)
print(f"It took {RNN_time:.2f}ms to run the forward method for the vanilla RNN.")

In [None]:
# Wall-clock time of one full pass of the GRU cell, in milliseconds.
tic = perf_counter()
ys, h_T = scan(function=forward_GRU_RNN, elems=X, weights=weights_GRU, initializer=h_0)
toc = perf_counter()
GRU_time = 1000 * (toc - tic)
print(f"It took {GRU_time:.2f}ms to run the forward method for the GRU.")

In [None]:
# Three stacked GRU layers followed by a 10-unit dense layer. The first two
# GRUs are created with return_sequences=True; the third leaves it at its default.
model_GRU = tf.keras.Sequential(layers=[
    tf.keras.layers.GRU(units=256, return_sequences=True, name='GRU_1_returns_seq'),
    tf.keras.layers.GRU(units=128, return_sequences=True, name='GRU_2_returns_seq'),
    tf.keras.layers.GRU(units=64, name='GRU_3_returns_last_only'),
    tf.keras.layers.Dense(units=10),
])

In [None]:
# The model has not been called on any data yet; if summary() raises because
# of that, show the message instead of stopping the notebook.
try:
    model_GRU.summary()
except Exception as err:
    print(err)

In [None]:
# Remember these three numbers and follow them further through the notebook
batch_size, sequence_length, word_vector_length = 60, 50, 40

# Random batch shaped (batch_size, sequence_length, word_vector_length).
input_data = tf.random.normal(shape=(batch_size, sequence_length, word_vector_length))

# Calling the model once on real-shaped data, then summarising it again.
prediction = model_GRU(input_data)

model_GRU.summary()

In [None]:
import os


In [None]:
dirname = 'data/'
filename = 'shakespeare_data.txt'
lines = []  # every non-blank line of the corpus, stripped of surrounding whitespace

counter = 0  # NOTE(review): not referenced by any visible cell; kept in case other cells use it

# Decode explicitly as UTF-8 rather than the platform default, matching the
# UTF-8 decoding used when the text is later split into characters.
with open(os.path.join(dirname, filename), encoding="utf-8") as files:
    for line in files:
        pure_line = line.strip()
        # Skip lines that are empty once whitespace is removed.
        if pure_line:
            lines.append(pure_line)

n_lines = len(lines)
print(f"Number of lines: {n_lines}")
# Show a short sample of the loaded text.
print("\n".join(lines[506:514]))

In [None]:
def build_vocabulary(lines):
    """Build the character vocabulary for a list of text lines.

    The lines are joined with newlines, so the newline character is part of
    the vocabulary whenever there is more than one line.

    Args:
        lines: List of strings.

    Returns:
        List whose first two entries are ``"[UNK]"`` and ``""``, followed by
        the distinct characters of the joined text in sorted order.
    """
    unique_chars = set("\n".join(lines))
    return ["[UNK]", ""] + sorted(unique_chars)
vocab = build_vocabulary(lines)
# Vocabulary size, then every entry separated by single spaces.
print(len(vocab))
print(*vocab)


In [None]:
def convert_text_to_tensor(text,vocab):
    """Map a string to a tensor of integer character ids.

    Args:
        text: String (decoded as UTF-8) to split into characters.
        vocab: Vocabulary handed to a ``StringLookup`` layer.

    Returns:
        The output of the lookup layer applied to the characters of ``text``.
    """
    lookup = tf.keras.layers.StringLookup(vocabulary=list(vocab), mask_token=None)
    chars = tf.strings.unicode_split(text, input_encoding='UTF-8')
    return lookup(chars)

# Quick check on a short string.
tmp = convert_text_to_tensor("abc xyz", vocab)
print(tmp)

In [None]:
def convert_tensor_to_text(tensor, vocab):
    """Map a tensor of character ids back to text.

    Args:
        tensor: Ids to look up.
        vocab: Vocabulary handed to an inverted ``StringLookup`` layer.

    Returns:
        The looked-up characters joined along the last axis, converted with
        ``.numpy()``.
    """
    id_to_char = tf.keras.layers.StringLookup(vocabulary=list(vocab), mask_token=None, invert=True)
    chars = id_to_char(tensor)
    joined = tf.strings.reduce_join(chars, axis=-1)
    return joined.numpy()

# Round-trip the ids produced by the text-to-tensor demo.
print(convert_tensor_to_text(tmp, vocab))


In [None]:
# Hold out the last 1000 lines for evaluation; everything before them is training data.
train_lines, eval_lines = lines[:-1000], lines[-1000:]

In [None]:
def test_train_split(sequence):
    return sequence[:-1], sequence[1:]
print(test_train_split(list("Tensorflow")))

In [None]:
def generate_dataset(vocab, lines, seq_length=100, batch_size=64, BUFFER_SIZE=10000):
    """Turn text lines into a shuffled, batched dataset of (input, target) id sequences.

    Args:
        vocab: Vocabulary passed to ``convert_text_to_tensor``.
        lines: List of strings; they are joined with newlines into one text.
        seq_length: Length of each input/target sequence. Chunks of
            ``seq_length + 1`` ids are cut and then split by
            ``test_train_split``.
        batch_size: Number of sequence pairs per batch.
        BUFFER_SIZE: Buffer size given to ``shuffle``.

    Returns:
        A ``tf.data.Dataset`` of batched pairs. Incomplete chunks and
        incomplete batches are dropped.
    """
    line = "\n".join(lines)
    all_ids = convert_text_to_tensor(line, vocab)
    ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
    # One extra id per chunk so input and target can be offset by one position.
    data_generator = ids_dataset.batch(seq_length + 1, drop_remainder=True)
    dataset_xy = data_generator.map(test_train_split)
    dataset = (
        dataset_xy
        .shuffle(BUFFER_SIZE)
        .batch(batch_size, drop_remainder=True)
        # tf.data.AUTOTUNE replaces the deprecated tf.data.experimental.AUTOTUNE alias.
        .prefetch(tf.data.AUTOTUNE)
    )
    return dataset


In [None]:
BATCH_SIZE = 64
# Pass the constant explicitly so changing BATCH_SIZE actually changes the dataset.
dataset = generate_dataset(vocab, train_lines, batch_size=BATCH_SIZE)

In [None]:
def create_gru_model(vocab_size, embedding_dim, rnn_units):
    """Build an Embedding -> GRU -> Dense character-level model.

    Args:
        vocab_size: Number of ids; used as the embedding input size and as
            the width of the final dense layer.
        embedding_dim: Size of each embedding vector.
        rnn_units: Number of GRU units.

    Returns:
        A ``tf.keras.Sequential`` model. The GRU returns the full sequence and
        the dense layer applies ``tf.nn.log_softmax``.
    """
    model = tf.keras.Sequential([
        # NOTE(review): mask_zero=True treats id 0 as padding, but
        # build_vocabulary puts "[UNK]" at index 0 and "" at index 1.
        # Confirm this is intended.
        tf.keras.layers.Embedding(vocab_size, embedding_dim, mask_zero=True),
        tf.keras.layers.GRU(rnn_units, return_sequences=True),
        tf.keras.layers.Dense(vocab_size, activation=tf.nn.log_softmax)
    ])
    return model

# Usage:
# Derive the size from the vocabulary actually built rather than hard-coding
# it, so the embedding and output layers always match the data.
vocab_size = len(vocab)
embedding_dim = 256
rnn_units = 512

model = create_gru_model(vocab_size, embedding_dim, rnn_units)

In [None]:
# Build with an unspecified batch size and sequences of length 100, then summarise.
model.build((None, 100))
model.summary()


In [None]:
for input_example_batch, target_example_batch in dataset.take(count=1):
    # Lets use only the first sequence on the batch
    print("Input: ", input_example_batch[0].numpy())
    # Give the single sequence a leading batch axis before calling the model.
    example_batch_predictions = model(tf.expand_dims(input_example_batch[0], axis=0))
    print("\n",example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
# Model output at position 99 of the first (only) sequence in the batch.
example_batch_predictions[0, 99].numpy()

In [None]:
# Highest-scoring id at every position of the first sequence.
sampled_indices = tf.argmax(example_batch_predictions[0], axis=-1)
print(sampled_indices.numpy())

In [None]:
# Decoded input, a blank line, then the decoded per-position predictions.
print("Input:\n", convert_tensor_to_text(input_example_batch[0], vocab), end="\n\n")
print("Next Char Predictions:\n", convert_tensor_to_text(sampled_indices, vocab))

In [None]:
def compile_model(model):
    """Compile a model with sparse categorical cross-entropy and Adam.

    Args:
        model: Object exposing ``compile(optimizer=..., loss=...)``.

    Returns:
        The same ``model`` object, compiled with
        ``SparseCategoricalCrossentropy(from_logits=True)`` and
        ``Adam(learning_rate=0.00125)``.
    """
    model.compile(
        loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.00125),
    )
    return model

In [None]:
# List the GPU devices TensorFlow can see.
gpus = tf.config.list_physical_devices(device_type='GPU')
print(gpus)


In [None]:
Epochs = 30
# Compile, then fit on the training dataset for the configured number of epochs.
model = compile_model(model)
history = model.fit(x=dataset, epochs=Epochs)

In [None]:
# Persist the model's weights to disk.
model.save_weights(filepath="saved.weights.h5")

In [None]:
# Restore the weights from the file written by the save cell.
model.load_weights(filepath="saved.weights.h5")

In [None]:
# Print the layer-by-layer summary of the model after the weights were loaded.
model.summary()

In [None]:
for input_example_batch, target_example_batch in dataset.take(count=1):
    # Lets use only the first sequence on the batch
    print("Input: ", input_example_batch[0].numpy())
    # Give the single sequence a leading batch axis before calling the model.
    example_batch_predictions = model(tf.expand_dims(input_example_batch[0], axis=0))
    print("\n",example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
# Highest-scoring id at every position of the first sequence.
sampled_indices = tf.argmax(example_batch_predictions[0], axis=-1)
print(sampled_indices.numpy())

In [None]:
# Decoded input, a blank line, then the decoded per-position predictions.
print("Input:\n", convert_tensor_to_text(input_example_batch[0], vocab), end="\n\n")
print("Next Char Predictions:\n", convert_tensor_to_text(sampled_indices, vocab))