<a href="https://colab.research.google.com/github/amcurley/MeetAbby/blob/master/transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [72]:
# Code inspired and adapted from https://keras.io/examples/generative/text_generation_with_miniature_gpt/

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
import numpy as np
import os
import re
import string
import random
import pandas as pd

In [82]:
# https://arxiv.org/abs/1706.03762

"""
Self-attention with causal masking
We compute self-attention as usual, but prevent any information to flow from future 
tokens by masking the upper half of the scaled dot product matrix.
"""
class MultiHeadSelfAttention(layers.Layer):  # Attention over the input sentence bottom left

  def __init__(self, embed_dim, num_heads=8): # 8 is the parallel attention layers
    super(MultiHeadSelfAttention, self).__init__()
    self.embed_dim = embed_dim
    self.num_heads = num_heads

    if embed_dim % num_heads != 0:
      raise ValueError(f"ebmedding dimension = {embed_dim} should be divisible by the number of heads = {num_heads}")
  
    self.projection_dim = embed_dim // num_heads #floored division
    self.query_dense = layers.Dense(embed_dim)
    self.key_dense = layers.Dense(embed_dim)
    self.value_dense = layers.Dense(embed_dim)
    self.combine_heads = layers.Dense(embed_dim)

  @staticmethod # Method does not utilize self
  # Right side Masked Multi-Head Attention
  def causal_attention_mask(n_dest, n_src, dtype):
    i = tf.range(n_dest)[:, None]
    j = tf.range(n_src)
    m = i >= j - n_src + n_dest
    return tf.cast(m, dtype)

  def attention(self, query, key, value):
    score = tf.matmul(query, key, transpose_b=True) # Matrix Multiplcation for matrix a (query) and b (key)
    dim_key = tf.cast(tf.shape(key)[-1], tf.float32) # Changes the dtype to a float32
    scaled_score = score / tf.math.sqrt(dim_key) # Computes element-wise square root of the input tensor.

    # prevent information flow from future tokens?
    shape = tf.shape(scaled_score) # Shape of tensor
    dim_dest, dim_src = shape[2], shape[3]
    attention_mask = self.causal_attention_mask(dim_dest, dim_src, scaled_score.dtype)
    attention_mask = tf.reshape(attention_mask, [1, 1, dim_dest, dim_src])
    scaled_score = scaled_score * attention_mask - 1e4 * (1 - attention_mask)

    weights = tf.nn.softmax(scaled_score, axis=-1) # Computes softmax activations
    output = tf.matmul(weights, value)

    return output, weights

  def separate_heads(self, x, batch_size):
    x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
    return tf.transpose(x, perm=[0, 2, 1, 3])

  def call(self, inputs):
    batch_size = tf.shape(inputs)[0] 
    query = self.query_dense(inputs) # (batch_size, seq_len, embed_dim)
    key = self.key_dense(inputs)
    value = self.value_dense(inputs)
    query = self.separate_heads(
        query, batch_size
    )
    key = self.separate_heads(
        key, batch_size
    )
    value = self.separate_heads(
        value, batch_size
    )
    attention, weights = self.attention(query, key, value)
    attention = tf.transpose(
        attention, perm=[0, 2, 1, 3]
    )
    concat_attention = tf.reshape(
        attention, (batch_size, -1, self.embed_dim)
    )
    output = self.combine_heads(concat_attention)

    return output

In [83]:
# Implement a Transformer block as a

class TransformerBlock(layers.Layer):
  def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):

    super(TransformerBlock, self).__init__()
    self.att = MultiHeadSelfAttention(embed_dim, num_heads)
    self.ffn = keras.Sequential(
        [layers.Dense(ff_dim, activation='relu'), layers.Dense(embed_dim),] # Feedforward Neural Network
    )
    #Normalize the activations of the previous layer for each given example in a batch independently, 
    #rather than across a batch like Batch Normalization.
    self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
    self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
    self.dropout1 = layers.Dropout(rate)
    self.dropout2 = layers.Dropout(rate)

  def call(self, inputs):
    attention_output = self.att(inputs)
    attention_output = self.dropout1(attention_output)
    out1 = self.layernorm1(inputs + attention_output)
    ffn_output = self.ffn(out1)
    ffn_output = self.dropout2(ffn_output)

    return self.layernorm2(out1 + ffn_output)

In [84]:
# Implement Embedding Layer
# Two layers --> one for tokens, one for token index

class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [85]:
# Implement miniature GPT model

vocab_size = 20_000 # The top 20k words
maxlen = 100 # Max sequence length
embed_dim = 256 # Embedding size for each token
num_heads = 2 # Number of attention heads
feed_forward_dim = 256 # Hidden layer size in feed forward neural network in transformer

def create_model():
    inputs = layers.Input(shape=(maxlen,), dtype=tf.int32)
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    x = embedding_layer(inputs)
    transformer_block = TransformerBlock(embed_dim, num_heads, feed_forward_dim)
    x = transformer_block(x)
    outputs = layers.Dense(vocab_size)(x)
    model = keras.Model(inputs=inputs, outputs=[outputs, x])
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(
        "adam", loss=[loss_fn, None],
    )  # No loss and optimization based on word embeddings from transformer block
    return model

