Link to code: https://keras.io/examples/nlp/text_classification_with_transformer/

In [3]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
!pip install num2words
import pandas as pd
import string
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from collections import Counter
import numpy as np
import math
from sklearn.metrics import confusion_matrix,accuracy_score,classification_report
from sklearn.model_selection import train_test_split



In [4]:
# Load the tweets and split them by party.
df = pd.read_csv('covid_tweets.csv')
democrat_tweets_df = df[df.party == 'Democrat']
republican_tweets_df = df[df.party == 'Republican']
# Keep a random half of the Democrat tweets and every Republican tweet.
# The seed is fixed so the down-sampled subset, and therefore every result
# computed from it, is the same on each Restart & Run All.
democrat_tweets_df = democrat_tweets_df.sample(frac=0.5, random_state=42)
frames = [democrat_tweets_df, republican_tweets_df]
df = pd.concat(frames)

In [5]:
from nltk.tokenize import word_tokenize
import re
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
# English stop words from NLTK.
stop_words = set(stopwords.words('english'))
# Pandemic-specific terms that are also filtered out.
# NOTE(review): "covid 19" contains a space, so it can never equal one of the
# whitespace-split tokens produced below - confirm what was intended.
covid_stop_words = {"covid", "covid19", "covid 19", "corona", "coronavirus"}
# Stemmer instance; preprocess_and_tokenize below does not use it.
porter = PorterStemmer()


def preprocess_and_tokenize(tweet):
    """Clean a tweet and split it into lowercase tokens.

    Each run of characters other than ASCII letters, digits and spaces is
    replaced by a single space, the text is lowercased and split on
    whitespace, and tokens present in ``stop_words`` or ``covid_stop_words``
    are dropped. Digits are kept and no stemming is applied.

    Args:
        tweet: Raw tweet text (a string).

    Returns:
        The remaining tokens as a list of strings, in their original order.
    """
    cleaned = re.sub(r'[^A-Za-z0-9 ]+', " ", tweet)
    tokens = cleaned.lower().split()
    return [
        token
        for token in tokens
        if not (token in stop_words or token in covid_stop_words)
    ]

In [6]:
# Store the cleaned token list of every tweet in a new column.
df['tokenize_tweet'] = df['tweet'].apply(preprocess_and_tokenize)

In [7]:
# Encode the party label as two 0/1 indicator columns, 'dems' and 'reps'.
known_parties = {'Democrat', 'Republican'}
unexpected_parties = set(df['party'].unique()) - known_parties
if unexpected_parties:
    # Any other label would leave a row without a valid one-hot target, so
    # fail loudly with the offending values instead of an opaque
    # length-mismatch error on column assignment.
    raise ValueError(f"unexpected party labels: {unexpected_parties!r}")

# Vectorized comparison instead of a row-by-row Python loop; the results are
# converted back to plain lists of ints, the same type the loop produced.
dems = df['party'].eq('Democrat').astype('int64').tolist()
reps = df['party'].eq('Republican').astype('int64').tolist()
df['dems'] = dems
df['reps'] = reps

### Implement multi-head self-attention as a Keras layer

