In [1]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.data import Dataset
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Dense, SimpleRNN, GRU, LSTM, TimeDistributed
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.optimizers import Adam, Nadam

import numpy as np
from sklearn.model_selection import train_test_split

""" Avoid error with Blas:GEMM not initializing when using GPU:
See: https://stackoverflow.com/questions/43990046/tensorflow-blas-gemm-launch-failed
"""
physical_devices = tf.config.list_physical_devices('GPU') 
tf.config.experimental.set_memory_growth(physical_devices[0], True)

tf.random.set_seed(98)
np.random.seed(99)

## Exercise 9

Embedded Reber grammars were used by Hochreiter and Schmidhuber in their paper about LSTMs. They are artificial grammars that produce strings such as “BPBTSXXVPSEPE.” Check out [Jenny Orr’s nice introduction to this topic](https://www.willamette.edu/~gorr/classes/cs449/reber.html). Choose a particular embedded Reber grammar (such as the one represented on Jenny Orr’s page), then train an RNN to identify whether a string respects that grammar or not. You will first need to write a function capable of generating a training batch containing about 50% strings that respect the grammar, and 50% that don’t.

We start creating a Reber grammar generator. I'll follow the same structure provided in the link above will allow for tokens to be passed to the class to generate the strings.

Each state will be labelled as follows and has two possible transitions

![Reber Grammar](images\reber.png)

In [2]:
reber_transitions = {
    0: [(1, 'B')],
    1: [(2, 'T'), (3, 'P')],
    2: [(2, 'S'), (4, 'X')],
    3: [(3, 'T'), (5, 'V')],
    4: [(6, 'S'), (3, 'X')],
    5: [(4, 'P'), (6, 'V')],
    6: [(7, 'E')]}

def move_state(cur_state, transitions=reber_transitions):
    """Finds possible next moves in transition table and picks one at a random"""
    paths = transitions[cur_state]
    return paths[np.random.choice(len(transitions[cur_state]), size=1)[0]]
    
def generate_string(string=''):
    """Iterates over possible paths until final state is reached"""
    state = 0
    while state != 7:
        state, char = move_state(state)
        string += char
    return string

def find_next_state(cur_state, char, transitions=reber_transitions):
    """Given a current state and a character in the next state, searches transitions for a state
    with the corresponding next_char, if it exists."""
    for (next_state, next_char) in transitions[cur_state]:
        if next_char == char:
            return next_state, next_char
    return -1, -1

def validate_string(string, transitions=reber_transitions, verbose=False):
    """Iterates through a given string and checks whether the string was generated by some grammar with
    given transitions. 
    
    Probably can be improved
    """
    next_state, next_char = move_state(0, transitions)
    for idx, char in enumerate(string):
        if verbose: print(f"Next State: {next_state}; Testing {idx} : {char} vs {next_char}")
        if char != next_char:
            return 0
        try:
            if verbose: print(f"\tGoing to find next state by accessing {string[idx+1]}")
            next_state, next_char = find_next_state(next_state, string[idx+1])
            if verbose: print(f"\tReturned ({next_state}, {next_char})")
            if next_state == -1:
                return 0
        except IndexError:
            pass # Trying to access out of bounds value, meaning we reached the end of the checks
        if next_state == 7:
            break
    return 1
        

Now let's generate positive classes for our dataset and check they are valid

In [3]:
dataset_size = 10_000
reber_strings = [generate_string() for _ in range(dataset_size)]
reber_labels = [1 for _ in range(dataset_size)]

In [4]:
assert all([validate_string(reber_string) for reber_string in reber_strings]), "Some generated string is NOT REBER!"

That some sample non-reber strings are invalid

In [5]:
not_reber_tests = ['BTSSPXSE', 'BTXXVVSE', 'BPVSPSE', 'BTSSSE', 'BPTVVB']
assert not any([validate_string(not_reber) for not_reber in not_reber_tests]), "One of the test strings was identified as reber"

And let's generate a bunch of random strings and use the function above to mark them as not reber strings.

In [6]:
vocab = 'BEPSTVX'
min_length = len(min(reber_strings, key=len))
max_length = len(max(reber_strings, key=len))

# Generate N random strings with the vocab. Each time strings will have different lengths 
# that are bounded by the min/max size of the reber_lengths
randomly_generated = [''.join(np.random.choice(list(vocab), size=np.random.randint(min_length, max_length)))
                      for _ in range(dataset_size)]
randomly_gen_labels = [validate_string(random_str) for random_str in randomly_generated]

Now we're ready to create the training set

In [7]:
X = np.concatenate((reber_strings, randomly_generated))
y = np.concatenate((reber_labels, randomly_gen_labels))
print(X.shape, y.shape)

(20000,) (20000,)


In [8]:
np.unique(y, return_counts=True)

(array([0, 1]), array([ 9998, 10002], dtype=int64))

Data looks balanced enough! Now let's convert it to a Tensorflow Dataset and do preprocessing

First we start with tokenizing at character level, converting the characters into numbers and creating a dataset. 

In [13]:
def create_datasets(X, y, batch_size=32):
    tokenizer = Tokenizer(char_level=True, lower=False)
    tokenizer.fit_on_texts(X)
    
    encoded = tokenizer.texts_to_sequences(X)
    padded = pad_sequences(encoded, maxlen=max_length, padding='post', value=0) # pad with zeros
    
    # Recall RNN inputs have shape [batch_size, time_steps, dimensionality]
    # Need to reshape the data to an appropriate format
    X_full, y_full = padded[..., np.newaxis], y.reshape(-1,1)
    X_train_full, X_test, y_train_full, y_test = train_test_split(X_full, y_full, test_size=0.05)
    X_train, X_valid, y_train, y_valid = train_test_split(X_train_full, y_train_full, test_size=0.05)
        
    train_set = Dataset.from_tensor_slices((X_train, y_train)).shuffle(dataset_size).batch(batch_size).prefetch(1)
    valid_set = Dataset.from_tensor_slices((X_valid, y_valid)).shuffle(dataset_size).batch(batch_size)
    test_set = Dataset.from_tensor_slices((X_test, y_test)).shuffle(dataset_size)
    return train_set, valid_set, test_set

In [14]:
train_set, valid_set, test_set = create_datasets(X, y)

In [15]:
model = Sequential([
    Input(shape=[None, 1], name='Input'),
    SimpleRNN(32, return_sequences=True),
    SimpleRNN(32),
    Dense(1, activation="sigmoid")])

model.compile(loss="binary_crossentropy", optimizer=Adam(), metrics=['accuracy'])

In [16]:
with tf.device('GPU:0'):
    model.fit(train_set, epochs=5, validation_data=valid_set)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
