<a href="https://colab.research.google.com/github/i3ehdad/BERTweet-Classifier/blob/main/BERTweet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BERTweet fine-tuning on the SemEval dataset

In [None]:
!pip3 install emoji
!pip install transformers sentencepiece

In [None]:
from transformers import AutoModel, AutoTokenizer 

# Load the pre-trained BERTweet encoder and its matching slow (non-fast)
# tokenizer with the tokenizer's normalization option switched on.
bertweet = AutoModel.from_pretrained("vinai/bertweet-base")
tokenizer = AutoTokenizer.from_pretrained(
    "vinai/bertweet-base",
    normalization=True,
    use_fast=False,
)

In [None]:
def writeResultsInFile(accuracy, f1_score, random_seed):
    """Append one line describing a run's metrics to the ``log`` file.

    The file is opened relative to the current working directory and is
    created if it does not exist yet.

    Args:
        accuracy: Accuracy value to record.
        f1_score: F1 value to record.
        random_seed: Seed the run was executed with.
    """
    # The context manager closes the handle even if the write raises, and the
    # trailing newline keeps successive runs on separate lines.
    with open("log", "a") as f:
        f.write(f'Random seed : {random_seed}, accuracy : {accuracy}, f1-score : {f1_score}\n')

In [None]:
import transformers
import torch
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, recall_score, f1_score, accuracy_score, average_precision_score
from collections import defaultdict
from textwrap import wrap
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
%matplotlib inline

# Seed the PyTorch and NumPy global random generators with one shared value,
# then select the first CUDA device when available and the CPU otherwise.
RANDOM_SEED = 70
torch.manual_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
class SATweetDataset(Dataset):
  """Map-style dataset that tokenizes one tweet per requested item.

  Args:
    tweets: Indexable collection of tweet texts.
    targets: Indexable collection of numeric labels, aligned with `tweets`.
    tokenizer: Callable tokenizer; called with the tweet text and the
      padding/truncation options used in `__getitem__`.
    max_len: Length every encoded tweet is padded or truncated to.
  """

  def __init__(self, tweets, targets, tokenizer, max_len):
    self.tweets = tweets
    self.targets = targets
    self.tokenizer = tokenizer
    self.max_len = max_len

  def __len__(self):
    """Return the number of tweets in the dataset."""
    return len(self.tweets)

  def __getitem__(self, item):
    """Encode the tweet at position `item`.

    Returns:
      dict with the raw text ('tweet_text'), the flattened 'input_ids' and
      'attention_mask' tensors, and the label as a long tensor ('targets').
    """
    tweet = str(self.tweets[item])
    target = self.targets[item]

    # Call the tokenizer directly: `encode_plus` is deprecated in favour of
    # `__call__`, which accepts the same arguments for a single text.
    encoding = self.tokenizer(
      tweet,
      add_special_tokens=True,
      max_length=self.max_len,
      truncation=True,
      return_token_type_ids=False,
      padding='max_length',
      return_attention_mask=True,
      return_tensors='pt',
    )

    return {
      'tweet_text': tweet,
      # The tokenizer output is flattened to a 1-D tensor per example.
      'input_ids': encoding['input_ids'].flatten(),
      'attention_mask': encoding['attention_mask'].flatten(),
      'targets': torch.tensor(target, dtype=torch.long)
    }

In [None]:
def create_data_loader(df, tokenizer, max_len, batch_size, shuffle=False, num_workers=4):
    """Wrap a DataFrame of tweets in a `DataLoader` over `SATweetDataset`.

    Args:
        df: DataFrame with a `body` column (tweet text) and a `target`
            column (numeric label).
        tokenizer: Tokenizer handed to `SATweetDataset`.
        max_len: Padded/truncated sequence length for each tweet.
        batch_size: Number of examples per batch.
        shuffle: Whether the loader reshuffles the data every epoch.
            Defaults to False, which keeps the previous behaviour.
        num_workers: Number of worker processes used for loading.
            Defaults to 4, the value that used to be hard-coded.

    Returns:
        A `DataLoader` yielding the dict batches built by `SATweetDataset`.
    """
    ds = SATweetDataset(
        tweets=df.body.to_numpy(),
        targets=df.target.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len
    )
    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers
    )

