In [1]:
import tensorflow as tf   
from tensorflow import keras 
from keras import layers
from keras.layers import TextVectorization
import numpy as np
from sklearn.model_selection import train_test_split

In [None]:
"""

The purpose of self-attention is to modulate the representation of a token
by using the representations of related tokens in the sequence. This produces 
context aware token representations.

"""

In [2]:
!wget https://storage.googleapis.com/tensorflow-1-public/course3/sarcasm.json

--2023-04-05 16:32:55--  https://storage.googleapis.com/tensorflow-1-public/course3/sarcasm.json
Resolving storage.googleapis.com (storage.googleapis.com)... 172.253.114.128, 108.177.111.128, 142.250.1.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.253.114.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5643545 (5.4M) [application/json]
Saving to: ‘sarcasm.json’


2023-04-05 16:32:55 (60.0 MB/s) - ‘sarcasm.json’ saved [5643545/5643545]



In [35]:
import json 

with open('sarcasm.json', 'r') as f :
  datastore = json.load(f)

  labels = []
sentences = []

for item in datastore:
  sentences.append(item['headline'])
  labels.append(item['is_sarcastic'])

print("sentences len",  len(sentences))
print("labels len", len(labels))
print("sentences[0]", sentences[0])  
print("labels[0]", labels[0])

sentences len 26709
labels len 26709
sentences[0] former versace store clerk sues over secret 'black code' for minority shoppers
labels[0] 0


In [36]:
sentences = np.array(sentences)
labels = np.array(labels)

In [24]:
def make_dataset(features , labels):
  ds = tf.data.Dataset.from_tensor_slices((features, labels))
  ds = ds.shuffle(buffer_size=len(features))
  ds = ds.batch(batch_size=32)
  return ds 

In [37]:
train_sentences , test_sentences , train_labels , test_labels = train_test_split(sentences, labels, test_size=0.2, random_state=42 )
train_sentences , val_sentences , train_labels, val_labels = train_test_split(train_sentences , train_labels , test_size=0.25, random_state=42)

In [38]:
train_ds = make_dataset(train_sentences , train_labels)
val_ds = make_dataset(val_sentences , val_labels)
test_ds = make_dataset(test_sentences , test_labels)

In [27]:
text_only_train_ds = train_ds.map(lambda x, y: x)


In [28]:
# Preparing integer sequence datasets
max_length = 250
max_tokens = 10000

text_vectorization = TextVectorization(
    max_tokens = max_tokens ,
    output_mode = "int",
    output_sequence_length = max_length,
)

In [29]:
text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(
lambda x, y: (text_vectorization(x), y),
num_parallel_calls=4)

int_val_ds = val_ds.map(
lambda x, y: (text_vectorization(x), y),
num_parallel_calls=4)

int_test_ds = test_ds.map(
lambda x, y: (text_vectorization(x), y),
num_parallel_calls=4)

In [5]:
# Transformer encoder implemented as a subclassed Layer


In [30]:
class TransformerEncoder(layers.Layer):
  def __init__(self , embed_dim , dense_dim , num_heads, **kwargs):
    super().__init__(**kwargs)
    self.embed_dim = embed_dim
    self.dense_dim = dense_dim
    self.num_heads = num_heads 
    self.attention = layers.MultiHeadAttention(num_heads = num_heads ,
                                               key_dim = embed_dim)
    self.dense_proj = keras.Sequential([
        layers.Dense(dense_dim,activation = "relu"),
        layers.Dense(embed_dim,)
    ])
    self.layernorm_1 = layers.LayerNormalization()
    self.layernorm_2 = layers.LayerNormalization()

  def call(self, inputs , mask=None):
    if mask is not None :
      mask = mask[:, tf.newaxis, :]
    attention_output = self.attention(inputs , 
                                      inputs,
                                      attention_mask = mask)
    proj_input = self.layernorm_1(inputs + attention_output)
    proj_output = self.dense_proj(proj_input)
    return self.layernorm_2(proj_input + proj_output)

  def get_config(self):
    config = super().get_config()
    config.update({
        "embed_dim" : self.embed_dim ,
        "num_heads" : self.num_heads ,
        "dense_dim" : self.dense_dim,
    })
    return config 


