It is highly recommended to use a powerful **GPU**; you can use one for free by uploading this notebook to [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb).
<table align="center">
 <td align="center"><a target="_blank" href="https://colab.research.google.com/github/ezponda/intro_deep_learning/blob/main/class/RNN/Seq2seq.ipynb">
        <img src="https://i.ibb.co/2P3SLwK/colab.png"  style="padding-bottom:5px;" />Run in Google Colab</a></td>
  <td align="center"><a target="_blank" href="https://github.com/ezponda/intro_deep_learning/blob/main/class/RNN/Seq2seq.ipynb">
        <img src="https://i.ibb.co/xfJbPmL/github.png"  height="70px" style="padding-bottom:5px;"  />View Source on GitHub</a></td>
</table>

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import plot_model

## Introduction

In this example, we train a model to learn to add two numbers, provided as strings.

**Example:**

- Input: "535+61"
- Output: "596"

[Notebook from Keras Tutorial](https://keras.io/examples/nlp/addition_rnn/)

## Seq to seq model

Keras provides the `return_state` argument of the LSTM layer, which gives access to the hidden state (`state_h`) and the cell state (`state_c`) in addition to the layer output. Note that `LSTM` has 2 state tensors, but `GRU`
only has one.

To configure the initial state of the layer, just call the layer with additional
keyword argument `initial_state`.
Note that the shape of the state needs to match the unit size of the layer, like in the
example below.

In [None]:
# Toy dimensions: input length, input feature size and output length.
timesteps, features, output_timesteps = 10, 5, 12

# --- Encoder ---
encoder_input = tf.keras.Input(shape=(timesteps, features),
                               name='encoder_input')

# With return_state=True the layer returns its output plus the two states;
# only the states are kept here.
encoder_lstm = layers.LSTM(64, return_state=True, name="encoder")
_, state_h, state_c = encoder_lstm(encoder_input)

# Encoded vector: hidden state and cell state of the encoder
encoder_state = [state_h, state_c]

# --- Decoder ---
decoder_input = tf.keras.Input(shape=(output_timesteps, 1),
                               name='decoder_input')

# The decoder LSTM starts from the 2 encoder states (initial_state)
decoder_lstm = layers.LSTM(64, return_sequences=True, name="decoder")
decoder_output = decoder_lstm(decoder_input, initial_state=encoder_state)

# Dense(5) wrapped in TimeDistributed, applied to the decoder output sequence
output = layers.TimeDistributed(layers.Dense(5))(decoder_output)

model = keras.Model([encoder_input, decoder_input], output)
model.summary()

In [None]:
# Draw the layer graph of the model, annotated with tensor shapes.
plot_model(model, show_shapes=True)

In [None]:
class Encoder(tf.keras.Model):
    """LSTM encoder that reduces an input sequence to its LSTM states.

    Args:
        enc_units: number of units of the LSTM layer.
        batch_sz: stored on the instance; not read by `call`.
        max_len: stored on the instance; not read by `call`.
    """

    def __init__(self, enc_units, batch_sz, max_len):
        super().__init__()
        self.enc_units = enc_units
        self.batch_sz = batch_sz
        self.max_len = max_len

        # Encoder LSTM: configured to return its outputs and both states
        self.lstm_layer = tf.keras.layers.LSTM(
            self.enc_units, return_sequences=True, return_state=True)

    def call(self, encoder_input):
        """Run the LSTM and return `[state_h, state_c]`; the outputs are dropped."""
        _outputs, hidden_state, cell_state = self.lstm_layer(encoder_input)
        # Encoded vector
        return [hidden_state, cell_state]

class Decoder(tf.keras.Model):
    """LSTM decoder that unrolls an encoded state into an output sequence.

    The decoder is fed a constant all-zeros input of shape
    `(batch_sz, max_len, 1)`; all information comes from the initial state.

    Args:
        dec_units: number of units of the LSTM layer (must match the size of
            the states passed to `call`).
        batch_sz: batch size of the constant zero decoder input.
        max_len: number of timesteps of the constant zero decoder input.
        output_dim: size of the Dense projection applied at every timestep.
            Defaults to 5, the value that was previously hard-coded.
    """

    def __init__(self, dec_units, batch_sz, max_len, output_dim=5):
        super().__init__()
        self.batch_sz = batch_sz
        self.dec_units = dec_units
        self.max_len = max_len
        self.decoder_input = tf.zeros((self.batch_sz, self.max_len, 1))

        self.lstm_layer = tf.keras.layers.LSTM(self.dec_units,
                                               return_sequences=True)
        # Built once here rather than inside `call`: a layer created in `call`
        # is re-instantiated on every invocation and is not kept as an
        # attribute of the model.
        self.output_layer = layers.TimeDistributed(layers.Dense(output_dim))

    def call(self, encoder_state):
        """Decode `encoder_state` (`[state_h, state_c]`) into per-timestep outputs."""
        x = self.lstm_layer(self.decoder_input, initial_state=encoder_state)
        return self.output_layer(x)
    
# Wire the subclassed encoder and decoder into a functional model.
encoder_input = tf.keras.Input(shape=(10, 10), name='encoder_input')

encoder = Encoder(enc_units=10, batch_sz=10, max_len=5)
encoder_state = encoder(encoder_input)

decoder = Decoder(dec_units=10, batch_sz=10, max_len=5)
outputs = decoder(encoder_state)

model = keras.Model(encoder_input, outputs)
model.summary()


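You can also use [layers.RepeatVector](https://www.tensorflow.org/api_docs/python/tf/keras/layers/RepeatVector): it repeats the encoded vector once per output timestep, so the repeated vector becomes the input sequence of the decoder.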

In [None]:
# Encoder -> RepeatVector -> decoder, written as a single Sequential stack.
model = keras.Sequential([
    # "Encode" the input sequence using a LSTM, producing an output of size 128.
    layers.LSTM(128, input_shape=(timesteps, features)),
    # Repeat the encoded vector `output_timesteps` times
    layers.RepeatVector(output_timesteps),
    # Decoder LSTM, returning its output at every timestep
    layers.LSTM(128, return_sequences=True),
    # Apply a dense layer to every temporal slice of its input
    layers.Dense(5, activation="softmax"),
])
model.compile(
    loss="categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)
model.summary()


In [None]:
# Draw the layer graph of the current `model`, annotated with tensor shapes.
plot_model(model, show_shapes=True)

## Generate the data


In [None]:
# Operands have at most `max_digits` digits.
max_digits = 3
# Largest operand with `max_digits` digits
max_int = 10**max_digits - 1
# Longest question: `max_digits` digits, the '+' sign, `max_digits` digits
max_len = 2 * max_digits + 1
# Longest answer: number of digits of the largest possible sum
out_max_len = len(str(2 * max_int))

summary = 'max_digits : {0}, max_int: {1}, max_len: {2}, out_max_len: {3}'
print(summary.format(max_digits, max_int, max_len, out_max_len))
print('max input length from {0}+{0} is {1}'.format(max_int, max_len))
print('max sum: {0}+{0}={1}'.format(max_int, 2 * max_int))

In [None]:
def generate_sample(max_len, max_int, out_max_len):
    """Create one random addition question and its answer as padded strings.

    Args:
        max_len: length the question string is right-padded to with spaces.
        max_int: largest value either operand may take (inclusive).
        out_max_len: length the answer string is right-padded to with spaces.

    Returns:
        Tuple `(sentence, result)`, e.g. `('535+61 ', '596 ')`.
    """
    # randint's upper bound is exclusive: add 1 so that max_int itself can be drawn
    a, b = np.random.randint(max_int + 1, size=2)
    sentence = '{0}+{1}'.format(a, b).ljust(max_len)  # padding
    result = str(a + b).ljust(out_max_len)  # padding
    return sentence, result


# Keep drawing samples until 50000 distinct questions have been collected.
sentences, results = [], []
seen = set()
print("Generating data...")
while len(sentences) < 50000:
    sentence, result = generate_sample(max_len, max_int, out_max_len)
    # Only questions not seen before are stored
    if sentence not in seen:
        seen.add(sentence)
        sentences.append(sentence)
        results.append(result)
print("Total sentences:", len(sentences))
print('Some examples:', list(zip(sentences[:3], results[:3])))

## Vectorize the data


In [None]:
# Vocabulary: the ten digits, the plus sign and the padding space.
chars = "0123456789+ "

# Character -> index, numbered in sorted character order
char_indices = dict(zip(sorted(chars), range(len(chars))))
print('char_indices', char_indices)
# Index -> character: the inverse mapping
indices_char = dict(zip(char_indices.values(), char_indices.keys()))
print('indices_char', indices_char)

def vectorize_sentence(sentence, char_indices):
    """One-hot encode `sentence`, one row per character.

    Args:
        sentence: string to encode.
        char_indices: mapping from character to column index.

    Returns:
        Float array of shape `(len(sentence), len(char_indices))` with a single
        1 per row, in the column of that row's character.
    """
    one_hot = np.zeros((len(sentence), len(char_indices)))
    columns = np.array([char_indices[ch] for ch in sentence], dtype=np.intp)
    one_hot[np.arange(len(sentence)), columns] = 1
    return one_hot

# Sanity check: encode a short example and inspect the result.
x = vectorize_sentence('13+11', char_indices)

print('sentence: 13+11')
print('vectorize_sentence inds:', np.argmax(x, axis=-1))
print('vectorize_sentence :', x)

In [None]:
def vec_to_sentence(x, indices_char):
    """Decode an iterable of character indices `x` into a string."""
    decoded = [indices_char[idx] for idx in x]
    return "".join(decoded)

def mat_to_sentence(x, indices_char):
    """Decode a one-hot (or score) matrix `x` into a string.

    The index of the largest value along the last axis of `x` is looked up
    in `indices_char` for every row.
    """
    best = x.argmax(axis=-1)
    decoded = [indices_char[idx] for idx in best]
    return "".join(decoded)

# Round trip: decode the matrix `x` back into its string.
mat_to_sentence(x, indices_char)

In [None]:
print("Vectorization...")
# One-hot tensors for questions (x) and answers (y).
# The builtin `bool` is used as dtype: the `np.bool` alias was deprecated in
# NumPy 1.20 and removed in NumPy 1.24.
x = np.zeros((len(sentences), max_len, len(chars)), dtype=bool)
y = np.zeros((len(sentences), out_max_len, len(chars)), dtype=bool)

for i, sentence in enumerate(sentences):
    x[i] = vectorize_sentence(sentence, char_indices)
for i, sentence in enumerate(results):
    y[i] = vectorize_sentence(sentence, char_indices)

# Split 80% train / 10% validation / 10% test; the validation and test
# parts are never trained over.
val_split = int(0.8 * len(x))
test_split = int(0.9 * len(x))

(x_train, y_train) = x[:val_split], y[:val_split]
(x_val, y_val) = x[val_split:test_split], y[val_split:test_split]
(x_test, y_test) = x[test_split:], y[test_split:]

print("Training Data:")
print(x_train.shape)
print(y_train.shape)

print("Validation Data:")
print(x_val.shape)
print(y_val.shape)

print("Test Data:")
print(x_test.shape)
print(y_test.shape)

## Build the model


In [None]:
# Number of units of the encoder and decoder LSTMs, i.e. the size of each
# state vector handed from the encoder to the decoder.
encoded_dim = 16

In [None]:
# --- Encoder ---
encoder_input = tf.keras.Input(
    shape=(max_len, len(chars)), name='encoder_input')

# With return_state=True the layer returns its output plus the two states;
# only the states are kept.
encoder_lstm = layers.LSTM(encoded_dim, return_state=True, name="encoder")
_, state_h, state_c = encoder_lstm(encoder_input)

# Encoded vector
encoder_state = [state_h, state_c]

# --- Decoder ---
decoder_input = tf.keras.Input(
    shape=(out_max_len, 1), name='decoder_input')

# The decoder LSTM starts from the 2 encoder states (initial_state)
decoder_lstm = layers.LSTM(encoded_dim, return_sequences=True, name="decoder")
decoder_output = decoder_lstm(decoder_input, initial_state=encoder_state)

# Softmax over the vocabulary, wrapped in TimeDistributed
char_classifier = layers.Dense(len(chars), activation='softmax')
output = layers.TimeDistributed(char_classifier)(decoder_output)

model = keras.Model([encoder_input, decoder_input], output)
model.summary()

In [None]:
# The targets are one-hot encoded, hence categorical cross-entropy.
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)


In [None]:
## The inputs of the decoder are zeros: one (out_max_len, 1) block per sample
decoder_input_data, decoder_input_data_val = (
    np.zeros((len(split), out_max_len, 1)) for split in (x_train, x_val)
)


In [None]:
epochs = 30
batch_size = 64

# One fit call per epoch so that sample predictions can be printed in between.
# range(1, epochs + 1) runs exactly `epochs` iterations, numbered from 1.
for epoch in range(1, epochs + 1):
    print()
    print("Iteration", epoch)
    model.fit(
        [x_train, decoder_input_data],
        y_train,
        batch_size=batch_size,
        epochs=1,
        validation_data=([x_val, decoder_input_data_val], y_val),
    )

    # Show the prediction for 5 random validation samples
    for i in range(5):
        ind = np.random.randint(0, len(x_val))
        # rowx keeps a leading batch axis of size 1; 1*... turns the bool row into ints
        rowx, rowy = x_val[np.array([ind])], 1*y_val[ind]
        # A single row of the zero decoder input matches the batch of one
        preds = np.argmax(model.predict([rowx, decoder_input_data_val[[0],:]]), axis=-1).flatten()
        q = mat_to_sentence(rowx[0], indices_char)
        correct = mat_to_sentence(rowy, indices_char)
        guess = vec_to_sentence(preds, indices_char)
        print()
        print("Input: ", q, "Correct output", correct)
        print('Prediction')
        if correct == guess:
            print("☑ " + guess)
        else:
            print("☒ " + guess)

In [None]:
decoder_input_data_test = np.zeros((len(x_test), out_max_len, 1))

# The metrics get their own names: `results` already holds the list of target
# strings, and overwriting it would break a re-run of the vectorization cell.
test_loss, test_accuracy = model.evaluate(
    [x_test, decoder_input_data_test], y_test, verbose=1)
print('Test Loss: {}'.format(test_loss))
print('Test Accuracy: {}'.format(test_accuracy))

### Question 1: Find a model with test accuracy `> 0.9`


Study the influence of the encoded vector dimension (`encoded_dim`) on the test accuracy.

In [None]:
# TODO(exercise): replace the `...` placeholder with the number of LSTM units
# to try; the next cell passes this value to both LSTM layers.
encoded_dim = ...

In [None]:
## Encoder
encoder_input = tf.keras.Input(
    shape=(max_len, len(chars)), name='encoder_input')

# With return_state=True the layer returns its output plus the two states;
# only the states are kept.
encoder_lstm = layers.LSTM(encoded_dim, return_state=True, name="encoder")
_, state_h, state_c = encoder_lstm(encoder_input)

# Encoded vector
encoder_state = [state_h, state_c]

## Decoder
decoder_input = tf.keras.Input(
    shape=(out_max_len, 1), name='decoder_input')

# The decoder LSTM starts from the 2 encoder states (initial_state)
decoder_lstm = layers.LSTM(encoded_dim, return_sequences=True, name="decoder")
decoder_output = decoder_lstm(decoder_input, initial_state=encoder_state)

# Softmax over the vocabulary, wrapped in TimeDistributed
char_classifier = layers.Dense(len(chars), activation='softmax')
output = layers.TimeDistributed(char_classifier)(decoder_output)

model = keras.Model([encoder_input, decoder_input], output)

model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
model.summary()

In [None]:
## The inputs of the decoder are zeros: one (out_max_len, 1) block per sample
decoder_input_data, decoder_input_data_val = (
    np.zeros((len(split), out_max_len, 1)) for split in (x_train, x_val)
)


In [None]:
epochs = 30
batch_size = 64

# One fit call per epoch so that sample predictions can be printed in between.
# range(1, epochs + 1) runs exactly `epochs` iterations, numbered from 1.
for epoch in range(1, epochs + 1):
    print()
    print("Iteration", epoch)
    model.fit(
        [x_train, decoder_input_data],
        y_train,
        batch_size=batch_size,
        epochs=1,
        validation_data=([x_val, decoder_input_data_val], y_val),
    )

    # Show the prediction for 5 random validation samples
    for i in range(5):
        ind = np.random.randint(0, len(x_val))
        # rowx keeps a leading batch axis of size 1; 1*... turns the bool row into ints
        rowx, rowy = x_val[np.array([ind])], 1*y_val[ind]
        # A single row of the zero decoder input matches the batch of one
        preds = np.argmax(model.predict([rowx, decoder_input_data_val[[0],:]]), axis=-1).flatten()
        q = mat_to_sentence(rowx[0], indices_char)
        correct = mat_to_sentence(rowy, indices_char)
        guess = vec_to_sentence(preds, indices_char)
        print()
        print("Input: ", q, "Correct output", correct)
        print('Prediction')
        if correct == guess:
            print("☑ " + guess)
        else:
            print("☒ " + guess)

In [None]:
decoder_input_data_test = np.zeros((len(x_test), out_max_len, 1))
# The metrics get their own names: `results` already holds the list of target
# strings, and overwriting it would break a re-run of the vectorization cell.
test_loss, test_accuracy = model.evaluate(
    [x_test, decoder_input_data_test], y_test, verbose=1)
print('Test Loss: {}'.format(test_loss))
print('Test Accuracy: {}'.format(test_accuracy))

## Practice

Create a similar model for integer division, rounded to 3 decimals:
```python
'999/7' -> '142.714'
'3/4' -> '0.75'
'1/3' -> '0.333'
```

In [None]:
# Problem size for the division exercise.
max_digits = 3
# Largest integer with `max_digits` digits
max_int = 10**max_digits - 1
max_len = ...# TODO(exercise): length of the longest question string 'a/b'
out_max_len = ...# TODO(exercise): length of the longest answer string (3 decimals)
print('max_digits : {0}, max_int: {1}, max_len: {2}, out_max_len: {3}'.format(
    max_digits, max_int, max_len, out_max_len))

In [None]:
# With a single argument, randint draws from [0, max_int): 0 can be returned,
# max_int itself cannot.
np.random.randint(max_int)

In [None]:
def generate_sample(max_len, max_int, out_max_len):
    """Exercise template: create one random division question and its answer.

    The `...` placeholders must be filled in before this function can run.

    Args:
        max_len: length the question string is right-padded to with spaces.
        max_int: not used by the template yet; presumably meant to bound the
            operands drawn below (TODO confirm when filling in the blanks).
        out_max_len: length the answer string is right-padded to with spaces.

    Returns:
        Tuple `(sentence, result)` of padded strings.
    """
    # TODO(exercise): bounds for the numerator
    a = np.random.randint(...)
    # TODO(exercise): bounds for the denominator; exclude 0 to avoid a zero division
    b = np.random.randint(...) # zero division
    # TODO(exercise): build the question string
    sentence = ...
    sentence = sentence + ' ' * (max_len - len(sentence))  # padding
    # Quotient rounded to 3 decimals, converted to its string form
    result = str(np.round(a / b, 3))
    result = result + ' ' * (out_max_len - len(result))  # padding
    return sentence, result

# Collect distinct questions; `seen` filters out repeated question strings.
sentences = []
results = []
seen = set()
print("Generating data...")
# TODO(exercise): replace `...` with the number of distinct samples to generate
while len(sentences) < ...:
    sentence, result = generate_sample(max_len, max_int, out_max_len)
    # Skip questions that were already generated
    if sentence in seen:
        continue
    seen.add(sentence)
    sentences.append(sentence)
    results.append(result)
print("Total sentences:", len(sentences))
print('Some examples:', list(zip(sentences[:3], results[:3])))

In [None]:
## Data vectorization

# TODO(exercise): replace `...` with a string containing every character that
# can appear in a question or an answer (including the padding space).
chars = ...

# Character -> index, numbered in sorted character order
char_indices = {c:i for i, c in enumerate(sorted(chars))}
print('char_indices', char_indices)
# Index -> character: the inverse mapping
indices_char = {i:c for c,i in char_indices.items()}
print('indices_char', indices_char)

def vectorize_sentence(sentence, char_indices):
    """One-hot encode `sentence`, one row per character.

    Args:
        sentence: string to encode.
        char_indices: mapping from character to column index.

    Returns:
        Float array of shape `(len(sentence), len(char_indices))` with a single
        1 per row, in the column of that row's character.
    """
    one_hot = np.zeros((len(sentence), len(char_indices)))
    columns = np.array([char_indices[ch] for ch in sentence], dtype=np.intp)
    one_hot[np.arange(len(sentence)), columns] = 1
    return one_hot

# Sanity check: encode a short example and inspect the result.
x = vectorize_sentence('13/11', char_indices)

print('sentence: 13/11')
print('vectorize_sentence inds:', np.argmax(x, axis=-1))
print('vectorize_sentence :', x)

In [None]:
def vec_to_sentence(x, indices_char):
    """Decode an iterable of character indices `x` into a string."""
    decoded = [indices_char[idx] for idx in x]
    return "".join(decoded)

def mat_to_sentence(x, indices_char):
    """Decode a one-hot (or score) matrix `x` into a string.

    The index of the largest value along the last axis of `x` is looked up
    in `indices_char` for every row.
    """
    best = x.argmax(axis=-1)
    decoded = [indices_char[idx] for idx in best]
    return "".join(decoded)

# Round trip: decode the matrix `x` back into its string.
mat_to_sentence(x, indices_char)

In [None]:
print("Vectorization...")
# One-hot tensors for questions (x) and answers (y).
# The builtin `bool` is used as dtype: the `np.bool` alias was deprecated in
# NumPy 1.20 and removed in NumPy 1.24.
x = np.zeros((len(sentences), max_len, len(chars)), dtype=bool)
y = np.zeros((len(sentences), out_max_len, len(chars)), dtype=bool)

for i, sentence in enumerate(sentences):
    x[i] = vectorize_sentence(sentence, char_indices)
for i, sentence in enumerate(results):
    y[i] = vectorize_sentence(sentence, char_indices)

# Explicitly set apart 10% for validation data that we never train over.
split_at = len(x) - len(x) // 10
(x_train, x_val) = x[:split_at], x[split_at:]
(y_train, y_val) = y[:split_at], y[split_at:]

print("Training Data:")
print(x_train.shape)
print(y_train.shape)

print("Validation Data:")
print(x_val.shape)
print(y_val.shape)

In [None]:
# Exercise template: fill in every `...` placeholder before running this cell.

# Encoder
encoder_input = tf.keras.Input(
    shape=(max_len, len(chars)), name='encoder_input')

...

# Encoded vector
encoder_state = ...

# Decoder
decoder_input = tf.keras.Input(
    shape=(out_max_len, 1), name='decoder_input')

# Pass the 2 states to a new LSTM layer, as initial state
decoder_output = ...

# Per-timestep prediction over `chars`, computed from decoder_output.
# It has to be assigned in this cell: otherwise `output` would still be the
# output tensor of the model built earlier in the notebook.
output = ...

model = keras.Model([encoder_input, decoder_input], output)
model.summary()

In [None]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# The inputs of the decoder are zeros. Both arrays are rebuilt here because
# x_train, x_val and out_max_len were redefined for the division task; the
# validation array is needed by the training cell.
decoder_input_data = np.zeros((len(x_train), out_max_len, 1))
decoder_input_data_val = np.zeros((len(x_val), out_max_len, 1))

In [None]:
epochs = 30
batch_size = 64

# One fit call per epoch so that sample predictions can be printed in between.
# range(1, epochs + 1) runs exactly `epochs` iterations, numbered from 1.
for epoch in range(1, epochs + 1):
    print()
    print("Iteration", epoch)
    model.fit(
        [x_train, decoder_input_data],
        y_train,
        batch_size=batch_size,
        epochs=1,
        validation_data=([x_val, decoder_input_data_val], y_val),
    )

    # Show the prediction for 5 random validation samples
    for i in range(5):
        ind = np.random.randint(0, len(x_val))
        # rowx keeps a leading batch axis of size 1; 1*... turns the bool row into ints
        rowx, rowy = x_val[np.array([ind])], 1*y_val[ind]
        # A single row of the zero decoder input matches the batch of one
        preds = np.argmax(model.predict([rowx, decoder_input_data_val[[0],:]]), axis=-1).flatten()
        q = mat_to_sentence(rowx[0], indices_char)
        correct = mat_to_sentence(rowy, indices_char)
        guess = vec_to_sentence(preds, indices_char)
        print()
        print("Input: ", q, "Correct output", correct)
        print('Prediction')
        if correct == guess:
            print("☑ " + guess)
        else:
            print("☒ " + guess)