In [None]:
# Model identifier that SentimentClassifier passes to `from_pretrained` when it loads its encoder.
PRE_TRAINED_MODEL_NAME = "vinai/bertweet-base"

In [None]:
class SentimentClassifier(nn.Module):
  """Pre-trained encoder followed by a small head that emits one score per tweet.

  The head is Linear(hidden_size -> 192) -> LeakyReLU -> Linear(192 -> 1)
  -> Tanh, so every output lies in [-1, 1].

  Args:
    n_classes: Accepted for interface compatibility but not used; the head
      always has a single output unit.
  """

  def __init__(self, n_classes):
    super().__init__()
    # Load through the AutoModel factory instead of calling `from_pretrained`
    # on the notebook-level `bertweet` instance, so this class does not
    # depend on that global object having been created.
    self.bert = AutoModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
    # Dropout on the pooled output is currently disabled:
    # self.drop = nn.Dropout(p = 0.33)

    self.fc = nn.Linear(self.bert.config.hidden_size, 192)

    self.relu = nn.LeakyReLU()

    self.out = nn.Linear(192, 1)

    self.tanh = nn.Tanh()

  def forward(self, input_ids, attention_mask):
    """Return a flattened tensor with one tanh-squashed score per input row."""
    # With return_dict=False the encoder returns a tuple; only its second
    # element (the pooled output) feeds the head.
    _, pooled_output = self.bert(
      input_ids=input_ids,
      attention_mask=attention_mask,
      return_dict=False
    )
    # output = self.drop(pooled_output)
    hidden = self.relu(self.fc(pooled_output))

    return self.tanh(self.out(hidden)).flatten()

In [None]:
def convert_to_label(scores):
  """Map continuous scores to the discrete labels -1, 0 and 1.

  Scores below -0.5 become -1, scores above 0.5 become 1, and everything
  else (including exactly -0.5 and 0.5) becomes 0.

  Args:
    scores: Array-like of numeric scores.

  Returns:
    Integer ndarray with the same shape as `scores`.
  """
  scores = np.asarray(scores)
  # np.where is fully vectorized and, unlike np.vectorize without `otypes`,
  # also accepts empty input.
  return np.where(scores < -0.5, -1, np.where(scores > 0.5, 1, 0))

In [None]:
def train_epoch(model, data_loader, loss_fn, optimizer, device, n_examples, scheduler=None):
  """Train `model` for one pass over `data_loader`.

  Args:
    model: Module called with `input_ids` and `attention_mask` keyword
      arguments and returning one score per example.
    data_loader: Iterable of dict batches with 'input_ids',
      'attention_mask' and 'targets' entries.
    loss_fn: Callable computing the loss from (outputs, targets).
    optimizer: Optimizer stepped once per batch.
    device: Device the batch tensors are moved to.
    n_examples: Denominator used for the accuracy.
    scheduler: Optional learning-rate scheduler stepped once per batch,
      right after the optimizer. Defaults to None (no scheduling).

  Returns:
    Tuple (accuracy, mean_loss): correct predictions divided by
    `n_examples`, and the mean of the per-batch losses.
  """
  model.train()
  losses = []
  correct_predictions = 0

  for batch in data_loader:
    input_ids = batch["input_ids"].to(device)
    attention_mask = batch["attention_mask"].to(device)
    targets = batch["targets"].to(device)

    outputs = model(
      input_ids=input_ids,
      attention_mask=attention_mask
    )

    # Scores are thresholded into -1/0/1 only for the accuracy count.
    preds = convert_to_label(outputs.cpu().detach().numpy())
    # The labels arrive as integers while the model emits floats; cast them
    # so the regression loss sees matching dtypes.
    loss = loss_fn(outputs, targets.to(outputs.dtype))
    correct_predictions += np.sum(preds == targets.cpu().numpy())
    losses.append(loss.item())

    loss.backward()
    optimizer.step()
    if scheduler is not None:
      scheduler.step()
    optimizer.zero_grad()

  return correct_predictions / n_examples, np.mean(losses)