In [6]:
# Bring in data
!curl -O https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xf aclImdb_v1.tar.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 80.2M  100 80.2M    0     0  5455k      0  0:00:15  0:00:15 --:--:-- 14.4M


In [86]:
# This will be my data
batch_size = 32

filenames = []
directories = [
    "aclImdb/train/pos",
    "aclImdb/train/neg",
    "aclImdb/test/pos",
    "aclImdb/test/neg",
]
for dir in directories:
    for f in os.listdir(dir):
        filenames.append(os.path.join(dir, f))

print(f"{len(filenames)} files")

50000 files


In [87]:
# Create a dataset from these text files
random.shuffle(filenames)
text_ds = tf.data.TextLineDataset(filenames)
text_ds = text_ds.shuffle(buffer_size=256)
text_ds = text_ds.batch(batch_size)

def custom_standardization(input_string):
  # Remove html line-break tags, and handle puncutation
  lowercased = tf.strings.lower(input_string)
  stripped_html = tf.strings.regex_replace(lowercased, "<br />", " ")

  return tf.strings.regex_replace(stripped_html, f"([{string.punctuation}])", r" \1")

# Create vectorization layer and adapy it to the text
vectorize_layer = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size -1,
    output_mode='int',
    output_sequence_length=maxlen + 1, 
)
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary() # Get words back from token index

def prepare_lm_input_labels(text):
  """
  Shift word sequences by 1 position so that the target for position (i) is
  word at position (i+1). The model will use all words up till position (i)
  to predict the next word.
  """
  text = tf.expand_dims(text, -1) # Returns a tensor with a length 1 axis inserted at index axis
  tokenized_sentences = vectorize_layer(text)
  x = tokenized_sentences[:, :-1]
  y = tokenized_sentences[:, 1:]
  return x, y

text_ds = text_ds.map(prepare_lm_input_labels)
text_ds = text_ds.prefetch(tf.data.experimental.AUTOTUNE)


In [113]:
# Callback for generating text

class TextGenerator(keras.callbacks.Callback):
  """
  Callback to generate text from trained model!
  1. Feed some starting prompt to the model (tweet)
  2. Predict probabilites for next token
  3. Sample next token and add it to the next input

  # Arguments
    max_tokens: Integer, the number of tokens to be generated after prompt.
    start_tokens: List of integers, the token indices for the starting prompt.
    index_to_word: List of strings, obtained from TextVectorization layer.
    top_k: Integer, sample from the `top_k` token predictions.
    print_every: Integer, print after this many epochs.
  """

  def __init__(self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1):
    self.max_tokens = max_tokens
    self.start_tokens = start_tokens
    self.index_to_word = index_to_word
    self.print_every = print_every
    self.k = top_k

  def sample_from(self, logits):
  #Finds values and indices of the k largest entries for the last dimension
    logits, indices = tf.math.top_k(logits, k=self.k, sorted=True,) 

    indices - np.asarray(indices).astype("int32")
    preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
    preds = np.asarray(preds).astype("float32")
    return np.random.choice(indices, p=preds)

  def detokenize(self, number):
    return self.index_to_word[number]

    

  def on_epoch_end(self, epoch, logs=None):
    start_tokens = [_ for _ in self.start_tokens]
    if (epoch +1) % self.print_every !=0:
      return

    num_tokens_generated = 0
    tokens_generated = []

    while num_tokens_generated <= self.max_tokens:
      pad_len = maxlen - len(start_tokens)
      sample_index = len(start_tokens) - 1

      if pad_len < 0:
        x = start_tokens[:maxlen]
        sample_index = maxlen - 1

      elif pad_len > 0:
        x = start_tokens + [0] * pad_len

      else:
        x = start_tokens

      x = np.array([x])
      y, _ = self.model.predict(x)
      sample_token = self.sample_from(y[0][sample_index])
      tokens_generated.append(sample_token)
      start_tokens.append(sample_token)
      num_tokens_generated = len(tokens_generated)

    # THIS WILL BE THE RESPONSE TO THE TWEET
    txt = " ".join(
        [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
    )
    print(f"generated text:\n{txt}\n")

    return pd.DataFrame({'response', txt})

  def to_dataframe(self, txt):
    txt = on_epoch_end(self, epoch, logs=None)
    return txt


# Tokenize starting prompt
word_to_index = {}
for index, word in enumerate(vocab):
  word_to_index[word] = index

start_prompt = "this movie is  "
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 20
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

In [114]:
# Train the Model!

model = create_model()

model.fit(text_ds, verbose=2, epochs=2, callbacks=[text_gen_callback])

Epoch 1/2
generated text:
this movie is about a beautiful film and a beautiful young woman who is not in a way , she is not only to

1563/1563 - 75s - loss: 5.0492 - dense_82_loss: 5.0492
Epoch 2/2
generated text:
this movie is a movie i can relate to this film . the plot is about how it is possible to be made .

1563/1563 - 75s - loss: 4.4816 - dense_82_loss: 4.4816


<tensorflow.python.keras.callbacks.History at 0x7f674fa2c828>

In [118]:
print(TextGenerator.to_dataframe)

<function TextGenerator.to_dataframe at 0x7f67f035c268>


In [117]:
df

<function __main__.TextGenerator.to_dataframe>