In [8]:
class MultiHeadSelfAttention(layers.Layer):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values by three dense layers,
    split into ``num_heads`` heads of size ``embed_dim // num_heads``,
    attended per head, re-concatenated and passed through a final dense layer.
    """

    def __init__(self, embed_dim, num_heads=8):
        """Create the projection layers.

        Args:
            embed_dim: Width of the dense projections and of the output.
            num_heads: Number of attention heads; must divide ``embed_dim``.

        Raises:
            ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
        """
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        head_dim, leftover = divmod(embed_dim, num_heads)
        if leftover:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = head_dim
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        """Scaled dot-product attention.

        Returns:
            A pair ``(output, weights)``: the attention-weighted values and
            the softmax attention weights.
        """
        logits = tf.matmul(query, key, transpose_b=True)
        # Scale by the square root of the key's last dimension.
        depth = tf.cast(tf.shape(key)[-1], tf.float32)
        weights = tf.nn.softmax(logits / tf.math.sqrt(depth), axis=-1)
        return tf.matmul(weights, value), weights

    def separate_heads(self, x, batch_size):
        """Reshape ``x`` to (batch_size, -1, num_heads, projection_dim) and
        swap axes 1 and 2 so the head axis comes before the sequence axis."""
        split_shape = (batch_size, -1, self.num_heads, self.projection_dim)
        return tf.transpose(tf.reshape(x, split_shape), perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention to ``inputs``.

        Args:
            inputs: Tensor of shape (batch_size, seq_len, embedding_dim).

        Returns:
            Tensor of shape (batch_size, seq_len, embed_dim).
        """
        batch_size = tf.shape(inputs)[0]

        # Linear projections, each (batch_size, seq_len, embed_dim).
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)

        # Split into heads: (batch_size, num_heads, seq_len, projection_dim).
        query, key, value = [
            self.separate_heads(tensor, batch_size) for tensor in (query, key, value)
        ]

        attended, _ = self.attention(query, key, value)

        # Back to (batch_size, seq_len, num_heads, projection_dim), then merge
        # the heads into (batch_size, seq_len, embed_dim).
        merged = tf.reshape(
            tf.transpose(attended, perm=[0, 2, 1, 3]),
            (batch_size, -1, self.embed_dim),
        )
        return self.combine_heads(merged)  # (batch_size, seq_len, embed_dim)

### Implement a Transformer block as a layer

In [9]:
class TransformerBlock(layers.Layer):
    """Transformer encoder block.

    Self-attention followed by a two-layer feed-forward network; each
    sub-layer's output goes through dropout, is added to its input (residual
    connection) and is layer-normalized.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        """Build the sub-layers.

        Args:
            embed_dim: Embedding size of the block's input and output.
            num_heads: Number of attention heads.
            ff_dim: Hidden size of the feed-forward network.
            rate: Dropout rate used after attention and after the
                feed-forward network.
        """
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Input tensor; it is added to the attention output, so it
                must have the same shape as that output.
            training: Forwarded to the dropout layers. Defaults to ``None``
                so the block can be called without the argument, in which
                case the dropout layers decide the mode themselves.

        Returns:
            The layer-normalized output of the feed-forward sub-layer.
        """
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

### Implement embedding layer
Two separate embedding layers: one for the tokens and one for the token indices (positions).

In [10]:
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding."""

    def __init__(self, maxlen, vocab_size, embed_dim):
        """Create the two embedding tables.

        Args:
            maxlen: Number of rows in the position-embedding table.
            vocab_size: Number of rows in the token-embedding table.
            embed_dim: Output size of both embeddings.
        """
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in ``x`` and add position embeddings.

        Positions are 0 .. L-1, where L is the size of the last axis of ``x``.
        """
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        # Look up positions first, then tokens (same order as before).
        position_vectors = self.pos_emb(position_ids)
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

In [28]:
# Hold out 20% of the rows for testing; the seed makes the split repeatable.
data_train, data_test = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
)

In [29]:
# Corpus statistics for the training split: the flat list of tokens, the
# token count of each tweet, and the sorted set of distinct tokens.
all_training_words = []
training_sentence_lengths = []
for tokens in data_train["tokenize_tweet"]:
    all_training_words.extend(tokens)
    training_sentence_lengths.append(len(tokens))
TRAINING_VOCAB = sorted(set(all_training_words))
print("%s words total, with a vocabulary size of %s" % (len(all_training_words), len(TRAINING_VOCAB)))
print("Max sentence length is %s" % max(training_sentence_lengths))

741396 words total, with a vocabulary size of 33855
Max sentence length is 91


In [30]:
# Corpus statistics for the test split: the flat list of tokens, the token
# count of each tweet, and the sorted set of distinct tokens.
all_test_words = []
test_sentence_lengths = []
for tokens in data_test["tokenize_tweet"]:
    all_test_words.extend(tokens)
    test_sentence_lengths.append(len(tokens))