In [None]:
def eval_model(model, data_loader, loss_fn, device, n_examples):
  """Evaluate `model` on `data_loader` without updating any weights.

  Args:
    model: Module called with `input_ids` and `attention_mask` keyword
      arguments and returning one score per example.
    data_loader: Iterable of dict batches with 'input_ids',
      'attention_mask' and 'targets' entries.
    loss_fn: Callable computing the loss from (outputs, targets).
    device: Device the batch tensors are moved to.
    n_examples: Denominator used for the accuracy.

  Returns:
    Tuple (accuracy, mean_loss): correct predictions divided by
    `n_examples`, and the mean of the per-batch losses.
  """
  model.eval()

  losses = []
  correct_predictions = 0

  with torch.no_grad():
    for batch in data_loader:
      input_ids = batch["input_ids"].to(device)
      attention_mask = batch["attention_mask"].to(device)
      targets = batch["targets"].to(device)

      outputs = model(
        input_ids=input_ids,
        attention_mask=attention_mask
      )
      # Scores are thresholded into -1/0/1 only for the accuracy count.
      preds = convert_to_label(outputs.cpu().detach().numpy())

      # The labels arrive as integers while the model emits floats; cast
      # them so the regression loss sees matching dtypes.
      loss = loss_fn(outputs, targets.to(outputs.dtype))

      correct_predictions += np.sum(preds == targets.cpu().numpy())
      losses.append(loss.item())

  return correct_predictions / n_examples, np.mean(losses)

In [None]:
def get_predictions(model, data_loader, device=None):
  """Run `model` over `data_loader` and collect texts, labels and scores.

  Args:
    model: Module called with `input_ids` and `attention_mask` keyword
      arguments and returning one score per example.
    data_loader: Iterable of dict batches with 'tweet_text', 'input_ids',
      'attention_mask' and 'targets' entries.
    device: Device the inputs are moved to. Defaults to the device of the
      model's first parameter (CPU if the model has no parameters).

  Returns:
    Tuple (tweets_content, predictions, prediction_probs, real_values):
      tweets_content: list of the raw tweet texts.
      predictions: 1-D ndarray of thresholded labels (-1, 0 or 1).
      prediction_probs: 1-D CPU tensor with the raw model scores (these are
        the model outputs themselves, not probabilities).
      real_values: 1-D CPU tensor with the true labels.
  """
  model.eval()

  if device is None:
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

  tweets_content = []
  label_batches = []
  score_batches = []
  target_batches = []

  with torch.no_grad():
    for batch in data_loader:
      input_ids = batch["input_ids"].to(device)
      attention_mask = batch["attention_mask"].to(device)

      outputs = model(
        input_ids=input_ids,
        attention_mask=attention_mask
      )
      scores = outputs.cpu()

      tweets_content.extend(batch["tweet_text"])
      label_batches.append(convert_to_label(scores.numpy()))
      score_batches.append(scores)
      # Targets only need to end up on the CPU; they never feed the model.
      target_batches.append(batch["targets"].cpu())

  if not score_batches:
    # An empty loader yields empty results instead of failing on concatenation.
    return tweets_content, np.array([]), torch.empty(0), torch.empty(0, dtype=torch.long)

  # Concatenate whole batches once rather than stacking per-example scalars.
  predictions = np.concatenate(label_batches)
  prediction_probs = torch.cat(score_batches)
  real_values = torch.cat(target_batches)
  return tweets_content, predictions, prediction_probs, real_values

