# Libraries

In [179]:
import re
import numpy as np
import random
import sys
from keras.models import Sequential
from keras.layers import Input, LSTM, Dense, Embedding, Bidirectional, Dropout
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical

---

# Dataset

**DATA CONTROLLERS**

In [180]:
# filters
class Dataset:
    # constructor
    def __init__(self, text:list[str]):
        self.text:list[str] = text  

    # methods
    # -> lower: convert text to lowercase
    def Filter(self):
        # convert text to lowercase
        self.text = [t.lower() for t in self.text]
        # remove punctuation except tabs and alphabetics and numbers
        self.text = [re.sub(r'[^a-z0-9\s\t]', '', t) for t in self.text]

    # -> chars: remove punctuation except alphabetics and return sorted list of chars_set
    def Chars(self) -> list[str]:
        # remove punctuation expect alphabetics and numbers
        text = [re.sub(r'[^a-z0-9]', '', t) for t in self.text]

        # return sorted list of chars_set
        chars = [' ']
        for c in ''.join(text):
            chars.append(c)
            
        vocab = sorted(list(set(chars)))
        return vocab


**VOCAB**

In [181]:
class Vocab:
    # constructor
    def __init__(self, chars_set:list[str]):
        # attributes
        # -> vocab: represents all the chars in the dataset
        # -> size: represents the size of the vocab
        self.vocab:list[str] = chars_set
        self.size:int        = len(self.vocab)

        # -> mapper: maps each char to an index in the vocab and vice versa
        self.chars_ix:dict[str,int] = {c:i for i, c in enumerate(self.vocab)}
        self.ix_chars:dict[int,str] = {i:c for i, c in enumerate(self.vocab)}

    # methods
    # -> encode: encodes a char into an index
    # -> decode: decodes an index into a char
    def encode(self, char:str) -> int:
        return self.chars_ix[char]
    def decode(self, ix:int) -> str:
        return self.ix_chars[ix]

    # -> hot_encode: encodes a char into a one-hot vector with size of the vocab
    #                (matrix of zeros with a 1 in the index of the char)
    # -> hot_decode: decodes a one-hot vector into a char
    def hot_encode(self, char:str) -> np.ndarray:
        # create matrix of zeros with shape (vocab_size)
        arr = np.zeros(self.size)

        # set the ix of the char
        arr[self.encode(char)] = 1
        return arr

    def hot_decode(self, data:np.ndarray) -> str:
        # get the index of the max value
        max_arg_ix = np.argmax(data)

        # get the char from the index
        return self.decode(max_arg_ix)

In [182]:
class Phrase:
    # constructor
    def __init__(self, vocab:Vocab, max_len:int):
        # attributes
        # -> vocab: represents the vocab used to encode and decode the phrase
        # -> max_len: represents the max length of the phrase (this for standardization)
        self.vocab:Vocab = vocab
        self.max_len:int = max_len

    # methods
    # -> encode: encodes a phrase into a matrix of one-hot vectors
    # -> decode: decodes a matrix of one-hot vectors into a phrase
    def encode(self, phrase:str) -> np.ndarray:
        # create matrix of zeros with shape (max_len, vocab_size)
        arr = np.zeros((self.max_len, self.vocab.size))

        # iterate over the phrase
        for i, c in enumerate(phrase):
            if i >= self.max_len:
                break

            # encode the char into a one-hot vector
            char = self.vocab.hot_encode(c)

            # set the one-hot vector in the matrix
            arr[i] = char

        return arr

    def decode(self, data:np.ndarray) -> str:
        # create an empty string
        phrase = ''

        # iterate over the matrix
        for v in data:
            # decode the one-hot vector into a char
            char = self.vocab.hot_decode(v)

            # add the char to the phrase
            phrase += char

        return phrase

**INITIALIZATION**

In [183]:
# CONFIG
PHRASE_MAX_LEN = 50

# _________________________________________________________________
# Dataset
with open("dialogs.txt", "r") as f:
    text = f.readlines()

dataset = Dataset(text)
dataset.Filter()


# _________________________________________________________________
# App
# -> vocab: controller that handles the encoding and decoding of chars
# -> phraser: controller that handles the encoding and decoding of phrases
chars_set = dataset.Chars()

vocab = Vocab(chars_set)
phraser = Phrase(vocab, PHRASE_MAX_LEN)

In [184]:
dataset.text[0]

'hi how are you doing\tim fine how about yourself\n'

In [185]:
# hot encode a simple phrase (string -> list of chars -> list of ix -> matrix)
phrase = "hello"