In [12]:
# Using the Transformer encoder for text classification

vocab_size = 10000
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = layers.Embedding(vocab_size , embed_dim)(inputs)
x = TransformerEncoder(embed_dim , dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)

model = keras.Model(inputs , outputs )
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics = ["accuracy"])

model.summary()



Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, None)]            0         
                                                                 
 embedding (Embedding)       (None, None, 256)         2560000   
                                                                 
 transformer_encoder (Transf  (None, None, 256)        543776    
 ormerEncoder)                                                   
                                                                 
 global_max_pooling1d (Globa  (None, 256)              0         
 lMaxPooling1D)                                                  
                                                                 
 dropout (Dropout)           (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 1)                 257   

In [13]:
tf.keras.backend.clear_session()

callbacks = [keras.callbacks.ModelCheckpoint("transformer_encoder.keras",save_best_only=True)]

model.fit(int_train_ds, validation_data=int_val_ds, epochs=20,callbacks=callbacks)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7f57044bd220>

In [14]:
model = keras.models.load_model("transformer_encoder.keras",
                                custom_objects={"TransformerEncoder": TransformerEncoder})

print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Test acc: 0.843


In [31]:
class PositionalEmbedding(layers.Layer):
  def __init__(self, sequence_length , input_dim , output_dim , **kwargs):
    super().__init__(**kwargs)
    self.token_embeddings = layers.Embedding(input_dim=input_dim ,
                                             output_dim = output_dim)
    self.position_embeddings = layers.Embedding(input_dim = sequence_length,
                                                output_dim=output_dim)
    self.sequence_length = sequence_length
    self.input_dim = input_dim
    self.output_dim = output_dim 

  def call( self, inputs) :
    length = tf.shape(inputs)[-1]
    positions = tf.range(start=0, limit=length, delta=1)
    embedded_tokens = self.token_embeddings(inputs)
    embedded_positions = self.position_embeddings(positions)
    return embedded_tokens + embedded_positions

  def conmpute_mask(self, inputs , mask=None):
    return tf.math.not_equal(inputs, 0)

  def get_config(self):
    config = super().get_config()
    config.update({
        "output_dim" : self.output_dim ,
        "sequence_length" :  self.sequence_length,
        "input_dim" : self.input_dim
    })
    return config 

In [39]:
# Combining the Transformer encoder with positional embedding
tf.keras.backend.clear_session()
vocab_size = 10000
sequence_length = 300
embed_dim = 256
num_heads = 2
dense_dim = 32


inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)

model = keras.Model(inputs, outputs)

model.compile(optimizer="rmsprop",loss="binary_crossentropy",metrics=["accuracy"])
model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, None)]            0         
                                                                 
 positional_embedding (Posit  (None, None, 256)        2636800   
 ionalEmbedding)                                                 
                                                                 
 transformer_encoder (Transf  (None, None, 256)        543776    
 ormerEncoder)                                                   
                                                                 
 global_max_pooling1d (Globa  (None, 256)              0         
 lMaxPooling1D)                                                  
                                                                 
 dropout (Dropout)           (None, 256)               0         
                                                             

In [40]:
callbacks = [keras.callbacks.ModelCheckpoint("full_transformer_encoder.keras",save_best_only=True)]

model.fit(int_train_ds, validation_data=int_val_ds, epochs=20,callbacks=callbacks)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7f567dde72e0>

In [41]:
model = keras.models.load_model("full_transformer_encoder.keras",custom_objects={"TransformerEncoder": TransformerEncoder,"PositionalEmbedding": PositionalEmbedding})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Test acc: 0.853
