# Assignment 3 - A
This task implements sequence-to-sequence learning for performing addition.



In [0]:
import tensorflow as tf

from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
import numpy as np
from six.moves import range

In [0]:
class CharacterTable(object):
    """Two-way mapping between characters and one-hot rows.

    + Encode a string into a one-hot matrix
    + Decode a one-hot or probability matrix back into a string
    + Decode a vector of integer indices back into a string
    """
    def __init__(self, chars):
        """Build the character <-> index lookup tables.

        # Arguments
            chars: Characters that can appear in the input. Duplicates are
                dropped and the rest is sorted, which fixes the index order.
        """
        self.chars = sorted(set(chars))
        self.char_indices = {}
        self.indices_char = {}
        for index, char in enumerate(self.chars):
            self.char_indices[char] = index
            self.indices_char[index] = char

    def encode(self, C, num_rows):
        """Return the one-hot encoding of string C.

        # Arguments
            C: string, to be encoded.
            num_rows: Number of rows in the returned one-hot encoding, i.e.
                the maximum length of C. Rows beyond len(C) stay all-zero so
                that every sample has the same shape.
        """
        one_hot = np.zeros((num_rows, len(self.chars)))
        # map() is lazy, so characters are looked up one at a time while the
        # rows are filled in.
        columns = map(self.char_indices.__getitem__, C)
        for row, column in enumerate(columns):
            one_hot[row, column] = 1
        return one_hot

    def decode(self, x, calc_argmax=True):
        """Turn a vector or 2D array back into its character string.

        # Arguments
            x: A vector or a 2D array of probabilities or one-hot
                representations; or a vector of character indices (used
                with `calc_argmax=False`).
            calc_argmax: Whether to take the index of the maximum along the
                last axis first, defaults to `True`.
        """
        indices = x.argmax(axis=-1) if calc_argmax else x
        return ''.join(map(self.indices_char.__getitem__, indices))

class colors:
    """ANSI escape sequences used to colorize the check marks in the output."""
    ok, fail = '\033[92m', '\033[91m'  # start of a "correct" / "wrong" mark
    close = '\033[0m'  # reset the terminal attributes afterwards

## Task 1

In [30]:
# Parameters for the model and dataset.
TRAINING_SIZE = 50000
DIGITS = 3
REVERSE = False

# Maximum length of input is 'int + int' (e.g., '345+678'). Maximum length of
# int is DIGITS.
MAXLEN = DIGITS + 1 + DIGITS

# All the digits plus the '+' sign. There is no separate padding character:
# operands and answers are left-padded with '0' to a fixed width below.
chars = '0123456789+'
ctable = CharacterTable(chars)


def f():
    """Return a random integer built from 1 to DIGITS random decimal digits."""
    return int(''.join(np.random.choice(list('0123456789'))
                       for i in range(np.random.randint(1, DIGITS + 1))))


questions = []
expected = []
seen = set()
while len(questions) < TRAINING_SIZE:
    a, b = f(), f()
    # Skip any addition questions we've already seen
    # Also skip any such that x+Y == Y+x (hence the sorting).
    key = tuple(sorted((a, b)))
    if key in seen:
        continue
    seen.add(key)

    # Left-pad both operands to DIGITS characters and the answer to
    # DIGITS + 1 characters with '0'.
    pad = '0'
    astr, bstr = str(a), str(b)
    astr = pad * (DIGITS - len(astr)) + astr
    bstr = pad * (DIGITS - len(bstr)) + bstr
    query = f"{astr}+{bstr}"
    ans = str(a + b)
    ans = pad * (DIGITS + 1 - len(ans)) + ans

    # Echo the first few samples as a sanity check.
    if len(seen) < 5: print(query, ans)

    if REVERSE:
        query = query[::-1]
    questions.append(query)
    expected.append(ans)

# Vectorize. The builtin `bool` is used as dtype because the `np.bool` alias
# no longer exists in current NumPy releases.
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=bool) # n * x * d
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=bool) # n * y * d
for i, sentence in enumerate(questions):
    x[i] = ctable.encode(sentence, MAXLEN)
for i, sentence in enumerate(expected):
    y[i] = ctable.encode(sentence, DIGITS + 1)

# Shuffle (x, y) in unison.
indices = np.arange(len(y))
np.random.shuffle(indices)
x = x[indices]
y = y[indices]

# Explicitly set apart 10% for validation data that we never train over.
split_at = len(x) - len(x) // 10
(x_train, x_val) = x[:split_at], x[split_at:]
(y_train, y_val) = y[:split_at], y[split_at:]

print('Training Data:')
print(x_train.shape)
print(y_train.shape)

print('Validation Data:')
print(x_val.shape)
print(y_val.shape)

037+762 0799
092+016 0108
053+008 0061
010+032 0042
Training Data:
(45000, 7, 11)
(45000, 4, 11)
Validation Data:
(5000, 7, 11)
(5000, 4, 11)


In [31]:
RNN = layers.SimpleRNN
HIDDEN_SIZE = 128  # units of the encoder output and of every decoder layer
BATCH_SIZE = 64
LAYERS = 2