In [None]:
def show_confusion_matrix(confusion_matrix):
  """Draw `confusion_matrix` as an annotated blue heatmap with labelled axes."""
  heatmap_ax = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
  y_axis, x_axis = heatmap_ax.yaxis, heatmap_ax.xaxis
  # Keep the row labels horizontal and tilt the column labels by 30 degrees.
  y_axis.set_ticklabels(y_axis.get_ticklabels(), rotation=0, ha='right')
  x_axis.set_ticklabels(x_axis.get_ticklabels(), rotation=30, ha='right')
  plt.ylabel('True sentiment')
  plt.xlabel('Predicted sentiment')

In [None]:
def avg_rec(y_test, y_pred):
    """Return the unweighted mean of the three per-class recall scores.

    Unpacking requires `recall_score` to report exactly three classes.
    """
    per_class_recall = recall_score(y_test, y_pred, average=None)
    first, second, third = per_class_recall
    recall_sum = first + second + third
    return (1/3) * recall_sum

In [None]:
def f1_np(y_test, y_pred):
    """Return the mean of the first and last per-class F1 scores.

    Unpacking requires `f1_score` to report exactly three classes; the middle
    one is ignored.
    """
    per_class_f1 = f1_score(y_test, y_pred, average=None)
    first, _, last = per_class_f1
    f1_sum = first + last
    return 0.5*f1_sum

In [None]:
import os
# Switch the working directory to the data folder. Every relative path used in
# this notebook (these CSVs, the "log" file, the saved checkpoint) resolves
# against it from here on.
# NOTE(review): assumes Google Drive is already mounted at /content/drive; no
# mount step is visible in this notebook. Confirm before a fresh run.
os.chdir('/content/drive/MyDrive/data/Extract-data')
# `df` is later split into training and validation sets; `df_test` is used for the final evaluation.
df = pd.read_csv('df.csv')
df_test = pd.read_csv('labelled.csv')

In [None]:
# Hold out 10% of `df` for validation; the split is seeded with RANDOM_SEED.
df_train, df_val = train_test_split(
    df,
    test_size=0.1,
    random_state=RANDOM_SEED,
)

In [None]:
# Show how many test rows carry each sentiment label.
df_test['Sentiment'].value_counts()

In [None]:
def sentiment_encode(Sentiment):
  """Map a sentiment label to its numeric target.

  'Negative' maps to -1 and 'Neutral' to 0; any other value maps to 1.
  Surrounding whitespace is ignored, so 'Negative ' and 'Negative' are
  treated the same.

  Args:
    Sentiment: Label value, normally a string.

  Returns:
    -1, 0 or 1.
  """
  # Strip first: the labels used to be matched only with a trailing space, so
  # a cleanly formatted 'Negative' fell through to the positive branch.
  label = Sentiment.strip() if isinstance(Sentiment, str) else Sentiment
  if label == 'Negative':
    return -1
  elif label == 'Neutral':
    return 0
  else:
    return 1

In [None]:
# Add the numeric `target` column to each split. `assign` returns a new frame,
# which avoids pandas' SettingWithCopyWarning when writing a column into the
# frames produced by train_test_split.
df_train = df_train.assign(target=df_train.Sentiment.apply(sentiment_encode))
df_val = df_val.assign(target=df_val.Sentiment.apply(sentiment_encode))
df_test = df_test.assign(target=df_test.Sentiment.apply(sentiment_encode))

In [None]:
# Display names for the encoded targets, listed in the order -1, 0, 1.
# (The test-set `target` column is already computed in the previous cell, so
# it is not recomputed here.)
class_names = ['Negative', 'Neutral', 'Positive']

BATCH_SIZE = 32  # examples per batch for every DataLoader
MAX_LEN = 104    # length each tokenized tweet is padded/truncated to