TEST_VOCAB = sorted(set(all_test_words))
print("%s words total, with a vocabulary size of %s" % (len(all_test_words), len(TEST_VOCAB)))
print("Max sentence length is %s" % max(test_sentence_lengths))

185460 words total, with a vocabulary size of 17132
Max sentence length is 56


In [32]:
# Length every integer sequence is padded/truncated to below.
MAX_SEQUENCE_LENGTH = 50
# NOTE(review): not referenced by any of the visible cells - confirm whether
# it is still needed.
EMBEDDING_DIM = 300

# The tokenizer is fitted on the raw training tweets only; the test split is
# encoded later with this same fitted tokenizer.
# `Tokenizer` is referenced through the already-imported `keras` namespace,
# because a bare `Tokenizer` name is not imported anywhere in the notebook.
train_texts = data_train["tweet"].tolist()
tokenizer = keras.preprocessing.text.Tokenizer(
    num_words=len(TRAINING_VOCAB), lower=True, char_level=False
)
tokenizer.fit_on_texts(train_texts)
training_sequences = tokenizer.texts_to_sequences(train_texts)
train_word_index = tokenizer.word_index
print('Found %s unique tokens.' % len(train_word_index))
train_rnn_data = keras.preprocessing.sequence.pad_sequences(training_sequences, maxlen=MAX_SEQUENCE_LENGTH)

Found 37922 unique tokens.


In [33]:
# Encode the test tweets with the tokenizer fitted on the training data and
# pad them to the same length as the training sequences.
test_texts = data_test["tweet"].tolist()
test_sequences = tokenizer.texts_to_sequences(test_texts)
test_rnn_data = keras.preprocessing.sequence.pad_sequences(
    test_sequences,
    maxlen=MAX_SEQUENCE_LENGTH,
)

In [35]:
# Sorted list of the distinct tokens seen in either split.
# NOTE(review): despite its name this is the vocabulary itself (a list), not
# its size - confirm how it is meant to be used.
vocab_size = sorted(set(all_test_words) | set(all_training_words))

In [36]:
# Create classifier model using transformer layer
# Transformer layer outputs one vector for each time step of our input sequence. Here, we take the mean across all time steps and use a feed forward network on top of it to classify text.

embed_dim = 32  # Embedding size for each token
num_heads = 2  # Number of attention heads
ff_dim = 32  # Hidden layer size in feed forward network inside transformer

# The input length and the position-embedding table are sized with
# MAX_SEQUENCE_LENGTH, the length the tweet sequences were padded to, rather
# than a `maxlen` name that no cell in this notebook defines.
inputs = layers.Input(shape=(MAX_SEQUENCE_LENGTH,))
embedding_layer = TokenAndPositionEmbedding(MAX_SEQUENCE_LENGTH, len(TRAINING_VOCAB), embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
# Average over the time-step axis, then classify with a small dense head.
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation="relu")(x)
x = layers.Dropout(0.1)(x)
# Two softmax outputs, one per party indicator.
outputs = layers.Dense(2, activation="softmax")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

In [42]:
# Training targets are the two indicator columns, in the order [dems, reps];
# training inputs are the padded integer sequences.
label_names = ['dems', 'reps']
x_train = train_rnn_data
y_train = data_train[label_names].to_numpy()
y_tr = y_train  # alias of the same array

In [43]:
# Train the classifier: Adam optimizer, binary cross-entropy loss, accuracy
# reported during training.
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
history = model.fit(x=x_train, y=y_train, batch_size=32, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [44]:
# Score the padded test sequences with the trained model.
predictions = model.predict(
    test_rnn_data,
    batch_size=512,
    verbose=1,
)



In [45]:
# Convert each prediction row into a 'dems' indicator: the argmax index is
# looked up in `labels`, so index 0 maps to 1 and index 1 maps to 0.
labels = [1, 0]
prediction_labels = [labels[np.argmax(row)] for row in predictions]
# Fraction of test rows whose predicted indicator equals the true 'dems' value.
n_correct = sum(data_test['dems'] == prediction_labels)
n_correct / len(prediction_labels)

0.8260053933638175