model = Sequential(
    # Encoder: read the whole query and keep only the final output of size
    # HIDDEN_SIZE. (For variable-length inputs one would pass
    # input_shape=(None, num_feature) instead.)
    [RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))),
     # Present that encoding to the decoder once per output step. The answer
     # has at most DIGITS + 1 characters, e.g. 999+999=1998 for DIGITS=3.
     layers.RepeatVector(DIGITS + 1)]
    # Decoder: LAYERS stacked RNNs. return_sequences=True keeps the output of
    # every time step, which the per-step classifier below needs.
    + [RNN(HIDDEN_SIZE, return_sequences=True) for _ in range(LAYERS)]
    # Per-step classifier: a softmax over the characters at each output step.
    + [layers.TimeDistributed(layers.Dense(len(chars), activation='softmax'))]
)
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
model.summary()

Model: "sequential_19"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
simple_rnn_27 (SimpleRNN)    (None, 128)               17920     
_________________________________________________________________
repeat_vector_19 (RepeatVect (None, 4, 128)            0         
_________________________________________________________________
simple_rnn_28 (SimpleRNN)    (None, 4, 128)            32896     
_________________________________________________________________
simple_rnn_29 (SimpleRNN)    (None, 4, 128)            32896     
_________________________________________________________________
time_distributed_19 (TimeDis (None, 4, 11)             1419      
Total params: 85,131
Trainable params: 85,131
Non-trainable params: 0
_________________________________________________________________


In [32]:
# Train one epoch at a time and, after each epoch, show predictions for a few
# random samples of the validation dataset.
for iteration in range(1, 30):
    print()
    print('-' * 50)
    print('Iteration', iteration)
    model.fit(x_train, y_train,
              batch_size=BATCH_SIZE,
              epochs=1,
              validation_data=(x_val, y_val))
    # Select 10 samples from the validation set at random so we can visualize
    # errors.
    for i in range(10):
        ind = np.random.randint(0, len(x_val))
        # Index with a length-1 array so the batch dimension is kept.
        rowx, rowy = x_val[np.array([ind])], y_val[np.array([ind])]
        # `predict_classes` is deprecated and missing from newer TensorFlow
        # releases; the argmax over the per-step softmax output yields the
        # same class indices.
        preds = np.argmax(model.predict(rowx, verbose=0), axis=-1)
        q = ctable.decode(rowx[0])
        correct = ctable.decode(rowy[0])
        guess = ctable.decode(preds[0], calc_argmax=False)
        # Show the query in its natural order even if it was fed reversed.
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)


--------------------------------------------------
Iteration 1
Q 885+094 T 0979 [91m☒[0m 0976
Q 008+533 T 0541 [91m☒[0m 0544
Q 055+492 T 0547 [91m☒[0m 0551
Q 320+048 T 0368 [91m☒[0m 0373
Q 019+662 T 0681 [91m☒[0m 0683
Q 064+787 T 0851 [91m☒[0m 0843
Q 806+061 T 0867 [91m☒[0m 0860
Q 603+729 T 1332 [91m☒[0m 1239
Q 370+494 T 0864 [91m☒[0m 0763
Q 940+099 T 1039 [91m☒[0m 1041

--------------------------------------------------
Iteration 2
Q 010+402 T 0412 [92m☑[0m 0412
Q 863+053 T 0916 [91m☒[0m 0917
Q 038+045 T 0083 [92m☑[0m 0083
Q 164+333 T 0497 [91m☒[0m 0597
Q 059+013 T 0072 [91m☒[0m 0082
Q 584+533 T 1117 [92m☑[0m 1117
Q 326+457 T 0783 [92m☑[0m 0783
Q 158+000 T 0158 [92m☑[0m 0158
Q 366+108 T 0474 [92m☑[0m 0474
Q 110+768 T 0878 [91m☒[0m 0778

--------------------------------------------------
Iteration 3
Q 091+449 T 0540 [92m☑[0m 0540
Q 237+036 T 0273 [92m☑[0m 0273
Q 006+383 T 0389 [92m☑[0m 0389
Q 640+010 T 0650 [92m☑[0m 0650
Q 001+217 T 0218

KeyboardInterrupt: ignored

In [0]:
# Name the file after the RNN class. Slicing `str(RNN)` instead yields
# fragments such as ".SimpleRNN'>", whose quote and '>' characters are not
# valid in file names on every platform.
model.save(f"task1{RNN.__name__}_reverse{REVERSE}.h5")

### Test

In [33]:
# Exhaustive test set: every ordered pair (a, b) of DIGITS-digit operands.
TEST_SIZE = 10 ** (DIGITS * 2)
print(TEST_SIZE)
# The builtin `bool` is used as dtype because the `np.bool` alias no longer
# exists in current NumPy releases.
x_test = np.zeros((TEST_SIZE, MAXLEN, len(chars)), dtype=bool)
y_test = np.zeros((TEST_SIZE, DIGITS + 1, len(chars)), dtype=bool)
print(y_test.shape)

j = -1  # running sample index; incremented before each write
for a in range(0,10**(DIGITS)):
    for b in range(0,10**(DIGITS)):

        # Same zero-padded formatting as the training data.
        astr, bstr = str(a).zfill(DIGITS), str(b).zfill(DIGITS)
        query = f"{astr}+{bstr}"
        ans = str(a + b).zfill(DIGITS + 1)

        j += 1
        # Echo the first ten samples as a sanity check.
        if j < 10: print(query, ans)

        if REVERSE:
            query = query[::-1]

        x_test[j] = ctable.encode(query, MAXLEN)
        y_test[j] = ctable.encode(ans, DIGITS + 1)

1000000
(1000000, 4, 11)
000+000 0000
000+001 0001
000+002 0002
000+003 0003
000+004 0004
000+005 0005
000+006 0006
000+007 0007
000+008 0008
000+009 0009


In [34]:
# `predict_classes` is deprecated and missing from newer TensorFlow releases;
# the argmax over the per-step softmax output yields the same class indices.
preds = np.argmax(model.predict(x_test), axis=-1)
correct = [ctable.decode(y) for y in y_test]
guess = [ctable.decode(p, calc_argmax=False) for p in preds]
# The output alphabet contains '+', which int() rejects inside a number, so a
# predicted '+' is counted as the digit 0 for the numeric error only (as the
# Task 2 evaluation does). Accuracy still compares the raw strings.
difference = [int(c) - int(g.replace('+', '0')) for c, g in zip(correct, guess)]

acc = np.mean([c == g for c, g in zip(correct, guess)])  # exact-match accuracy
mse = np.mean(np.square(difference))
mae = np.mean(np.abs(difference))
print(acc, mse, mae)

0.921914 1138.646824 3.660692


## Task 2

In [35]:
# Parameters for the model and dataset.
TRAINING_SIZE = 50000
DIGITS = 10 # bit representation
REVERSE = False
MAXLEN = DIGITS + 1 + DIGITS
SPACE_PADDING = False

chars = '01+'
if SPACE_PADDING: chars += ' '
ctable = CharacterTable(chars)


def f():
    """Return 1 to DIGITS random bits, read as a decimal integer.

    E.g. the bits '0101' give the int 101: leading zeros are dropped and
    `str()` of the result is again a bit string.
    """
    return int(''.join(np.random.choice(list('01'))
                       for i in range(np.random.randint(1, DIGITS + 1))))


questions = []
expected = []
seen = set()
while len(questions) < TRAINING_SIZE:
    a, b = f(), f()
    # Skip pairs we have already seen, in either operand order.
    key = tuple(sorted((a, b)))
    if key in seen:
        continue
    seen.add(key)

    astr, bstr = str(a), str(b)
    # str(a) and str(b) are bit strings; parse them base 2 to add the values.
    ans = '{0:b}'.format(int(astr, 2) + int(bstr, 2))
    if SPACE_PADDING:
        query = f"{astr}+{bstr}"
        # Right-pad to the full input width. MAXLEN includes the '+' sign,
        # which DIGITS * 2 does not.
        query += ' ' * (MAXLEN - len(query))
        ans += ' ' * (DIGITS + 1 - len(ans))
    else:
        pad = '0'
        astr = pad * (DIGITS - len(astr)) + astr
        bstr = pad * (DIGITS - len(bstr)) + bstr
        query = f"{astr}+{bstr}"
        ans = pad * (DIGITS + 1 - len(ans)) + ans

    # Echo the first few samples as a sanity check.
    if len(seen) < 5: 
        print("a b", astr, bstr)
        print("query", query, len(query))
        print("ans  ", ans, len(ans))
    
    if REVERSE:
        query = query[::-1]
    questions.append(query)
    expected.append(ans)

# Vectorize. The builtin `bool` is used as dtype because the `np.bool` alias
# no longer exists in current NumPy releases.
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=bool) # n * x * d
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=bool) # n * y * d
for i, sentence in enumerate(questions):
    x[i] = ctable.encode(sentence, MAXLEN)
for i, sentence in enumerate(expected):
    y[i] = ctable.encode(sentence, DIGITS + 1)

# Shuffle (x, y) in unison.
indices = np.arange(len(y))
np.random.shuffle(indices)
x = x[indices]
y = y[indices]

# Hold out the last 10% as validation data.
split_at = len(x) - len(x) // 10
(x_train, x_val) = x[:split_at], x[split_at:]
(y_train, y_val) = y[:split_at], y[split_at:]

print('Training Data:')
print(x_train.shape)
print(y_train.shape)

print('Validation Data:')
print(x_val.shape)
print(y_val.shape)

a b 0000001010 0000000001
query 0000001010+0000000001 21
ans   00000001011 11
a b 0011101011 0000000110
query 0011101011+0000000110 21
ans   00011110001 11
a b 0000000011 0000000000
query 0000000011+0000000000 21
ans   00000000011 11
a b 0011000111 0011001111
query 0011000111+0011001111 21
ans   00110010110 11
Training Data:
(45000, 21, 3)
(45000, 11, 3)
Validation Data:
(5000, 21, 3)
(5000, 11, 3)


In [36]:
RNN = layers.SimpleRNN
HIDDEN_SIZE = 128  # units of the encoder output and of every decoder layer
BATCH_SIZE = 64
LAYERS = 2

model = Sequential(
    # Encoder: read the whole query and keep only the final output of size
    # HIDDEN_SIZE. (For variable-length inputs one would pass
    # input_shape=(None, num_feature) instead.)
    [RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))),
     # Present that encoding to the decoder once per output step; the answer
     # has at most DIGITS + 1 characters.
     layers.RepeatVector(DIGITS + 1)]
    # Decoder: LAYERS stacked RNNs. return_sequences=True keeps the output of
    # every time step, which the per-step classifier below needs.
    + [RNN(HIDDEN_SIZE, return_sequences=True) for _ in range(LAYERS)]
    # Per-step classifier: a softmax over the characters at each output step.
    + [layers.TimeDistributed(layers.Dense(len(chars), activation='softmax'))]
)
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
model.summary()

Model: "sequential_20"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
simple_rnn_30 (SimpleRNN)    (None, 128)               16896     
_________________________________________________________________
repeat_vector_20 (RepeatVect (None, 11, 128)           0         
_________________________________________________________________
simple_rnn_31 (SimpleRNN)    (None, 11, 128)           32896     
_________________________________________________________________
simple_rnn_32 (SimpleRNN)    (None, 11, 128)           32896     
_________________________________________________________________
time_distributed_20 (TimeDis (None, 11, 3)             387       
Total params: 83,075
Trainable params: 83,075
Non-trainable params: 0
_________________________________________________________________


In [37]:
# Train one epoch at a time and, after each epoch, show predictions for a few
# random samples of the validation dataset.
for iteration in range(1, 6):
    print()
    print('-' * 50)
    print('Iteration', iteration)
    model.fit(x_train, y_train,
              batch_size=BATCH_SIZE,
              epochs=1,
              validation_data=(x_val, y_val))
    # Select 10 samples from the validation set at random so we can visualize
    # errors.
    for i in range(10):
        ind = np.random.randint(0, len(x_val))
        # Index with a length-1 array so the batch dimension is kept.
        rowx, rowy = x_val[np.array([ind])], y_val[np.array([ind])]
        # `predict_classes` is deprecated and missing from newer TensorFlow
        # releases; the argmax over the per-step softmax output yields the
        # same class indices.
        preds = np.argmax(model.predict(rowx, verbose=0), axis=-1)
        q = ctable.decode(rowx[0])
        correct = ctable.decode(rowy[0])
        guess = ctable.decode(preds[0], calc_argmax=False)
        # Show the query in its natural order even if it was fed reversed.
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)


--------------------------------------------------
Iteration 1
Q 1010011000+1100110011 T 10111001011 [92m☑[0m 10111001011
Q 0000000011+0001010110 T 00001011001 [92m☑[0m 00001011001
Q 0000011110+0111001100 T 00111101010 [92m☑[0m 00111101010
Q 0011110000+1000010001 T 01100000001 [92m☑[0m 01100000001
Q 0111010001+0000000011 T 00111010100 [92m☑[0m 00111010100
Q 0000010000+0110001111 T 00110011111 [92m☑[0m 00110011111
Q 0000001001+1100101100 T 01100110101 [92m☑[0m 01100110101
Q 0111001110+0001011011 T 01000101001 [92m☑[0m 01000101001
Q 0000001010+0111110111 T 01000000001 [92m☑[0m 01000000001
Q 0000010001+0010110111 T 00011001000 [92m☑[0m 00011001000

--------------------------------------------------
Iteration 2
Q 0101100111+0001010100 T 00110111011 [92m☑[0m 00110111011
Q 0000100011+0100111101 T 00101100000 [92m☑[0m 00101100000
Q 0001011110+0110010011 T 00111110001 [92m☑[0m 00111110001
Q 0111101111+0010001001 T 01001111000 [92m☑[0m 01001111000
Q 0000001001+01010

In [0]:
# Name the file after the RNN class. Slicing `str(RNN)` instead yields
# fragments such as ".SimpleRNN'>", whose quote and '>' characters are not
# valid in file names on every platform.
model.save(f"task2{RNN.__name__}_reverse{REVERSE}.h5")

### Test

In [38]:
# Exhaustive test set: every ordered pair (a, b) of DIGITS-bit operands.
TEST_SIZE = 2 ** (DIGITS * 2)
# The builtin `bool` is used as dtype because the `np.bool` alias no longer
# exists in current NumPy releases.
x_test = np.zeros((TEST_SIZE, MAXLEN, len(chars)), dtype=bool)
y_test = np.zeros((TEST_SIZE, DIGITS + 1, len(chars)), dtype=bool)
print(y_test.shape)

j = -1  # running sample index; incremented before each write
for a in range(0,2**(DIGITS)):
    for b in range(0,2**(DIGITS)):

        astr, bstr = "{0:b}".format(a), "{0:b}".format(b)
        # a and b are plain integers here, so the sum needs no re-parsing.
        ans = '{0:b}'.format(a + b)
        if SPACE_PADDING:
            query = f"{astr}+{bstr}"
            # Right-pad to the full input width. MAXLEN includes the '+'
            # sign, which DIGITS * 2 does not.
            query += ' ' * (MAXLEN - len(query))
            ans += ' ' * (DIGITS + 1 - len(ans))
        else:
            pad = '0'
            astr = pad * (DIGITS - len(astr)) + astr
            bstr = pad * (DIGITS - len(bstr)) + bstr
            query = f"{astr}+{bstr}"
            ans = pad * (DIGITS + 1 - len(ans)) + ans

        if REVERSE:
            query = query[::-1]

        j += 1
        # Echo the first ten samples as a sanity check.
        if j < 10: print(query, ans)
        x_test[j] = ctable.encode(query, MAXLEN)
        y_test[j] = ctable.encode(ans, DIGITS + 1)

(1048576, 11, 3)
0000000000+0000000000 00000000000
0000000000+0000000001 00000000001
0000000000+0000000010 00000000010
0000000000+0000000011 00000000011
0000000000+0000000100 00000000100
0000000000+0000000101 00000000101
0000000000+0000000110 00000000110
0000000000+0000000111 00000000111
0000000000+0000001000 00000001000
0000000000+0000001001 00000001001


In [39]:
# `predict_classes` is deprecated and missing from newer TensorFlow releases;
# the argmax over the per-step softmax output yields the same class indices.
preds = np.argmax(model.predict(x_test), axis=-1)
correct = [ctable.decode(y) for y in y_test]
guess = [ctable.decode(p, calc_argmax=False) for p in preds]
# A predicted '+' is not a bit; count it as 0 so the guess stays parseable.
guess = [g.replace('+', '0') for g in guess]
# Spaces are padding on the right of the answer, so they are dropped before
# parsing. Turning them into '0' would shift the value left. An all-space
# string counts as 0.
difference = [int(c.replace(' ', '') or '0', 2) - int(g.replace(' ', '') or '0', 2)
              for c, g in zip(correct, guess)]

err = np.mean([c != g for c, g in zip(correct, guess)])  # exact-match error rate
mse = np.mean(np.square(difference))
mae = np.mean(np.abs(difference))
print(err, mse, mae)

0.0066585540771484375 1699.085021018982 2.410954475402832


## Sunday night

In [0]:
# Generate train and test sets above manually
print(SPACE_PADDING, REVERSE)  # just a check

# Evaluate on a random subset of the test set (at most 100000 samples). The
# array that is shuffled must be the one used for the selection; shuffling
# the unrelated `indices` would leave the subset as the first 100000 rows.
indices_sub = np.arange(len(x_test))
np.random.shuffle(indices_sub)
indices_sub = indices_sub[:100000]
x_test_sub = x_test[indices_sub]
y_test_sub = y_test[indices_sub]
correct = [ctable.decode(y) for y in y_test_sub]

# layers.SimpleRNN, layers.LSTM, layers.GRU
for RNN in [layers.SimpleRNN]:
    HIDDEN_SIZE = 128
    BATCH_SIZE = 64
    EPOCHS = 10
    LAYERS = 2

    model = Sequential()
    # "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE.
    # Note: In a situation where your input sequences have a variable length,
    # use input_shape=(None, num_feature).
    model.add(RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))))
    # As the decoder RNN's input, repeatedly provide with the last output of
    # RNN for each time step. Repeat 'DIGITS + 1' times as that's the maximum
    # length of the output.
    model.add(layers.RepeatVector(DIGITS + 1))
    # The decoder RNN could be multiple layers stacked or a single layer.
    for _ in range(LAYERS):
        # By setting return_sequences to True, return not only the last output but
        # all the outputs so far in the form of (num_samples, timesteps,
        # output_dim). This is necessary as TimeDistributed in the below expects
        # the first dimension to be the timesteps.
        model.add(RNN(HIDDEN_SIZE, return_sequences=True))

    # Apply a dense layer to every temporal slice of the input. For each step
    # of the output sequence, decide which character should be chosen.
    model.add(layers.TimeDistributed(layers.Dense(len(chars), activation='softmax')))
    model.compile(loss='categorical_crossentropy',
                optimizer='adam',
                metrics=['accuracy'])
    # summary() prints the table itself; wrapping it in print() would add a
    # stray "None" line.
    model.summary()

    model.fit(x_train, y_train,
            batch_size=BATCH_SIZE,
            epochs=EPOCHS,
            validation_data=(x_val, y_val),
            use_multiprocessing = True)

    # `predict_classes` is deprecated and missing from newer TensorFlow
    # releases; the argmax over the softmax output yields the same indices.
    preds = np.argmax(model.predict(x_test_sub), axis=-1)
    guess = [ctable.decode(p, calc_argmax=False) for p in preds]
    # A predicted '+' is not a bit; count it as 0 so the guess stays parseable.
    guess = [g.replace('+', '0') for g in guess]
    # Spaces are padding on the right of the answer, so they are dropped
    # before parsing. An all-space string counts as 0.
    difference = [int(c.replace(' ', '') or '0', 2) - int(g.replace(' ', '') or '0', 2)
                  for c, g in zip(correct, guess)]

    err = np.mean([c != g for c, g in zip(correct, guess)])
    mse = np.mean(np.square(difference))
    mae = np.mean(np.abs(difference))
    print(err, mse, mae)

## Task 3

In [40]:
# Same as Task 1, except that the query interleaves the digits of the two
# operands instead of joining them with a '+' sign.
TRAINING_SIZE = 50000
DIGITS = 3
REVERSE = False
# An interleaved query has exactly two characters per digit position. MAXLEN
# must be set here; otherwise later cells reuse the value of an earlier task.
MAXLEN = DIGITS * 2
chars = '0123456789'
ctable = CharacterTable(chars)


def f():
    """Return a random integer built from 1 to DIGITS random decimal digits."""
    return int(
        ''.join(np.random.choice(list('0123456789'))
                for i in range(np.random.randint(1, DIGITS + 1))))


questions = []
expected = []
seen = set()
while len(questions) < TRAINING_SIZE:
    a, b = f(), f()
    
    # Skip any addition questions we've already seen
    # Also skip any such that x+Y == Y+x (hence the sorting).
    key = tuple(sorted((a, b)))
    if key in seen:
        continue
    seen.add(key)
    
    # Left-pad both operands to DIGITS characters and the answer to
    # DIGITS + 1 characters with '0'.
    pad = '0'
    astr, bstr = str(a), str(b)
    astr = pad * (DIGITS - len(astr)) + astr
    bstr = pad * (DIGITS - len(bstr)) + bstr
    # Interleave: a[0] b[0] a[1] b[1] ...
    query = ''.join(da + db for da, db in zip(astr, bstr))
    ans = str(a + b)
    ans = pad * (DIGITS + 1 - len(ans)) + ans

    # Echo the first few samples as a sanity check.
    if len(seen) < 5: print(query, ans)

    if REVERSE:
        # Feed the interleaved query back to front.
        query = query[::-1]
    questions.append(query)
    expected.append(ans)

384933 1236
054517 0598
050646 0570
684379 1486


In [0]:
# Vectorize. The builtin `bool` is used as dtype because the `np.bool` alias
# no longer exists in current NumPy releases.
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=bool) # n * x * d
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=bool) # n * y * d
for i, sentence in enumerate(questions):
    x[i] = ctable.encode(sentence, MAXLEN)
for i, sentence in enumerate(expected):
    y[i] = ctable.encode(sentence, DIGITS + 1)

# Shuffle (x, y) in unison.
indices = np.arange(len(y))
np.random.shuffle(indices)
x = x[indices]
y = y[indices]

# Explicitly set apart 10% for validation data that we never train over.
split_at = len(x) - len(x) // 10
(x_train, x_val) = x[:split_at], x[split_at:]
(y_train, y_val) = y[:split_at], y[split_at:]

print('Training Data:')
print(x_train.shape)
print(y_train.shape)

print('Validation Data:')
print(x_val.shape)
print(y_val.shape)

In [0]:
# The recurrent cell type is a single switch: layers.GRU or layers.LSTM can be
# tried here in place of layers.SimpleRNN.
RNN = layers.SimpleRNN
HIDDEN_SIZE = 128  # units of the encoder output and of every decoder layer
BATCH_SIZE = 64
LAYERS = 2

print('Build model...')
model = Sequential(
    # Encoder: read the whole query and keep only the final output of size
    # HIDDEN_SIZE. (For variable-length inputs one would pass
    # input_shape=(None, num_feature) instead.)
    [RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))),
     # Present that encoding to the decoder once per output step. The answer
     # has at most DIGITS + 1 characters, e.g. 999+999=1998 for DIGITS=3.
     layers.RepeatVector(DIGITS + 1)]
    # Decoder: LAYERS stacked RNNs. return_sequences=True keeps the output of
    # every time step, which the per-step classifier below needs.
    + [RNN(HIDDEN_SIZE, return_sequences=True) for _ in range(LAYERS)]
    # Per-step classifier: a softmax over the characters at each output step.
    + [layers.TimeDistributed(layers.Dense(len(chars), activation='softmax'))]
)
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
model.summary()

In [0]:
# Train one epoch at a time and, after each epoch, show predictions for a few
# random samples of the validation dataset.
for iteration in range(1, 200):
    print()
    print('-' * 50)
    print('Iteration', iteration)
    model.fit(x_train, y_train,
              batch_size=BATCH_SIZE,
              epochs=1,
              validation_data=(x_val, y_val))
    # Select 10 samples from the validation set at random so we can visualize
    # errors.
    for i in range(10):
        ind = np.random.randint(0, len(x_val))
        # Index with a length-1 array so the batch dimension is kept.
        rowx, rowy = x_val[np.array([ind])], y_val[np.array([ind])]
        # `predict_classes` is deprecated and missing from newer TensorFlow
        # releases; the argmax over the per-step softmax output yields the
        # same class indices.
        preds = np.argmax(model.predict(rowx, verbose=0), axis=-1)
        q = ctable.decode(rowx[0])
        correct = ctable.decode(rowy[0])
        guess = ctable.decode(preds[0], calc_argmax=False)
        # Show the query in its natural order even if it was fed reversed.
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

In [0]:
# Name the file after the RNN class. Slicing `str(RNN)` instead yields
# fragments such as ".SimpleRNN'>", whose quote and '>' characters are not
# valid in file names on every platform.
model.save(f"task3{RNN.__name__}_reverse{REVERSE}.h5")

### Test

In [0]:
# Exhaustive test set: every ordered pair (a, b) of DIGITS-digit operands.
TEST_SIZE = 10 ** (DIGITS * 2)
print(TEST_SIZE)
# The builtin `bool` is used as dtype because the `np.bool` alias no longer
# exists in current NumPy releases.
x_test = np.zeros((TEST_SIZE, MAXLEN, len(chars)), dtype=bool)
y_test = np.zeros((TEST_SIZE, DIGITS + 1, len(chars)), dtype=bool)
print(y_test.shape)

j = -1  # running sample index; incremented before each write
for a in range(0,10**(DIGITS)):
    for b in range(0,10**(DIGITS)):

        # Same zero-padded, digit-interleaved formatting as the training data.
        astr, bstr = str(a).zfill(DIGITS), str(b).zfill(DIGITS)
        query = ''.join(da + db for da, db in zip(astr, bstr))
        ans = str(a + b).zfill(DIGITS + 1)

        if REVERSE:
            query = query[::-1]

        j += 1
        # Echo the first ten samples as a sanity check.
        if j < 10: print(query, ans)
        x_test[j] = ctable.encode(query, MAXLEN)
        y_test[j] = ctable.encode(ans, DIGITS + 1)

In [0]:
# `predict_classes` is deprecated and missing from newer TensorFlow releases;
# the argmax over the per-step softmax output yields the same class indices.
preds = np.argmax(model.predict(x_test), axis=-1)
correct = [ctable.decode(y) for y in y_test]
guess = [ctable.decode(p, calc_argmax=False) for p in preds]
# The alphabet of this task holds digits only, so every string parses as int.
difference = [int(c) - int(g) for c, g in zip(correct, guess)]

acc = np.mean([c == g for c, g in zip(correct, guess)])  # exact-match accuracy
mse = np.mean(np.square(difference))
mae = np.mean(np.abs(difference))
print(acc, mse, mae)

## Task 4


In [41]:
# Parameters for the model and dataset.
TRAINING_SIZE = 50000
DIGITS = 10  # operand width in bits
REVERSE = False
MAXLEN = DIGITS * 2  # interleaved query: two characters per bit position
SPACE_PADDING = True


def f():
    """Return 1 to DIGITS random bits, read as a decimal integer."""
    n_bits = np.random.randint(1, DIGITS + 1)
    return int(''.join([np.random.choice(list('01')) for i in range(n_bits)]))


questions = []
expected = []
seen = set()
while len(questions) < TRAINING_SIZE: 
    a, b = f(), f()
    # Skip pairs we have already generated, in either operand order.
    key = tuple(sorted((a, b)))
    if key in seen:
        continue
    seen.add(key)

    # str(a) and str(b) are bit strings; parse them base 2 to add the values.
    astr, bstr = str(a), str(b)
    ans = format(int(astr, 2) + int(bstr, 2), 'b')
    if SPACE_PADDING:
        # Align both operands to the longer one, interleave their bits and
        # fill the rest of the fixed-width fields with spaces on the right.
        maxlen = max(len(astr), len(bstr))
        astr = astr.zfill(maxlen)
        bstr = bstr.zfill(maxlen)
        query = ''.join(bit_a + bit_b for bit_a, bit_b in zip(astr, bstr))
        query = query.ljust(DIGITS * 2)
        ans = ans.ljust(DIGITS + 1)
    else:
        # Zero-pad both operands to DIGITS bits, then interleave them.
        pad = '0'
        astr = astr.rjust(DIGITS, pad)
        bstr = bstr.rjust(DIGITS, pad)
        query = ''.join(bit_a + bit_b for bit_a, bit_b in zip(astr, bstr))
        ans = ans.rjust(DIGITS + 1, pad)

    # Echo the first few samples as a sanity check.
    if len(seen) < 5: 
        print("a b", astr, bstr)
        print("query", query, len(query))
        print("ans", ans, len(ans))

    questions.append(query[::-1] if REVERSE else query)
    expected.append(ans)

print(questions[0])
print(expected[0])

a b 00000000 10110010
query 0100010100000100     20
ans 10110010    11
a b 1110110 0100111
query 10111000111101       20
ans 10011101    11
a b 000010110 101101011
query 010001011001101101   20
ans 110000001   11
a b 0010100 1111101
query 01011101110001       20
ans 10010001    11
0100010100000100    
10110010   


In [0]:
# Alphabet: the two bits, plus the space when it is used as padding.
chars = '01'
if SPACE_PADDING:
    chars += ' '
ctable = CharacterTable(chars)

# Vectorize. The builtin `bool` is used as dtype because the `np.bool` alias
# no longer exists in current NumPy releases.
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=bool) # n * x * d
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=bool) # n * y * d
for i, sentence in enumerate(questions):
    x[i] = ctable.encode(sentence, MAXLEN)
for i, sentence in enumerate(expected):
    y[i] = ctable.encode(sentence, DIGITS + 1)

# Shuffle (x, y) in unison.
indices = np.arange(len(y))
np.random.shuffle(indices)
x = x[indices]
y = y[indices]

# Hold out the last 10% as validation data.
split_at = len(x) - len(x) // 10
(x_train, x_val) = x[:split_at], x[split_at:]
(y_train, y_val) = y[:split_at], y[split_at:]

In [0]:
RNN = layers.SimpleRNN
HIDDEN_SIZE = 128  # units of the encoder output and of every decoder layer
BATCH_SIZE = 64
LAYERS = 2

model = Sequential(
    # Encoder: read the whole query and keep only the final output of size
    # HIDDEN_SIZE. (For variable-length inputs one would pass
    # input_shape=(None, num_feature) instead.)
    [RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))),
     # Present that encoding to the decoder once per output step; the answer
     # has at most DIGITS + 1 characters.
     layers.RepeatVector(DIGITS + 1)]
    # Decoder: LAYERS stacked RNNs. return_sequences=True keeps the output of
    # every time step, which the per-step classifier below needs.
    + [RNN(HIDDEN_SIZE, return_sequences=True) for _ in range(LAYERS)]
    # Per-step classifier: a softmax over the characters at each output step.
    + [layers.TimeDistributed(layers.Dense(len(chars), activation='softmax'))]
)
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
model.summary()

In [0]:
# Train one epoch at a time (40 epochs in total) and, after each epoch, show
# predictions for a few random samples of the validation dataset.
for iteration in range(1, 40 + 1):
    print()
    print('-' * 50)
    print('Iteration', iteration)
    model.fit(x_train, y_train,
              batch_size=BATCH_SIZE,
              epochs=1,
              validation_data=(x_val, y_val))
    # Select 10 samples from the validation set at random so we can visualize
    # errors.
    for i in range(10):
        ind = np.random.randint(0, len(x_val))
        # Index with a length-1 array so the batch dimension is kept.
        rowx, rowy = x_val[np.array([ind])], y_val[np.array([ind])]
        # `predict_classes` is deprecated and missing from newer TensorFlow
        # releases; the argmax over the per-step softmax output yields the
        # same class indices.
        preds = np.argmax(model.predict(rowx, verbose=0), axis=-1)
        q = ctable.decode(rowx[0])
        correct = ctable.decode(rowy[0])
        guess = ctable.decode(preds[0], calc_argmax=False)
        # Show the query in its natural order even if it was fed reversed.
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

In [0]:
# Name the file after the RNN class. Slicing `str(RNN)` instead yields
# fragments such as ".SimpleRNN'>", whose quote and '>' characters are not
# valid in file names on every platform.
model.save(f"task4{RNN.__name__}_reverse{REVERSE}.h5")

### Test

In [0]:
# Exhaustive test set: every ordered pair (a, b) of DIGITS-bit operands.
TEST_SIZE = 2 ** (DIGITS * 2)
# The builtin `bool` is used as dtype because the `np.bool` alias no longer
# exists in current NumPy releases.
x_test = np.zeros((TEST_SIZE, MAXLEN, len(chars)), dtype=bool)
y_test = np.zeros((TEST_SIZE, DIGITS + 1, len(chars)), dtype=bool)
print(y_test.shape)

j = -1  # running sample index; incremented before each use
for a in range(0,2**(DIGITS)):
    for b in range(0,2**(DIGITS)):

        astr, bstr = "{0:b}".format(a), "{0:b}".format(b)
        # a and b are plain integers here, so the sum needs no re-parsing.
        ans = '{0:b}'.format(a + b)
        if SPACE_PADDING:
            # Align both operands to the longer one, interleave their bits
            # and right-pad query and answer with spaces to fixed widths.
            maxlen = max(len(astr), len(bstr))
            astr = astr.zfill(maxlen)
            bstr = bstr.zfill(maxlen)
            query = ''.join(da + db for da, db in zip(astr, bstr))
            query += ' ' * (MAXLEN - len(query))
            ans += ' ' * (DIGITS + 1 - len(ans))
        else:
            # Zero-pad both operands to DIGITS bits, then interleave them.
            pad = '0'
            astr = pad * (DIGITS - len(astr)) + astr
            bstr = pad * (DIGITS - len(bstr)) + bstr
            query = ''.join(da + db for da, db in zip(astr, bstr))
            ans = pad * (DIGITS + 1 - len(ans)) + ans

        j += 1
        # Echo the first ten samples (in natural order) as a sanity check.
        if j < 10: print(query, ans)

        if REVERSE:
            query = query[::-1]

        x_test[j] = ctable.encode(query, MAXLEN)
        y_test[j] = ctable.encode(ans, DIGITS + 1)

# j ends at TEST_SIZE - 1 when every slot has been filled.
print(j, TEST_SIZE)

In [0]:
# `predict_classes` is deprecated and missing from newer TensorFlow releases;
# the argmax over the per-step softmax output yields the same class indices.
preds = np.argmax(model.predict(x_test), axis=-1)
correct = [ctable.decode(y) for y in y_test]
guess = [ctable.decode(p, calc_argmax=False) for p in preds]
# Answers are padded with spaces on the right. Replacing those spaces with
# '0' would append zeros to the number and make every padded guess differ
# from its target, so spaces are dropped before parsing. An all-space string
# counts as 0.
difference = [int(c.replace(' ', '') or '0', 2) - int(g.replace(' ', '') or '0', 2)
              for c, g in zip(correct, guess)]

# Exact-match error rate on the raw strings, padding included.
err = np.mean([c != g for c, g in zip(correct, guess)])
mse = np.mean(np.square(difference))
mae = np.mean(np.abs(difference))
print(err, mse, mae)