In [None]:
# One loader per split; all three share the tokenizer, sequence length and batch size.
train_data_loader = create_data_loader(
    df=df_train, tokenizer=tokenizer, max_len=MAX_LEN, batch_size=BATCH_SIZE
)
val_data_loader = create_data_loader(
    df=df_val, tokenizer=tokenizer, max_len=MAX_LEN, batch_size=BATCH_SIZE
)
test_data_loader = create_data_loader(
    df=df_test, tokenizer=tokenizer, max_len=MAX_LEN, batch_size=BATCH_SIZE
)

In [None]:
# Pull a single batch from the training loader to inspect its structure.
data = next(iter(train_data_loader))
# Display the batch's field names (the keys of the dict returned by SATweetDataset.__getitem__).
data.keys()

In [None]:
# Build the classifier and place its weights on the selected device.
model = SentimentClassifier(len(class_names)).to(device)

In [None]:
# Move the sample batch's model inputs to the device and show their shapes.
input_ids, attention_mask = (
    data[field].to(device) for field in ('input_ids', 'attention_mask')
)

print(input_ids.shape)  # batch size x seq length
print(attention_mask.shape)  # batch size x seq length

In [None]:
# Hyperparameters

EPOCHS = 25

optimizer = optim.AdamW(model.parameters(), lr=1e-6)

# The schedule is sized for one step per training batch across all epochs,
# decaying linearly with no warm-up phase.
total_steps = EPOCHS * len(train_data_loader)
scheduler = transformers.get_linear_schedule_with_warmup(
  optimizer, num_warmup_steps=0, num_training_steps=total_steps
)

# Mean absolute error between the model's scores and the numeric targets.
loss_fn = nn.L1Loss()
loss_fn = loss_fn.to(device)

In [None]:
# Initiate the training loop 

%%time

history = defaultdict(list)
best_accuracy = 0
early_stop = 0

for epoch in range(EPOCHS):

  print(f'Epoch {epoch + 1}/{EPOCHS}')
  print('-' * 10)

  train_acc, train_loss = train_epoch(
    model,
    train_data_loader,    
    loss_fn, 
    optimizer, 
    device, 
    len(df_train)
  )

  print(f'Train loss {train_loss} accuracy {train_acc}')

  val_acc, val_loss = eval_model(
    model,
    val_data_loader,
    loss_fn, 
    device, 
    len(df_val)
  )

  print(f'Val   loss {val_loss} accuracy {val_acc}')
  print()

  history['train_acc'].append(train_acc)
  history['train_loss'].append(train_loss)
  history['val_acc'].append(val_acc)
  history['val_loss'].append(val_loss)

  torch.save(model.state_dict(), 'Bertweet_model_state5.bin')





  #if val_acc > best_accuracy:
   # torch.save(model.state_dict(), 'Bertweet_model_state1.bin')
   # best_accuracy = val_acc
    #early_stop = 0
  #else:
     # early_stop = early_stop + 1
     # if early_stop == 10:
       # break

In [None]:
# Load model parameters
# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU can also be restored on a CPU-only runtime.
model.load_state_dict(torch.load('Bertweet_model_state5.bin', map_location=device))

In [None]:
# Measure accuracy on the test set; the returned loss is not needed here.
test_acc, _ = eval_model(model, test_data_loader, loss_fn, device, len(df_test))

test_acc.item()

In [None]:
# Rebuild the test loader with the same settings as before.
test_data_loader = create_data_loader(
    df=df_test,
    tokenizer=tokenizer,
    max_len=MAX_LEN,
    batch_size=BATCH_SIZE,
)

In [None]:
# Collect the test tweets, thresholded predictions, raw scores and true labels.
y_tweet_texts, y_pred, y_pred_probs, y_test = get_predictions(model, test_data_loader)

In [None]:
# Classification Report
# Pin the label order to the -1/0/1 encoding so each entry of `class_names`
# lines up with its class even if one class is absent from the test
# labels and predictions.
print(classification_report(y_test, y_pred, labels=[-1, 0, 1], target_names=class_names))