In [1]:
import numpy as np
import pandas as pd
import csv
import subprocess
from pandas import DataFrame
from matplotlib import pyplot
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

In [2]:
# Transformer / training hyper-parameters. These are module-level globals that
# the functions and cells below read directly.
vocab_size = 2556  # Vocabulary size: token-embedding input dim, width of the output logits, and (minus 1) TextVectorization max_tokens
maxlen = 20  # Window length in tokens: model input length and crop_pads window size
jump = 1 # Stride used by crop_pads: a window is emitted when the token position is a multiple of jump
embed_dim = 256  # Embedding size for each token
num_att_layers = 1 # Number of attention layers (NOTE: not referenced anywhere in the visible notebook)
num_heads = 12  # Number of attention heads
feed_forward_dim = 256  # Hidden layer size in feed forward network inside transformer
batch_size = 256  # Batch size of the tf.data training pipeline
top_k = 10  # Number of highest-scoring tokens the text generators sample from
dropout = 0.1  # Dropout rate passed to TransformerBlock
training_epochs = 100  # Number of epochs passed to model.fit

In [3]:
def to_row(trace):
    """
    Convert a trace string into the list of cell values of one CSV row.

    The trace is scanned character by character:
      * Anything from a "<" up to the next space (e.g. "<init>", "<end>",
        "<PAD>") is treated as a marker and skipped.
      * Every other space-separated token is rebuilt with "/" inserted between
        its characters, e.g. "UM" -> "U/M" (the tokenizer strips the "/").

    Unlike the previous version, a token that is still being built when the
    string ends (a trace that does not finish with a space or a marker) is
    kept instead of being silently dropped.

    Args:
        trace: Iterable of single characters, normally a string such as
            "<init> UM aT <end>".

    Returns:
        List of strings, one per non-marker token, e.g. ["U/M", "a/T"].
    """
    cells = []
    current = ""
    at_token_start = True   # next regular character starts a new token
    in_marker = False       # currently inside a "<...>" marker
    for char in trace:
        if char == "<":
            in_marker = True
        if not in_marker:
            if char == " ":
                # A space closes the current token (possibly an empty one).
                at_token_start = True
                cells.append(current)
                current = ""
            elif at_token_start:
                current = char
                at_token_start = False
            else:
                current += "/" + char
        elif char == " ":
            # The space after a marker ends it without emitting a cell.
            in_marker = False
    # Flush a token that was not terminated by a space (e.g. truncated trace).
    if not at_token_start:
        cells.append(current)
    return cells

In [4]:
def print_tensor_traces(trace_tensor, output_file, headers):
    """
    Write traces to a ';'-separated CSV file.

    Args:
        trace_tensor: Iterable of trace strings; each one is converted to a
            row with to_row().
        output_file: Path of the CSV file to create (overwritten if present).
        headers: Sequence of column names written as the first row.
    """
    # newline='' is what the csv module expects; without it extra blank rows
    # can appear on platforms that translate "\n" on write.
    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f, delimiter=';')
        writer.writerow(headers)
        writer.writerows(to_row(trace) for trace in trace_tensor)

