<a href="https://colab.research.google.com/github/MarcPartensky/Neural-Networks/blob/master/ialab_addition_marc_partensky.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [154]:
from tensorflow.keras.layers import LSTM, RepeatVector, Dense
from tensorflow.keras import Sequential
import numpy as np

# --- Data ---
DATA_SIZE = 50_000                # number of addition problems to generate
TRAINING_SET_PERCENTAGE = 0.95    # fraction of the data used for training
DIGITS = 3                        # maximum number of digits per operand
CHARS = "0123456789+ "            # alphabet; the space is used as padding

# --- Sequence lengths ---
INPUT_SIZE = 2 * DIGITS + 1       # "<a>+<b>" at its longest
OUTPUT_SIZE = DIGITS + 1          # a sum can carry into one extra digit

# --- Model ---
LAYERS = 2                        # number of stacked decoder LSTM layers
LAYER_SIZE = 128                  # units per LSTM layer

# --- Training ---
EPOCHS = 50
BATCH_SIZE = 50
TEST_SAMPLE_SIZE = 10             # predictions printed after each epoch
LOSS = "categorical_crossentropy"
OPTIMIZER = "adam"
METRICS = ["accuracy"]

In [155]:
# Create a one hot encoder

class OneHotEncoder:
  """Convert strings to and from one-hot matrices over a fixed alphabet."""

  def __init__(self, chars:str):
    """Create a one hot encoder for the given characters.

    The position of a character in `chars` is its one-hot column index.
    """
    self.chars = chars

  def encode(self, string:str, size:int) -> np.ndarray:
    """Encode a string into a (size, len(chars)) matrix of one hot rows.

    Row i holds a single 1 in the column of string[i]. Rows past the end of
    the string are left all-zero, so callers that need explicit padding must
    pad the string themselves before encoding.

    Raises:
      ValueError: if a character is not part of the alphabet.
      IndexError: if the string is longer than `size`.
    """
    matrix = np.zeros((size, len(self.chars)))
    for i, char in enumerate(string):
      matrix[i, self.chars.index(char)] = 1
    return matrix

  def decode(self, matrix:np.ndarray) -> str:
    """Decode encoded characters back into their string.

    Accepts either an iterable of character indices (for example the result
    of an argmax over the last axis) or a 2-D array with one row per
    character (one hot rows or per-character scores), in which case the
    index of the largest value of each row is used. An all-zero row
    therefore decodes to the first character of the alphabet.
    """
    if getattr(matrix, "ndim", 1) == 2:
      # Collapse one hot / score rows to character indices first.
      matrix = np.argmax(matrix, axis=-1)
    return "".join(self.chars[i] for i in matrix)

# Shared encoder instance over the notebook's alphabet.
one_hot_encoder = OneHotEncoder(chars=CHARS)

In [156]:
# Generate the questions and the answers

def f(digits:int=None) -> int:
  """Draw a random non-negative integer with between 1 and `digits` digits.

  The number of digits is drawn uniformly first, then every digit is drawn
  uniformly, so short numbers are over-represented compared with a uniform
  draw over the whole range. Leading zeros may be drawn and disappear when
  the digit string is parsed.

  When `digits` is None the module-level DIGITS constant is used.
  """
  if digits is None:
    digits = DIGITS
  length = np.random.randint(digits) + 1
  return int("".join(np.random.choice(list("0123456789")) for _ in range(length)))

