#  **Introduction**

> I've recently been reading the "**Attention Is All You Need**" paper, which introduced the **Transformer (Google, 2017)**. **Andrej Karpathy** later explained this paper in simple, understandable chunks. I've also recently switched from TensorFlow to PyTorch, so this notebook is a beginner's attempt to implement what he has been learning. PyTorch has a built-in Transformer in `nn.Transformer`, but I've tried my best to implement a decoder-only transformer architecture from basic PyTorch building blocks, and I hope this notebook can be a guide for someone like me.

In [None]:
import pandas as pd 
import os
import re
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
from sklearn.metrics import accuracy_score

#  Directory Reading

# Data Reading
> As per the dataset description, each file has four columns: tweet ID, entity, sentiment, and text. I've loaded both the training data and the validation data for preprocessing.

In [None]:
columns_name = ["t_id", "entity", "sentiment", "text"]
training_data = pd.read_csv("dataset/twitter_training.csv", header=None, names=columns_name, index_col=False)
validation_data = pd.read_csv("dataset/twitter_validation.csv", header=None, names=columns_name, index_col=False)

A quick peek at the data:

In [None]:
training_data.head(), validation_data.head()

> Of the four columns, only **text** and **sentiment** are used for the sentiment analysis: text is the input feature and sentiment is the target. I'm also printing the shapes of `training_data` and `validation_data` so they can be compared before and after preprocessing.

In [None]:
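# .copy() makes these independent DataFrames, so the later in-place edits
# do not trigger pandas' SettingWithCopyWarning
training_data = training_data[["text", "sentiment"]].copy()
validation_data = validation_data[["text", "sentiment"]].copy()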
print(f"Shape of Training_data:{training_data.shape} | Shape of validation_data:{validation_data.shape}")

# Data Preprocessing

> This is a basic text-cleaning step. It converts text to lowercase, removes URLs, removes @mentions and the `#` symbol of hashtags, and removes punctuation and numbers. I think these are irrelevant for sentiment analysis. You can add more steps as needed.

In [None]:
def clean_tweet(text):
    if isinstance(text, str):
        text = text.lower()  # Convert to lowercase
        text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)  # Remove URLs
        text = re.sub(r'\@\w+|\#', '', text)  # Remove @mentions and the '#' symbol (the hashtag word is kept)
        text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
        text = re.sub(r'\d+', '', text)  # Remove numbers
    else:
        text = ''  # Handle non-string inputs like float (NaN) by returning an empty string or handling as needed
    
    return text

In [None]:
training_data["text"] = training_data["text"].apply(clean_tweet)
validation_data["text"] = validation_data["text"].apply(clean_tweet)
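
> To see what each cleaning step does, here is a minimal sketch that applies the same regular expressions, in the same order, to a single made-up tweet. The sample text is an assumption for illustration and is not taken from the dataset; the final whitespace collapse is added only to make the printed result easier to read.

```python
import re

# Made-up example tweet (not from the dataset)
sample = "Loving the new @Borderlands update!!! 10/10 https://example.com #gaming"

step = sample.lower()                                   # lowercase
step = re.sub(r'http\S+|www\S+|https\S+', '', step)     # drop URLs
step = re.sub(r'\@\w+|\#', '', step)                    # drop @mentions and the '#' symbol
step = re.sub(r'[^\w\s]', '', step)                     # drop punctuation
step = re.sub(r'\d+', '', step)                         # drop digits

# Collapse the leftover runs of whitespace for display
cleaned = " ".join(step.split())
print(cleaned)
```

> Note that the mention disappears completely, while the hashtag keeps its word and only loses the `#`.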

> I've also removed stopwords from the text, on the assumption that they carry little information for sentiment analysis.

In [None]:
stop_words = set(stopwords.words('english'))
print(f"Length of stopwords:{len(stop_words)}")

def remove_stopwords(text):
    return ' '.join([word for word in text.split() if word not in stop_words])

training_data["text"] = training_data["text"].apply(remove_stopwords)
validation_data["text"] = validation_data["text"].apply(remove_stopwords)
print(f" Shape of Training data: {training_data.shape} | Shape of Validation data: {validation_data.shape}")
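
> As a small illustration of the filtering above, the sketch below uses a tiny hand-written stopword set instead of NLTK's English list. Both `demo_stop_words` and `remove_stopwords_demo` are hypothetical names used only for this example.

```python
# Hypothetical mini stopword set for illustration; the notebook uses NLTK's English list
demo_stop_words = {"the", "is", "a", "this", "not"}

def remove_stopwords_demo(text, stop_words):
    """Keep only the whitespace-separated words that are not in stop_words."""
    return " ".join(word for word in text.split() if word not in stop_words)

filtered = remove_stopwords_demo("this game is not a good game", demo_stop_words)
print(filtered)
```

> When a negation such as "not" is part of the stopword set, it is dropped along with the rest, so it may be worth checking the list you use against your task.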

> Removing duplicates matters for sentiment analysis, because repeated tweets would otherwise be over-represented during training. Compare the shape of the data before and after removing the duplicates.

In [None]:
training_data.drop_duplicates(subset=["text"], inplace=True)
validation_data.drop_duplicates(subset=["text"], inplace=True)
print(f" Shape of Training data: {training_data.shape} | Shape of Validation data: {validation_data.shape}")

# Label encoding
I've used `LabelEncoder` from scikit-learn. You can use other available modules.
There are four sentiment classes: **Positive, Negative, Neutral & Irrelevant**

In [None]:
label_encoder = LabelEncoder()

training_data["sentiment"] = label_encoder.fit_transform(training_data["sentiment"])
# Reuse the mapping fitted on the training labels so both splits share the same class ids
validation_data["sentiment"] = label_encoder.transform(validation_data["sentiment"])
label_classes = label_encoder.classes_
num_classes = len(label_classes)
num_classes
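
> The idea behind label encoding can be sketched in plain Python: build one mapping from class name to integer id using the training labels, then apply that same mapping to the validation labels. The label lists below are made up, and the sorted ordering mirrors how `LabelEncoder` orders its `classes_`.

```python
# Made-up labels for illustration
train_labels = ["Positive", "Negative", "Neutral", "Irrelevant", "Positive"]
val_labels = ["Neutral", "Positive"]

# One mapping, built from the training labels only (sorted, like LabelEncoder.classes_)
classes = sorted(set(train_labels))
class_to_id = {name: idx for idx, name in enumerate(classes)}

train_ids = [class_to_id[label] for label in train_labels]
val_ids = [class_to_id[label] for label in val_labels]  # same mapping, no refit

print(classes)
print(train_ids, val_ids)
```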

# Custom Dataset from Pytorch
> For a custom dataset, we need to implement three methods: `__init__`, `__len__` and `__getitem__`.

In [None]:
class TwitterSentimentDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        text = self.data.iloc[index]["text"]
        sentiment = self.data.iloc[index]["sentiment"]
        
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length = self.max_length,
            padding = 'max_length',
            truncation = True,
            return_attention_mask = True,
            return_tensors = 'pt' 
        )
        label = torch.tensor(sentiment, dtype=torch.long)
        
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask':encoding['attention_mask'].flatten(),
            'labels':label
        }

# Tokenization
> I've used `BertTokenizer` from Hugging Face. You can use others as well, such as `tiktoken` (OpenAI's tokenizer), `SentencePiece` by Google, and many more. In the near future I'll use a custom tokenizer.

> Be careful with `max_length` and `batch_size`. `max_length` is the maximum number of tokens per example produced by the tokenizer (longer texts are truncated, shorter ones are padded), and `batch_size` controls how many examples the model processes in each iteration. A small `batch_size` makes training slow, while a large `batch_size` needs more memory and may make it harder for the model to learn the patterns in the data.

In [None]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
max_length = 128

dataset = TwitterSentimentDataset(
    training_data,
    tokenizer=tokenizer,
    max_length=max_length
)

batch_size = 512
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
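
> A rough back-of-the-envelope check of what these two settings imply, assuming a hypothetical dataset of 70,000 examples (the real number depends on the shape printed after preprocessing):

```python
import math

num_examples = 70_000   # hypothetical size, not taken from the notebook
batch_size = 512
max_length = 128

# Iterations per epoch; the last batch is smaller unless the sizes divide evenly
batches_per_epoch = math.ceil(num_examples / batch_size)
last_batch_size = num_examples - (batches_per_epoch - 1) * batch_size

# Each full batch of input_ids has shape (batch_size, max_length)
tokens_per_full_batch = batch_size * max_length

print(batches_per_epoch, last_batch_size, tokens_per_full_batch)
```

> Halving `batch_size` roughly doubles the number of iterations per epoch, while halving `max_length` halves the tokens held in memory per batch.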

> This is the vocabulary size (`vocab_size`) of the tokenizer. The `bert-base-uncased` tokenizer has 30,522 tokens in its vocabulary (the GPT-2 tokenizer has 50,257) for encoding and decoding text.

In [None]:
vocab_size = tokenizer.vocab_size
vocab_size

> These are the hyperparameters for the transformer.

In [None]:
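n_embed = 32     # embedding dimension of each token
block_size = 4   # not used below; the sequence length comes from max_length
dropout = 0.1    # dropout probability
n_head = 6       # attention heads per block; head_size = n_embed // n_head = 5 (32 is not divisible by 6)
n_layer = 6      # number of transformer blocks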
device = "cuda" if torch.cuda.is_available() else "cpu"
device
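
> A quick arithmetic check of how `n_embed` and `n_head` interact, since each head's width is computed as `n_embed // n_head` in the `Block` class. This is only a sketch of the sizes involved, not part of the model.

```python
n_embed = 32
n_head = 6

head_size = n_embed // n_head              # integer division
concat_width = head_size * n_head          # width after concatenating all heads
evenly_divisible = n_embed % n_head == 0

print(head_size, concat_width, evenly_divisible)
```

> With these values the concatenated heads are 30 wide rather than 32. The model still runs because the projection layer maps `head_size * num_heads` back to `n_embed`, but choosing an `n_head` that divides `n_embed` (for example 4 or 8) would keep the two widths equal.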

# Transformer Block -> Self Attention, Multihead Attention

In [None]:
class Self_Attention(nn.Module):
    def __init__(self, head_size):
        super().__init__()
        self.key = nn.Linear(n_embed, head_size, bias=False)
        self.query = nn.Linear(n_embed, head_size, bias=False)
        self.value = nn.Linear(n_embed, head_size, bias=False)
        self.register_buffer('tril', torch.tril(torch.ones(max_length, max_length)))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B, T, C = x.shape
        k = self.key(x)
        q = self.query(x)
        v = self.value(x)

        wei = torch.matmul(q, k.transpose(-2, -1)) * (k.shape[-1]**-0.5)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))  # causal mask: each token attends only to itself and earlier tokens; remove this line for an encoder-only transformer
        wei = torch.softmax(wei, dim=-1)
        wei = self.dropout(wei)
        out = torch.matmul(wei, v)
        return out

class Multi_Head_Attention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()
        self.heads = nn.ModuleList([Self_Attention(head_size) for _ in range(num_heads)])
        self.projection = nn.Linear(head_size * num_heads, n_embed)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([sa(x) for sa in self.heads], dim=-1)
        out = self.dropout(self.projection(out))
        return out

class Feed_Forward_Network(nn.Module):
    def __init__(self, n_embed):
        super().__init__()
        self.layer = nn.Sequential(
            nn.Linear(n_embed, 4 * n_embed),
            nn.ReLU(),
            nn.Linear(4 * n_embed, n_embed),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.layer(x)

class Block(nn.Module):
    def __init__(self, n_embed, n_head):
        super().__init__()
        head_size = n_embed // n_head
        self.self_attention = Multi_Head_Attention(n_head, head_size)
        self.feed_forward = Feed_Forward_Network(n_embed)
        self.layer_norm_1 = nn.LayerNorm(n_embed)
        self.layer_norm_2 = nn.LayerNorm(n_embed)

    def forward(self, x):
        x = x + self.self_attention(self.layer_norm_1(x))
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x
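
> To make the causal mask concrete, here is a minimal sketch of the scaled dot-product step on random tensors, with sizes chosen only for illustration. It shows that after `masked_fill` and softmax, each row is a probability distribution that puts zero weight on later positions.

```python
import torch

torch.manual_seed(0)
T, head_size = 4, 8                      # illustrative sizes
q = torch.randn(1, T, head_size)         # (B, T, head_size)
k = torch.randn(1, T, head_size)

# Scaled dot-product scores, shape (B, T, T)
scores = q @ k.transpose(-2, -1) * head_size ** -0.5

# Lower-triangular mask: position t may look at positions <= t only
tril = torch.tril(torch.ones(T, T))
masked = scores.masked_fill(tril == 0, float("-inf"))

weights = torch.softmax(masked, dim=-1)  # rows sum to 1, upper triangle is 0
print(weights[0])
```

> The first row always ends up as `[1, 0, 0, 0]`, because the first token can only attend to itself.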

# SentimentAnalysisTransformer

In [None]:
class SentimentAnalysisTransformer(nn.Module):
    def __init__(self):  
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
        self.positional_embedding_table = nn.Embedding(max_length, n_embed)
        self.block = nn.Sequential(*[Block(n_embed, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embed)
        self.lm_head = nn.Linear(n_embed, num_classes)
        
    def forward(self, x):
        B, T = x.shape
        tok_emb = self.token_embedding_table(x)  # (B, T, n_embed)
        pos_emb = self.positional_embedding_table(torch.arange(T, device=x.device))  # (T, n_embed)
        x = tok_emb + pos_emb  # (B, T, n_embed)
        x = self.block(x)  # (B, T, n_embed)
        x = self.ln_f(x)  # (B, T, n_embed)
        logits = self.lm_head(x.mean(dim=1))  # mean-pool over T -> (B, n_embed), then classify -> (B, num_classes)
        return logits
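
> The model above averages over all `T` positions, padding included. As a possible variation (an assumption on my part, not what this notebook does), the `attention_mask` returned by the tokenizer could be used to average over real tokens only. The tensors below are tiny made-up values so the difference is easy to see.

```python
import torch

# One sequence of length 4 with 2 real tokens and 2 padding positions (made-up values)
hidden = torch.tensor([[[1.0, 1.0], [3.0, 3.0], [9.0, 9.0], [9.0, 9.0]]])  # (B=1, T=4, C=2)
attention_mask = torch.tensor([[1, 1, 0, 0]])                               # 1 = real token

# Plain mean, as in the model: padding positions contribute to the average
plain_mean = hidden.mean(dim=1)

# Masked mean: zero out padding, then divide by the number of real tokens
mask = attention_mask.unsqueeze(-1).to(hidden.dtype)                        # (B, T, 1)
masked_mean = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1)

print(plain_mean, masked_mean)
```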

In [None]:
model = SentimentAnalysisTransformer()
model.to(device)

# Loss function and Optimizer

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training

In [None]:
num_epochs = 60

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for batch in dataloader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)  # loaded for completeness; the model's forward pass does not use it
        labels = batch['labels'].to(device)
        
        y_hat = model(input_ids)
        loss = loss_fn(y_hat, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()        
    epoch_loss = running_loss / len(dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}")

# Validation Dataset and Dataloader

In [None]:
validation_dataset = TwitterSentimentDataset(validation_data, tokenizer=tokenizer, max_length=max_length)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)

# Prediction

In [None]:
model.eval()
all_pred = []
all_labels = []

with torch.inference_mode():
    for batch in validation_loader:
        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)

        # Predict inside the loop so every batch is evaluated, not just the last one
        outputs = model(input_ids)
        _, preds = torch.max(outputs, dim=1)

        all_pred.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Accuracy Calculation

In [None]:
accuracy = accuracy_score(all_labels, all_pred)
print(f"validation accuracy is:{accuracy}")
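
> For reference, accuracy is simply the fraction of predictions that match the labels. A minimal sketch with made-up predictions (the `_demo` names are hypothetical and not part of the notebook):

```python
# Made-up class ids for illustration
all_labels_demo = [0, 1, 2, 3, 1]
all_pred_demo = [0, 1, 1, 3, 1]

# Count matching positions and divide by the number of examples
correct = sum(int(pred == label) for pred, label in zip(all_pred_demo, all_labels_demo))
accuracy_demo = correct / len(all_labels_demo)

print(f"demo accuracy: {accuracy_demo:.2f}")
```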