In [5]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Build a batched causal (lower-triangular) attention mask.

    Position (i, j) is 1 when destination i may attend to source j, i.e. when
    i >= j - n_src + n_dest, so the triangle of ones is anchored at the lower
    right corner. The single [n_dest, n_src] mask is cast to `dtype` and then
    tiled `batch_size` times along a new leading axis.
    """
    dest_positions = tf.range(n_dest)[:, None]
    src_positions = tf.range(n_src)
    allowed = dest_positions >= src_positions - n_src + n_dest
    single_mask = tf.reshape(tf.cast(allowed, dtype), [1, n_dest, n_src])
    # Repeat only along the batch axis: multiples == [batch_size, 1, 1].
    batch_axis = tf.expand_dims(batch_size, -1)
    keep_axes = tf.constant([1, 1], dtype=tf.int32)
    multiples = tf.concat([batch_axis, keep_axes], 0)
    return tf.tile(single_mask, multiples)

In [6]:
class TransformerBlock(layers.Layer):
    """
    Attention block made of two streams, called "left" and "right" in call().

    The left stream applies self-attention and a feed-forward network to
    `inputs`. The right stream applies self-attention to `outputs`, then
    attends from that result to the left stream's output, then applies a
    second feed-forward network. Every sub-step is followed by dropout, a
    residual addition and layer normalization.
    """
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        """
        Create the sub-layers.

        Args:
            embed_dim: Passed as the second positional argument of each
                MultiHeadAttention layer (`key_dim` in the Keras signature)
                and used as the output width of both feed-forward networks.
            num_heads: Number of heads of each MultiHeadAttention layer.
            ff_dim: Hidden width of both feed-forward networks.
            rate: Dropout rate shared by all five Dropout layers.
        """
        super(TransformerBlock, self).__init__()
        # mask_att: self-attention of the right stream; att1: self-attention of
        # the left stream; att2: attention from the right stream to the left.
        self.mask_att = layers.MultiHeadAttention(num_heads, embed_dim)
        self.att1 = layers.MultiHeadAttention(num_heads, embed_dim)
        self.att2 = layers.MultiHeadAttention(num_heads, embed_dim)
        # ffn1 belongs to the left stream, ffn2 to the right stream.
        self.ffn1 = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.ffn2 = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        # One LayerNormalization / Dropout pair per residual connection in call().
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm4 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm5 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)
        self.dropout3 = layers.Dropout(rate)
        self.dropout4 = layers.Dropout(rate)
        self.dropout5 = layers.Dropout(rate)
        
    def call(self, inputs, outputs):
        """
        Run both streams and return the normalized output of the right stream.

        Args:
            inputs: Tensor fed to the left stream. Its first two dimensions
                are used as batch size and sequence length for the mask.
            outputs: Tensor fed to the right stream.
                # assumes the same batch size and sequence length as `inputs`,
                # since the mask built from `inputs` is reused - TODO confirm

        Returns:
            The result of layernorm5 applied to the right stream.
        """
        input_shape = tf.shape(inputs)
        batch_size = input_shape[0]
        seq_len = input_shape[1]
        # A single square causal mask is shared by all three attention calls.
        causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        # Left block
        attention1_output = self.att1(inputs, inputs, attention_mask=causal_mask)
        attention1_output = self.dropout1(attention1_output)
        out1 = self.layernorm1(inputs + attention1_output)
        ffn1_output = self.ffn1(out1)
        ffn1_output = self.dropout2(ffn1_output)
        out2 = self.layernorm2(out1 + ffn1_output)

        # Right block
        attention_mask_output = self.mask_att(outputs, outputs, attention_mask=causal_mask)
        attention_mask_output = self.dropout3(attention_mask_output)
        out3 = self.layernorm3(outputs + attention_mask_output)
        #causal_mask2 = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        # Query comes from the right stream (out3); the second argument is the
        # left stream's output (out2). The causal mask is applied here as well.
        attention2_output = self.att2(out3, out2, attention_mask=causal_mask)
        attention2_output = self.dropout4(attention2_output)
        out4 = self.layernorm4(out3 + attention2_output)
        ffn2_output = self.ffn2(out4)
        ffn2_output = self.dropout5(ffn2_output)
        return self.layernorm5(out4 + ffn2_output)

In [7]:
class TokenAndPositionEmbedding(layers.Layer):
    """Embed token ids and add a learned embedding of each token's position."""

    def __init__(self, maxlen, vocab_size, embed_dim):
        """Create one embedding table for tokens and one for positions."""
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Return token embeddings of `x` plus embeddings of positions 0..len-1."""
        # The length is read from the last axis of the input at call time.
        seq_len = tf.shape(x)[-1]
        position_vectors = self.pos_emb(tf.range(start=0, limit=seq_len, delta=1))
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

In [8]:
def create_model():
    """
    Assemble and compile the trace model.

    The same integer input is embedded twice (two independent embedding
    layers); both embeddings go through one TransformerBlock and a Dense layer
    projects the result to `vocab_size` logits. The model returns
    [logits, block_output]; only the logits carry a loss.
    """
    token_ids = layers.Input(shape=(maxlen,), dtype=tf.int32)
    left_embedding = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    right_embedding = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    left_stream = left_embedding(token_ids)
    right_stream = right_embedding(token_ids)
    block = TransformerBlock(embed_dim, num_heads, feed_forward_dim, rate=dropout)
    block_output = block(left_stream, right_stream)
    logits = layers.Dense(vocab_size)(block_output)
    model = keras.Model(inputs=token_ids, outputs=[logits, block_output])
    # No loss and optimization based on word embeddings from transformer block
    model.compile(
        optimizer="adam",
        loss=[tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), None],
    )
    return model

In [9]:
def custom_standardization(input_string):
    """Remove every "/" from the input strings (e.g. "U/M" becomes "UM")."""
    slash_pattern = "([/])"
    return tf.strings.regex_replace(input_string, slash_pattern, "")

In [10]:
def add_pads(traces):
  """
  Append padded prefixes of every trace that starts with "<init>".

  For such a trace, each prefix (1 token, 2 tokens, ...) is extended with
  " <PAD>" up to `maxlen` tokens; prefixes longer than `maxlen` get no padding.
  Returns the original traces followed by all generated prefixes.
  """
  padded_prefixes = []
  for trace in traces:
    tokens = trace.split(" ")
    if tokens[0] != "<init>":
      continue
    prefix = ""
    for count, token in enumerate(tokens, start=1):
      prefix += token if token == "<init>" else " " + token
      # A negative repeat count yields an empty string, i.e. no padding.
      padded_prefixes.append(prefix + " <PAD>" * (maxlen - count))
  return np.append(traces, padded_prefixes)

In [11]:
def crop_pads(traces, size):
  """
  Cut every trace into sliding windows of `size` tokens.

  A window ending at token position p (1-based) is emitted once p >= size and
  either p is a multiple of the global `jump` or the token is "<end>". Tokens
  other than "<init>" keep a leading space, so windows that do not start with
  "<init>" begin with a space.
  """
  windows = []
  for trace in traces:
    pieces = []
    for position, token in enumerate(trace.split(" "), start=1):
      pieces.append(token if token == "<init>" else " " + token)
      if position < size:
        continue
      if position % jump == 0 or token == "<end>":
        windows.append("".join(pieces[-size:]))
  return windows

In [12]:
# Load the trace dataset for FSM number `file_num`, turn every row into
# fixed-size token windows and build the tf.data pipeline and vocabulary.
file_num = 1
source_file = "fsm"+str(file_num)+" Dataset"
# keep_default_na=False keeps cells such as "NA" as plain strings.
data = pd.read_csv("./drive/MyDrive/Colab Notebooks/data/" + source_file + ".csv", sep=";", keep_default_na=False)

# The last column is split off and removed from `data`.
# presumably it is a label column; `y` is not used again in the visible notebook
y = data.iloc[:,-1:]
data = data.drop(data.columns[-1],axis=1)
headers = data.columns
traces = data.copy()
# Surround every row with the "<init>" and "<end>" marker tokens.
traces.insert(0, "init", ["<init>"]*traces.shape[0], True)
traces['end'] = ["<end>"]*traces.shape[0]
print(traces.head())
# Join each row into one space-separated string, then cut it into windows of
# `maxlen` tokens.
traces = traces.agg(" ".join, axis=1)
traces = traces.astype('string')
traces = traces.to_numpy()
traces = crop_pads(traces,maxlen)
#traces = add_pads(traces)
print(traces[0:5])
print(traces[-5:])
print(len(traces))

# Shuffle over the whole set of windows, then batch.
text_ds = tf.data.Dataset.from_tensor_slices(traces)
text_ds = text_ds.shuffle(buffer_size=len(traces))
text_ds = text_ds.batch(batch_size)

# Create a vectorization layer and adapt it to the text
# Sequences are vectorized to maxlen + 1 ids so prepare_lm_inputs_labels can
# split them into inputs and shifted targets of maxlen ids each.
# NOTE(review): crop_pads emits windows of exactly maxlen tokens, so the extra
# position looks like it is always padding - confirm this is intended.
vectorize_layer = TextVectorization(standardize=custom_standardization, max_tokens=vocab_size - 1, output_mode="int", output_sequence_length=maxlen + 1)
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()  # To get words back from token indices
# "<PAD>" is added by hand as the last vocabulary entry; the text generators
# use its index to fill unused input positions.
vocab.append("<PAD>")

     init    1    2    3    4    5    6  ...   95   96   97   98   99  100    end
0  <init>  U/M  a/T  K/B  S/U  k/C  M/\  ...  p/D  o/r  ^/j  U/F  A/p  [/G  <end>
1  <init>  R/f  M/]  n/_  a/L  h/c  k/n  ...  J/L  a/\  ^/[  X/_  a/_  h/K  <end>
2  <init>  f/\  Y/J  p/R  U/B  `/H  f/r  ...  n/L  Z/E  ]/Q  L/R  Q/]  n/Z  <end>
3  <init>  i/[  F/e  c/Q  B/A  e/j  a/k  ...  V/]  p/S  W/Z  O/F  V/T  a/^  <end>
4  <init>  `/b  c/n  J/M  g/B  X/p  Z/a  ...  I/E  m/T  o/c  W/q  n/O  A/p  <end>

[5 rows x 102 columns]
['<init> U/M a/T K/B S/U k/C M/\\ P/q q/C R/Z U/M c/Y a/[ f/_ [/H Q/k ]/o o/H n/R `/h', ' U/M a/T K/B S/U k/C M/\\ P/q q/C R/Z U/M c/Y a/[ f/_ [/H Q/k ]/o o/H n/R `/h K/P', ' a/T K/B S/U k/C M/\\ P/q q/C R/Z U/M c/Y a/[ f/_ [/H Q/k ]/o o/H n/R `/h K/P o/B', ' K/B S/U k/C M/\\ P/q q/C R/Z U/M c/Y a/[ f/_ [/H Q/k ]/o o/H n/R `/h K/P o/B C/O', ' S/U k/C M/\\ P/q q/C R/Z U/M c/Y a/[ f/_ [/H Q/k ]/o o/H n/R `/h K/P o/B C/O A/q']
[' F/V F/Q C/L O/D A/F P/\\ m/X Z/r K/T J/R ]/k Q/Y X/O 

In [13]:
def prepare_lm_inputs_labels(text):
    """
    Turn a batch of strings into (inputs, targets) for next-token prediction.

    The strings are vectorized with `vectorize_layer`; inputs are all token
    ids except the last one and targets are all ids except the first one, so
    the target at position i is the token that follows input position i.
    """
    token_ids = vectorize_layer(tf.expand_dims(text, -1))
    return token_ids[:, :-1], token_ids[:, 1:]

In [14]:
# Vectorize each batch into (inputs, targets) pairs and prefetch batches.
text_ds = text_ds.map(prepare_lm_inputs_labels).prefetch(tf.data.AUTOTUNE)

In [15]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Generation stops once "<end>" has been sampled or once more than
    `max_tokens` tokens have been produced (the loop bound is inclusive, so up
    to `max_tokens + 1` tokens can be generated). The loss of every epoch is
    recorded and plotted together with the generated text.

    The model is expected to return two outputs, the first being the logits.
    The input length is read from the module-level `maxlen`.

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
            It must contain the "<end>" and "<PAD>" entries.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        # Let the Keras base class set up its own state first.
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        """Sample one token id from the softmax over the `k` largest logits."""
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the vocabulary word stored at index `number`."""
        return self.index_to_word[number]

    def on_train_begin(self, logs=None):
        """Reset the per-epoch history at the start of training."""
        self.losses = []
        # Kept for backward compatibility; nothing in this class fills it.
        self.accuracies = []

    def on_epoch_end(self, epoch, logs=None):
        """Record the epoch loss and, every `print_every` epochs, sample a trace."""
        logs = logs or {}
        self.losses.append(logs.get('loss'))
        if (epoch + 1) % self.print_every != 0:
            return
        # Look the special token ids up once, in this instance's vocabulary.
        end_token = self.index_to_word.index("<end>")
        pad_token = self.index_to_word.index("<PAD>")
        start_tokens = list(self.start_tokens)
        tokens_generated = []
        last_token = 1
        while len(tokens_generated) <= self.max_tokens and last_token != end_token:
            pad_len = maxlen - len(start_tokens)
            if pad_len > 0:
                # Prompt still fits: pad on the right, read the prediction
                # made at the last real token.
                x = start_tokens + [pad_token] * pad_len
                sample_index = len(start_tokens) - 1
            else:
                # Prompt is full: keep the last maxlen - 1 tokens plus one pad.
                x = start_tokens[-(maxlen - 1):] + [pad_token]
                sample_index = maxlen - 2
            x = np.array([x])
            y, _ = self.model.predict(x)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            last_token = sample_token
        txt = " ".join(
            [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
        )
        print(f"generated text:\n{txt}\n")
        plt.plot(self.losses)
        plt.title('Loss')
        plt.show()

In [16]:
# Tokenize starting prompt
# Reverse lookup word -> index; words missing from the vocabulary map to 1.
word_to_index = {word: index for index, word in enumerate(vocab)}

start_prompt = "<init>"
start_tokens = [word_to_index.get(piece, 1) for piece in start_prompt.split()]
num_tokens_generated = 100
text_gen_callback = TextGenerator(
    max_tokens=num_tokens_generated,
    start_tokens=start_tokens,
    index_to_word=vocab,
    top_k=top_k,
)

In [None]:
# Build the model and train it on the windowed trace dataset. The callback
# records the loss and, with its default print_every=1, samples one trace and
# plots the loss after every epoch.
model = create_model()
model.fit(text_ds, verbose=2, epochs=training_epochs, callbacks=[text_gen_callback])
# NOTE: the name "trasformer" (sic) must stay identical to the one passed to
# keras.models.load_model("trasformer") in the trace-generation cell.
model.save("trasformer")

Epoch 1/10


In [18]:
class TextGen():
    """Generate traces from a trained model (stand-alone, not a Keras callback).
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Generation of one trace stops once "<end>" has been sampled or once more
    than `max_tokens` tokens have been produced (the loop bound is inclusive,
    so up to `max_tokens + 1` tokens can be generated). The input length is
    read from the module-level `maxlen`.

    Arguments:
        model: Object whose `predict(x)` returns two outputs, the first being
            logits indexed as [batch, position, token].
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
            It must contain the "<end>" and "<PAD>" entries.
        top_k: Integer, sample from the `top_k` token predictions.
    """

    def __init__(self, model, max_tokens, start_tokens, index_to_word, top_k=10):
        self.model = model
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.k = top_k

    def sample_from(self, logits):
        """Sample one token id from the softmax over the `k` largest logits."""
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the vocabulary word stored at index `number`."""
        return self.index_to_word[number]

    def gen_traces(self, num_traces, logs=None):
        """
        Generate `num_traces` traces, each starting from `start_tokens`.

        Args:
            num_traces: Number of traces to generate.
            logs: Unused; kept so existing callers keep working.

        Returns:
            List of strings; each is the space-joined words of the prompt
            followed by the generated tokens.
        """
        # Look the special token ids up once, in this instance's vocabulary.
        end_token = self.index_to_word.index("<end>")
        pad_token = self.index_to_word.index("<PAD>")
        traces = []
        for trace_number in range(num_traces):
            start_tokens = list(self.start_tokens)
            tokens_generated = []
            last_token = 1
            while len(tokens_generated) <= self.max_tokens and last_token != end_token:
                pad_len = maxlen - len(start_tokens)
                if pad_len > 0:
                    # Prompt still fits: pad on the right, read the prediction
                    # made at the last real token.
                    x = start_tokens + [pad_token] * pad_len
                    sample_index = len(start_tokens) - 1
                else:
                    # Prompt is full: keep the last maxlen - 1 tokens plus one pad.
                    x = start_tokens[-(maxlen - 1):] + [pad_token]
                    sample_index = maxlen - 2
                x = np.array([x])
                y, _ = self.model.predict(x)
                sample_token = self.sample_from(y[0][sample_index])
                tokens_generated.append(sample_token)
                start_tokens.append(sample_token)
                last_token = sample_token
            txt = " ".join([self.detokenize(_) for _ in self.start_tokens + tokens_generated])
            print(f"trace {trace_number}")
            traces.append(txt)
        return traces

In [None]:
# Tokenize starting prompt
# Reverse lookup word -> index; words missing from the vocabulary map to 1.
word_to_index = {word: index for index, word in enumerate(vocab)}

start_prompt = "<init>"
start_tokens = [word_to_index.get(piece, 1) for piece in start_prompt.split()]
length = 100
num_traces = 100
# Reload the model saved by the training cell and sample traces from it.
model = keras.models.load_model("trasformer")
text_generator = TextGen(
    model=model,
    max_tokens=length,
    start_tokens=start_tokens,
    index_to_word=vocab,
    top_k=top_k,
)
generated_traces = text_generator.gen_traces(num_traces=num_traces)

In [None]:
# Write the generated traces to a ';'-separated CSV named after the source
# dataset, using the dataset's column names as the header row.
print_tensor_traces(generated_traces, "./drive/MyDrive/Colab Notebooks/results/traces_" + source_file + ".csv" , headers)

In [21]:
#Update and upgrade the system before installing anything else.
# NOTE(review): `apt-get upgrade` and `apt-get install` are run without `-y`
# and with stdout discarded, so a confirmation prompt would be invisible and
# the command may abort without doing anything - confirm these steps really
# run (or are needed) in the target environment.
!apt-get update > /dev/null
!apt-get upgrade > /dev/null

#Install the Java JDK.
!apt-get install default-jdk > /dev/null

#Check the Java version to see if everything is working well.
!javac -version

javac 11.0.11


In [22]:
#!java -jar "./drive/MyDrive/Colab Notebooks/CheckTraces.jar"

In [None]:
# Run the external CheckTraces.jar for this dataset number and capture its
# standard output as text.
valid = subprocess.run(['java', '-jar', './drive/MyDrive/Colab Notebooks/CheckTraces.jar', str(file_num)], stdout=subprocess.PIPE).stdout.decode('utf-8')
# Take the 4th space-separated field of the output, drop its last character
# and parse the rest as a float.
# NOTE(review): presumably that character is a unit or newline - the jar's
# output format is not visible here. The exit code is not checked, so a failed
# run surfaces as an IndexError/ValueError on this line.
valid = float(valid.split(' ')[3][:-1])

In [None]:
def _fraction(count, total):
  """Return count / total, or 0 when total is 0 (nothing to measure)."""
  return count / total if total > 0 else 0


def _shares_prefix(row, reference):
  """
  True when `row` and `reference` agree on every position both of them have,
  i.e. the shorter one is a prefix of the longer one. Empty rows never match.
  """
  if len(row) == 0 or len(reference) == 0:
    return False
  return all(cell == ref_cell for cell, ref_cell in zip(row, reference))


# Convert both sides once instead of inside every loop iteration.
generated_rows = [to_row(trace) for trace in generated_traces]
dataset_rows = np.array(data).tolist()
dataset_row_set = {tuple(row) for row in dataset_rows}
n_generated = len(generated_rows)

# Fraction of generated traces that agree with at least one dataset trace on
# all shared positions. Each generated trace is counted at most once, so the
# value cannot exceed 1 even when several dataset rows match.
subsets = _fraction(
  sum(any(_shares_prefix(row, ref) for ref in dataset_rows) for row in generated_rows),
  n_generated,
)

# Fraction of generated traces identical to some dataset trace (same length
# and same cells), again counted at most once per generated trace.
intersect = _fraction(
  sum(tuple(row) in dataset_row_set for row in generated_rows),
  n_generated,
)

# Generated traces that appear again later in the list of generated traces.
reps = [
  row
  for position, row in enumerate(generated_rows)
  if row in generated_rows[position + 1:]
]
repeated = _fraction(len(reps), n_generated)

# Share of those repeated traces that are also identical to a dataset trace.
old_repeated = _fraction(sum(tuple(row) in dataset_row_set for row in reps), len(reps))

# NOTE(review): subsets/intersect/repeated are fractions in [0, 1] but are
# printed with a "%" suffix, and they are combined with `valid`, whose scale
# depends on the CheckTraces.jar output - confirm the intended units.
print(f"Valid traces: {valid:0.3f}%")
print()
print(f"Traces seen from dataset: {subsets:0.3f}%")
print(f"Traces copied from dataset: {intersect:0.3f}%")
subs = subsets-intersect
print(f"Traces subset of traces from dataset: {subs:0.3f}%")
print()
new = 1 - subsets
print(f"New unseen traces: {new:0.3f}%")
unseen = valid - subsets
print(f"New unseen and valid traces: {unseen:0.3f}%")
nonvalid = 1 - valid
print(f"New unseen and non-valid traces: {nonvalid:0.3f}%")
print()
print(f"Traces repeated: {repeated:0.3f}%")
print(f"Traces repeated and in dataset: {old_repeated:0.3f}%")
new_repeated = 1 - old_repeated
print(f"New unseen and repeated traces: {new_repeated:0.3f}%")