# create a phrase encoded
arr = phraser.encode(phrase)
arr

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

In [186]:
# hot decode a simple phrase (matrix -> list of ix -> list of chars -> string)
phrase_decoded = phraser.decode(arr)
phrase_decoded

'hello                                             '

---

# Pre-Processing

In [190]:
# ________________________________________________________________________________
# Generate sequences based on question/answer pairs in the dataset (using phraser)
# - x: input sequence (question)
# - y: output sequence (answer)
questions = np.zeros((len(dataset.text), phraser.max_len, vocab.size), dtype="int32")
answers = np.zeros((len(dataset.text), phraser.max_len, vocab.size), dtype="int32")


for i, phrase in enumerate(dataset.text):
    # split line into question and answer
    q, a = phrase.strip().split("\t")

    # encode question and answer (from string to list of integers)
    q_encoded = phraser.encode(q)
    a_encoded = phraser.encode(a)

    # add the encoded question and answer to the dataset
    questions[i] = q_encoded
    answers[i]   = a_encoded

In [192]:
questions.shape, answers.shape

((3725, 50, 37), (3725, 50, 37))

In [193]:
# showcase
print("Encoded question: ", questions[0])
print("Encoded answer: ", answers[0])

print("Decoded question: ", phraser.decode(questions[0]))
print("Decoded answer: ", phraser.decode(answers[0]))


Encoded question:  [[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [1 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]
Encoded answer:  [[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [1 0 0 ... 0 0 0]
 ...
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]]
Decoded question:  hi how are you doing                              
Decoded answer:  im fine how about yourself                        


---

In [23]:
# Parameters for the model and dataset.
class Config:
    # constructor
    def __init__(self):
        self.training_size:int = 50000
        self.digits:int = 5
        self.hidden_size:int = 128
        self.batch_size:int = 128

config = Config()
config.training_size = 10
config.digits = 5
config.hidden_size = 128
config.batch_size = 128

# Maximum length of input is 'int + int' (e.g., '345+678'). Maximum length of
# int is DIGITS.
maxlen = config.digits + 1 + config.digits

# All the numbers, plus sign and space for padding.
chars = '0123456789+- '

questions = []
expected = []
seen = set()
print('Generating data...')
while len(questions) < config.training_size:
    f = lambda: int(''.join(np.random.choice(list('0123456789'))
                    for i in range(np.random.randint(1, config.digits + 1))))
    a, b = f(), f()
    # Skip any addition questions we've already seen
    # Also skip any such that x+Y == Y+x (hence the sorting).
    key = tuple(sorted((a, b)))
    if key in seen:
        continue
    seen.add(key)
    # Pad the data with spaces such that it is always MAXLEN.
    q = '{}-{}'.format(a, b)
    query = q + ' ' * (maxlen - len(q))
    ans = str(a - b)
    # Answers can be of maximum size DIGITS + 1.
    ans += ' ' * (config.digits + 1 - len(ans))

    questions.append(query)
    expected.append(ans)
    
print('Total addition questions:', len(questions))

print('Vectorization...')
x = np.zeros((len(questions), maxlen, len(chars)))
y = np.zeros((len(questions), config.digits + 1, len(chars)))

print("x -> ", x)
print("y -> ", y)

# for i, sentence in enumerate(questions):
#     x[i] = ctable.encode(sentence, maxlen)
# for i, sentence in enumerate(expected):
#     y[i] = ctable.encode(sentence, config.digits + 1)

Generating data...
Total addition questions: 10
Vectorization...
x ->  [[[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 ...

 [[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 

In [31]:
z = np.zeros((3, 4, 5))
z

array([[[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]],

       [[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]],

       [[0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0.]]])

# Model

**SETS**

In [51]:
# train set
split = int(len(x_format) * 0.8)
x_train, x_test = x_format[:split], x_format[split:]
y_train, y_test = y_format[:split], y_format[split:]

In [54]:
print("x shape (train, test): ", x_train.shape, x_test.shape)
print("y shape (train, test): ", y_train.shape, y_test.shape)

x shape (train, test):  (2980, 50, 55) (745, 50, 55)
y shape (train, test):  (2980, 50, 55) (745, 50, 55)


**MODEL**

In [52]:
# create model
model = Sequential()
model.add(LSTM(128, input_shape=(MAX_LEN, VOCAB_SIZE)))
model.add(Dense(VOCAB_SIZE, activation="softmax"))

model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

**TRAINING**

In [None]:
# train
model.fit(x_train, y_train, epochs=10, batch_size=32, validation_data=(x_test, y_test))