def generate(chars:str, data_size:int, digits:int, input_size:int, output_size:int) -> tuple:
  """Generate the questions and answers to build data for the model.

  Produces `data_size` distinct additions "a+b" (a+b and b+a count as the
  same problem) together with their sums. Questions are right-padded with
  spaces to `input_size` characters and answers to `output_size` characters;
  strings that are already longer are left as they are.

  `chars` is accepted for interface compatibility and is not used here.

  Returns:
    A (questions, answers) tuple of two lists of strings of equal length.

  Raises:
    ValueError: if `digits` is smaller than 1, or if `data_size` is larger
      than the number of distinct operand pairs, which would otherwise make
      the sampling loop run forever.
  """
  if digits < 1:
    raise ValueError(f"digits must be at least 1, got {digits}")
  distinct_values = 10 ** digits
  max_pairs = distinct_values * (distinct_values + 1) // 2
  if data_size > max_pairs:
    raise ValueError(
        f"cannot generate {data_size} unique additions with {digits} digit(s): "
        f"only {max_pairs} exist"
    )

  questions = []
  answers = []
  seen = set()
  # Never let the reporting interval reach 0 (data_size < 10).
  progress_step = max(1, data_size // 10)
  while len(questions) < data_size:
    a, b = f(digits), f(digits)
    # Order the operands so that a+b and b+a share one key.
    key = (a, b) if a <= b else (b, a)
    if key in seen:
      continue
    seen.add(key)
    questions.append(f"{a}+{b}".ljust(input_size))
    answers.append(str(a + b).ljust(output_size))
    # Show progression since this process can take some time
    if len(questions) % progress_step == 0:
      print(int(len(questions)/data_size*100), '%')
  return (questions, answers)

# Build the raw dataset of padded question/answer strings.
questions, answers = generate(
    chars=CHARS,
    data_size=DATA_SIZE,
    digits=DIGITS,
    input_size=INPUT_SIZE,
    output_size=OUTPUT_SIZE,
)

10 %
20 %
30 %
40 %
50 %
60 %
70 %
80 %
90 %
100 %


In [157]:
# Peek at one randomly chosen question/answer pair.
i = np.random.randint(DATA_SIZE)
print(i)
print(f"{questions[i]} = {answers[i]}")

8061
9+610   = 619 


In [158]:
# Create the training set and the validation set given the questions and answers

def _one_hot_batch(strings:list, size:int, chars:str) -> np.ndarray:
  """One hot encode a list of strings into a (len(strings), size, len(chars)) boolean array."""
  batch = np.zeros((len(strings), size, len(chars)), dtype=bool)
  for row, string in enumerate(strings):
    for position, char in enumerate(string):
      batch[row, position, chars.index(char)] = True
  return batch

def one_hot_encode(
    questions:list,
    answers:list,
    input_size:int,
    output_size:int,
    chars:str
  ) -> tuple:
  """One hot encode the questions and the answers to make the data.

  Every character is mapped to its position in `chars`. Positions past the
  end of a string are left all-False.

  Returns:
    An (x, y) tuple of boolean arrays with shapes
    (len(questions), input_size, len(chars)) and
    (len(answers), output_size, len(chars)).

  Raises:
    ValueError: if `questions` and `answers` differ in length, or if a
      string contains a character that is not in `chars`.
    IndexError: if a string is longer than its target size.
  """
  if len(questions) != len(answers):
    raise ValueError(
        f"questions and answers must have the same length, "
        f"got {len(questions)} and {len(answers)}"
    )
  # Use the builtin bool: the np.bool alias is missing from several NumPy releases.
  x = _one_hot_batch(questions, input_size, chars)
  y = _one_hot_batch(answers, output_size, chars)
  return (x, y)

# Turn the question/answer strings into model-ready arrays.
x, y = one_hot_encode(
    questions=questions,
    answers=answers,
    input_size=INPUT_SIZE,
    output_size=OUTPUT_SIZE,
    chars=CHARS,
)

def shuffle(x: np.ndarray, y: np.ndarray) -> tuple:
  """Shuffle the data.

  The same random permutation is applied along the first axis of both
  arrays, so x[i] and y[i] still belong together afterwards. The inputs are
  not modified; shuffled copies are returned.

  Raises:
    ValueError: if `x` and `y` do not have the same length.
  """
  if len(x) != len(y):
    raise ValueError(f"x and y must have the same length, got {len(x)} and {len(y)}")
  # Size the permutation from the data itself, not from a global constant.
  indices = np.random.permutation(len(x))
  return (x[indices], y[indices])

# Randomise the sample order before splitting.
x, y = shuffle(x=x, y=y)

def split_training_and_validation(
      x: np.ndarray, 
      y: np.ndarray, 
      training_set_percentage: float
    ) -> tuple:
  """Cut the data into a leading training part and a trailing validation part.

  The first int(training_set_percentage * len(x)) samples go to training and
  the remainder to validation.

  Returns:
    (x_train, y_train, x_validation, y_validation)
  """
  cut = int(training_set_percentage * len(x))
  training_part = slice(None, cut)
  validation_part = slice(cut, None)
  return x[training_part], y[training_part], x[validation_part], y[validation_part]

x_train, y_train, x_validation, y_validation = split_training_and_validation(
    x=x,
    y=y,
    training_set_percentage=TRAINING_SET_PERCENTAGE,
)

# Report the resulting array shapes.
print(f"Training data: {x_train.shape} {y_train.shape}")
print(f"Validation data: {x_validation.shape} {y_validation.shape}")

Training data: (47500, 7, 12) (47500, 4, 12)
Validation data: (2500, 7, 12) (2500, 4, 12)


In [159]:
# Build the model
def build(
    input_size:int,
    output_size:int,
    chars:str,
    layers:int,
    layer_size:int,
    loss:str,
    optimizer:str,
    metrics:list
  ) -> Sequential:
  """Assemble and compile the LSTM sequence-to-sequence model.

  Layer order: one LSTM reading the (input_size, len(chars)) input, a
  RepeatVector that repeats its output `output_size` times, `layers` LSTM
  layers returning full sequences, and a softmax Dense layer with one unit
  per character.
  """
  alphabet_size = len(chars)

  stack = [
      LSTM(layer_size, input_shape=(input_size, alphabet_size)),
      RepeatVector(output_size),
  ]
  stack.extend(LSTM(layer_size, return_sequences=True) for _ in range(layers))
  stack.append(Dense(alphabet_size, activation="softmax"))

  model = Sequential()
  for layer in stack:
    model.add(layer)
  model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
  return model

model = build(
    input_size=INPUT_SIZE,
    output_size=OUTPUT_SIZE,
    chars=CHARS,
    layers=LAYERS,
    layer_size=LAYER_SIZE,
    loss=LOSS,
    optimizer=OPTIMIZER,
    metrics=METRICS,
)
# Print the layer-by-layer overview of the compiled model.
model.summary()


Model: "sequential_6"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_26 (LSTM)               (None, 128)               72192     
_________________________________________________________________
repeat_vector_8 (RepeatVecto (None, 4, 128)            0         
_________________________________________________________________
lstm_27 (LSTM)               (None, 4, 128)            131584    
_________________________________________________________________
lstm_28 (LSTM)               (None, 4, 128)            131584    
_________________________________________________________________
dense_6 (Dense)              (None, 4, 12)             1548      
Total params: 336,908
Trainable params: 336,908
Non-trainable params: 0
_________________________________________________________________


In [None]:
# Training the model

def train(
    model:Sequential,
    x_train:np.ndarray,
    y_train:np.ndarray,
    x_validation:np.ndarray,
    y_validation:np.ndarray,
    epochs:int,
    batch_size:int,
    test_sample_size:int
  ):
  """Train the model and print progress over validation data.

  The model is fitted one epoch at a time for exactly `epochs` iterations,
  numbered from 1. After each iteration, `test_sample_size` validation
  examples are drawn at random (the same example may be drawn more than
  once) and printed as `question=guess`, followed by whether the guess
  equals the expected answer.
  """
  # range(1, epochs + 1): run all `epochs` iterations, not epochs - 1.
  for epoch in range(1, epochs + 1):
    print(f"Iteration {epoch}")
    # Train the model for one iteration
    model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=1,
        validation_data=(x_validation, y_validation)
    )
    # Then test it on the validation set
    for _ in range(test_sample_size):
      index = np.random.randint(0, len(x_validation))
      # Slice instead of indexing to keep a leading axis of length 1.
      x_random_test = x_validation[index:index + 1]
      y_random_test = y_validation[index:index + 1]

      # Reduce one hot rows / predicted scores to character indices.
      predictions = np.argmax(model.predict(x_random_test), axis=-1)
      x_random_test = np.argmax(x_random_test, axis=-1)
      y_random_test = np.argmax(y_random_test, axis=-1)

      guess = one_hot_encoder.decode(predictions[0])
      question = one_hot_encoder.decode(x_random_test[0])
      answer = one_hot_encoder.decode(y_random_test[0])

      print(f"{question}={guess}", answer==guess)

# Run the training loop with the notebook's configuration.
train(
    model=model,
    x_train=x_train,
    y_train=y_train,
    x_validation=x_validation,
    y_validation=y_validation,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    test_sample_size=TEST_SAMPLE_SIZE,
)


Iteration 1
7+919  =999  False
3+131  =22   False
703+78 =909  False
458+493=102  False
58+43  =22   False
456+556=102  False
471+264=702  False
695+329=101  False
22+150 =227  False
575+130=902  False
Iteration 2
998+153=1254 False
471+25 =424  False
251+53 =326  False
512+196=726  False
306+303=726  False
79+440 =424  False
42+906 =901  False
63+296 =324  False
77+34  =114  False
116+975=1114 False
Iteration 3
699+647=1368 False
491+243=798  False
63+626 =688  False
394+669=1026 False
19+530 =588  False
932+56 =990  False
86+377 =428  False
21+595 =628  False
904+995=1988 False
7+94   =101  True
Iteration 4
82+581 =659  False
80+641 =758  False
120+88 =219  False
4+149  =158  False
904+22 =959  False
84+612 =699  False
313+63 =350  False
591+4  =591  False
29+76  =101  False
488+48 =511  False
Iteration 5
674+37 =722  False
138+808=995  False
63+296 =365  False
7+519  =525  False
312+34 =345  False
726+817=1505 False
904+632=1549 False
9+503  =510  False
614+40 =652  False
165+9  =16