<a href="https://colab.research.google.com/github/Amanuel94/Notebooks/blob/main/Embedded_Rebber_Grammars_with_RNNS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install tensorflow --upgrade
! pip install keras --upgrade

import random as rnd
import numpy as np
import tensorflow as tf
from collections import Counter


[0m

In [None]:
# Constants
SEED = 42
MAX_LEN = 20

In [None]:
reber_grammar = {
    "sos": [("B", "a")],
    "a":[("T", "b"), ("P", "c")],
    "b": [("S", "b"), ("X", "d")],
    "c": [("T", "c"), ("V", "e")],
    "d": [("X", "c"), ("S", "f")],
    "e": [("P", "d"), ("V",  "f")],
    "f": [("E", "eos")]
}

embedded_grammar = {
    "sos":[("B", "r")],
    "r": [("T", "sost"), ("P", "sosp")],
    "sost": [("B", "at")],
    "at":[("T", "bt"), ("P", "ct")],
    "bt": [("S", "bt"), ("X", "dt")],
    "ct": [("T", "ct"), ("V", "et")],
    "dt": [("X", "ct"), ("S", "ft")],
    "et": [("P", "dt"), ("V",  "ft")],
    "ft": [("E", "eost")],
    "sosp": [("B", "ap")],
    "ap":[("T", "bp"), ("P", "cp")],
    "bp": [("S", "bp"), ("X", "dp")],
    "cp": [("T", "ct"), ("V", "ep")],
    "dp": [("X", "cp"), ("S", "fp")],
    "ep": [("P", "dp"), ("V",  "fp")],
    "fp": [("E", "eosp")],
    "eost": [("T", "s")],
    "eosp": [("P", "s")],
    "s": [("E", "eos")]
}

# embedded_rebber_grammar = {
#     "esos": [("B", "x")],
#     "x": [("T", "sos"), ""]
# }

nodes = ["sos", "a", "b", "c", "d", "e", "f", "eos"]
vocab =  ["B", "T", "P", "S", "X", "V", "E"]

In [None]:
rnd.seed(SEED)
np.random.seed(SEED)

In [None]:
def generate_string(grammar, member = True, prob = 0.3, by_alphabet = True, decay = 2) :
  node = "sos"
  flag = False
  seq = []
  while node != "eos":
    ch, nxt = rnd.choice(grammar[node])
    pr = np.random.uniform(0, 1, 1) < prob

    # pick a wrong node with probability prob
    if pr and not member:
      if by_alphabet:
        ch = rnd.choice([v for v in vocab if v != ch])
      else:
        nxt = rnd.choice([v for v in grammar.keys() if v != nxt])
      flag = True
      prob /= decay

    seq.append(ch)
    node = nxt

  return seq, flag

In [None]:
# count = 0
# count_prev = 0
# for i in range(10000):
#   s, f = generate_string(embedded_grammar, False, 0.3, by_alphabet = False, decay = 10)
#   if not f:
#     count_prev += 1
#     s, f = generate_string(embedded_grammar, False, 0.3, by_alphabet = False, decay = 10)
#     if not f: count += 1
#   # print("".join(s))
# print(count, count_prev)

In [None]:
words = tf.constant(["<pad>"] + vocab)
word_ids = tf.range(words.shape[0],  dtype = tf.int64)
vocab_init = tf.lookup.KeyValueTensorInitializer(words, word_ids)
table = tf.lookup.StaticVocabularyTable(vocab_init, 10)



In [None]:


def prepare(n_instances, n_batch, grammar, decay_dist, max_len = 50):
  labels = [0]*(n_instances//2) + [1]*(n_instances//2)
  rnd.shuffle(labels)
  strings = []
  for label in labels:

    if label:
      s, f = generate_string(grammar)

    else:
      decay = rnd.choice(decay_dist)
      by_alphabet = np.random.uniform(0, 1) > 0.1
      s, f = generate_string(grammar, member = False, decay = decay, by_alphabet = by_alphabet)
      if not f:
        s, f = generate_string(grammar, member = False, decay = decay, by_alphabet = by_alphabet)

    # strings.append(s)
    # if len(s) < max_len:
    #     s = s + ["<pad>"]*(max_len - len(s))
    strings.append(s)

  # strings = list(map(lambda x: index(x), strings))

  strings = tf.ragged.constant(strings, ragged_rank = 1)
  strings_ds = tf.data.Dataset.from_tensor_slices(strings)

  strings_ds = strings_ds.map(lambda x: table.lookup(x))
  # strings_ds = strings_ds.bucket_by_sequence_length(
  #     element_length_func=lambda elem: tf.shape(elem)[0],
  #     bucket_boundaries = [5, 10, 17, 25],
  #     bucket_batch_sizes = [n_instances//5]*4 + [n_instances - 4*(n_instances//5)],
  #     padding_values = tf.constant(0, dtype = tf.int64)
  # )
  strings_ds = strings_ds.map(lambda y: tf.one_hot(y, depth = words.shape[0]))

  # strings_ds = strings_ds.map(lambda x: x[0])

  labels_ds = tf.data.Dataset.from_tensor_slices(labels)
  ds = tf.data.Dataset.zip(strings_ds, labels_ds)
  # ds = ds.map(lambda x, y: (tf.one_hot(x, depth = len(indices)), y))
  return ds.padded_batch(n_batch)





In [None]:
decay_dist = [2]*1 + [5]*1 + [10]*7

In [None]:
ds = prepare(10, 3, embedded_grammar, decay_dist, max_len = MAX_LEN,)
for a, b in ds:
  print(a.shape)

(3, 18, 8)
(3, 18, 8)
(3, 19, 8)
(1, 14, 8)


In [None]:
train_ds = prepare(50_000, 32, embedded_grammar, decay_dist, max_len = MAX_LEN)
val_ds = prepare(15_000, 32, embedded_grammar, decay_dist, MAX_LEN)
test_ds = prepare(10_000, 32, embedded_grammar, decay_dist, MAX_LEN)

In [None]:
# s, f = generate_string(embedded_grammar)
# print(s)
# print(table.lookup(tf.constant(s + ["<pad>"])))
# print(tf.one_hot(table.lookup(tf.constant(s + ["<pad>"])), depth = 10))

In [62]:
model = tf.keras.models.Sequential([
    tf.keras.layers.InputLayer(shape=[None, words.shape[0]], dtype=tf.int64, ragged=True),
    tf.keras.layers.GRU(100, return_sequences=True),
    tf.keras.layers.GRU(20),
    tf.keras.layers.Dense(30, activation = "relu"),
    tf.keras.layers.Dense(1, activation = "sigmoid")
])

model.summary()

In [67]:
model.compile(
    loss = "binary_crossentropy",
    optimizer = "adam",
    metrics = ["accuracy"]
)

model.fit(train_ds, validation_data = val_ds, epochs = 20)

Epoch 1/20
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m76s[0m 45ms/step - accuracy: 0.7068 - loss: 0.5235 - val_accuracy: 0.8333 - val_loss: 0.3930
Epoch 2/20
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m69s[0m 44ms/step - accuracy: 0.8766 - loss: 0.3143 - val_accuracy: 0.8871 - val_loss: 0.2926
Epoch 3/20
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m70s[0m 45ms/step - accuracy: 0.9182 - loss: 0.2324 - val_accuracy: 0.9541 - val_loss: 0.1481
Epoch 4/20
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m73s[0m 47ms/step - accuracy: 0.9540 - loss: 0.1474 - val_accuracy: 0.9791 - val_loss: 0.0780
Epoch 5/20
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m74s[0m 47ms/step - accuracy: 0.9758 - loss: 0.0882 - val_accuracy: 0.9831 - val_loss: 0.0611
Epoch 6/20
[1m1563/1563[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m76s[0m 49ms/step - accuracy: 0.9849 - loss: 0.0595 - val_accuracy: 0.9915 - val_loss: 0.0383
Epoc

<keras.src.callbacks.history.History at 0x78685e4162f0>

In [75]:
test_strings = ["BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE",
                "BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE"]
X_test = [
    tf.one_hot(table.lookup(tf.constant(list(s))), depth = words.shape[0]) for s in test_strings
]
X_test = tf.data.Dataset.from_tensor_slices(X_test).batch(2)

# .map(lambda x: tf.reshape(x, shape = [None, 31, 8]))

# X_test

y_proba = model.predict(X_test)
print()
print("Estimated probability that these are Reber strings:")
for index, string in enumerate(test_strings):
    print("{}: {:.2f}%".format(string, 100 * y_proba[index][0]))

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step

Estimated probability that these are Reber strings:
BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE: 99.44%
BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE: 0.89%
