In [None]:
import pandas as pd
import numpy as np
import math

import matplotlib.pyplot as plt
import seaborn as sns

from wordcloud import WordCloud, STOPWORDS
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay

from transformers import BertTokenizer, BertForSequenceClassification
from transformers import AdamW, get_linear_schedule_with_warmup
import torch
from torch.utils.data import DataLoader, TensorDataset, RandomSampler, SequentialSampler
from torch.nn.utils import clip_grad_norm_
from tqdm.notebook import tqdm



In [None]:
# Single seed shared by every random number generator used in the notebook.
RANDOM_SEED = 42

# Seed NumPy's global generator and PyTorch's default generator with the same value.
np.random.seed(seed=RANDOM_SEED)
torch.manual_seed(seed=RANDOM_SEED)

#### loading data

In [None]:
# Read the raw dataset from the CSV file that sits next to the notebook.
data = pd.read_csv(filepath_or_buffer='sentiment.csv')

#### data exploration and preprocessing

In [None]:
# Show the column labels of the loaded frame.
data.columns

In [None]:
# Print the per-column summary (dtype and non-null count) of the loaded frame.
data.info()

In [None]:
# Preview the first five rows.
data.head(n=5)

In [None]:
# Keep only the two columns the notebook uses. The explicit copy makes the result an
# independent frame, so the later in-place column assignments (`data['text'] = ...`,
# `data['sentiment'] = ...`) do not raise SettingWithCopyWarning.
data = data[['text', 'sentiment']].copy()

In [None]:
# Built once when the cell runs instead of once per row: loading the stop-word list
# and constructing the stemmer are loop-invariant and comparatively expensive.
_ENGLISH_STOPWORDS = frozenset(stopwords.words("english"))
_ENGLISH_STEMMER = SnowballStemmer("english")


def preprocess(text):
    """Normalise one raw text into a space-joined string of stemmed words.

    Whitespace-separated tokens are processed in order:
      * tokens shorter than 3 characters are dropped;
      * the token is lower-cased;
      * tokens containing ``http`` or starting with ``@`` or ``#`` are dropped;
      * English stop words are dropped;
      * the remaining words are stemmed with the English Snowball stemmer.

    Parameters
    ----------
    text : str
        Raw text. Any non-string value (for example a missing value read from
        the CSV) is treated as empty text.

    Returns
    -------
    str
        The surviving stemmed words joined by single spaces; ``''`` when
        nothing survives.
    """
    # A missing cell is not a str and has no .split(); map it to empty text
    # rather than failing in the middle of a Series.apply.
    if not isinstance(text, str):
        return ''

    stemmed_words = []
    for token in text.split():
        # The length test is applied to the token as written, before lower-casing.
        if len(token) < 3:
            continue
        word = token.lower()
        # No separate retweet-marker check is needed: "RT" is two characters
        # long, so the length filter above has already removed it.
        if 'http' in word or word.startswith(('@', '#')):
            continue
        if word in _ENGLISH_STOPWORDS:
            continue
        stemmed_words.append(_ENGLISH_STEMMER.stem(word))
    return ' '.join(stemmed_words)

In [None]:
def wordcloud_draw(data, color = 'black'):
    """Render a word cloud for an iterable of texts.

    Parameters
    ----------
    data : iterable of str
        Texts to visualise; they are joined with single spaces into one corpus.
    color : str, default 'black'
        Background colour handed to ``WordCloud``.
    """
    corpus = ' '.join(data)
    cloud_builder = WordCloud(
        stopwords=STOPWORDS,
        background_color=color,
        width=2500,
        height=2000,
    )
    cloud_image = cloud_builder.generate(corpus)

    # Figure number 1 is requested explicitly, exactly as before.
    plt.figure(1, figsize=(13, 13))
    plt.imshow(cloud_image)
    plt.axis('off')
    plt.show()

In [None]:
# Clean every text element-wise with preprocess() and store the result back in the column.
data['text'] = data['text'].map(preprocess)

In [None]:
# Word cloud of all tweets
wordcloud_draw(data['text'], color='white')

In [None]:
# Word cloud of the texts labelled 'Positive'
is_positive = data['sentiment'] == 'Positive'
data_positive = data.loc[is_positive, 'text']
wordcloud_draw(data_positive, color='white')

In [None]:
# Word cloud of the texts labelled 'Negative'
is_negative = data['sentiment'] == 'Negative'
data_negative = data.loc[is_negative, 'text']
wordcloud_draw(data_negative, color='white')

In [None]:
# Label distribution: number of rows per sentiment value, drawn as a bar chart
label_counts = data['sentiment'].value_counts()
label_counts.plot.bar()

In [None]:
# Class names in label-id order: the position of a name in this list is its integer label.
class_names = ['Negative', 'Neutral', 'Positive']

# Derive the name -> id mapping from class_names so the two can never drift apart.
mapping = {name: label_id for label_id, name in enumerate(class_names)}

# Explicit per-element lookup instead of Series.replace: this avoids replace()'s
# implicit object -> int downcast, and values that are not class names (such as
# already-encoded ids when the cell is re-run) pass through unchanged.
data['sentiment'] = data['sentiment'].map(lambda label: mapping.get(label, label))

# Fail here, with a readable message, rather than later when label tensors are built.
unexpected_labels = set(data['sentiment'].unique()) - set(mapping.values())
assert not unexpected_labels, f"unexpected sentiment labels: {unexpected_labels}"

#### training data preparation

In [None]:
# Train / validation / test split.
# Step 1: hold out 30% of the rows. Step 2: split that hold-out again, 70% test / 30% validation.
# Both splits use the notebook-wide RANDOM_SEED instead of a second hard-coded literal.
train_df, holdout_df = train_test_split(data, test_size=0.3, random_state=RANDOM_SEED)
test_df, valid_df = train_test_split(holdout_df, test_size=0.3, random_state=RANDOM_SEED)

# Every row must land in exactly one of the three splits.
assert len(train_df) + len(valid_df) + len(test_df) == len(data)

print("train size:", len(train_df))
print("validation size:", len(valid_df))
print("test size:", len(test_df))

In [None]:
# BERT model
# Name of the pretrained checkpoint the tokenizer is loaded from.
pretrained_model = 'bert-base-cased'

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

tokenizer = BertTokenizer.from_pretrained(pretrained_model)

In [None]:
# Number of token ids (special tokens included) produced for each preprocessed text.
# truncation=True is required for max_length to take effect; without it the
# tokenizer only warns and returns the over-long sequence.
token_lens = [
    len(tokenizer.encode(txt, max_length=512, truncation=True))
    for txt in data['text']
]

# histplot replaces the deprecated distplot; stat='density' with kde=True keeps
# the same kind of picture (normalised histogram plus density curve).
fig, ax = plt.subplots()
sns.histplot(token_lens, kde=True, stat='density', ax=ax)
ax.set_xlim(0, 120)
ax.set_xlabel('Token count')
plt.show()

In [None]:
# Rebuild the tokenizer with lower-casing switched on; from here on this instance
# replaces the one created earlier and is the one encode() picks up.
tokenizer = BertTokenizer.from_pretrained(
    pretrained_model,
    do_lower_case=True,
)

def encode(docs, max_length=128):
    """Tokenize a list of texts into fixed-length PyTorch tensors.

    Parameters
    ----------
    docs : list of str
        Texts to tokenize with the notebook-level ``tokenizer``.
    max_length : int, default 128
        Every sequence is padded or truncated to exactly this many tokens.

    Returns
    -------
    tuple
        ``(input_ids, attention_masks)`` as returned by the tokenizer with
        ``return_tensors='pt'``.
    """
    encoded_dict = tokenizer.batch_encode_plus(
        docs,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',   # pad every sequence up to max_length
        truncation=True,        # cut sequences longer than max_length
        return_attention_mask=True,
        return_tensors='pt',
    )
    return encoded_dict['input_ids'], encoded_dict['attention_mask']

# Tokenize the text column of each split into input ids and attention masks.
train_input_ids, train_att_masks = encode(list(train_df['text']))
valid_input_ids, valid_att_masks = encode(list(valid_df['text']))
test_input_ids, test_att_masks = encode(list(test_df['text']))

In [None]:
# Label tensors for the PyTorch data loaders, one per split (train, validation, test).
train_y, valid_y, test_y = (
    torch.LongTensor(split_df['sentiment'].tolist())
    for split_df in (train_df, valid_df, test_df)
)

# Display the three tensor sizes as the cell output.
train_y.size(), valid_y.size(), test_y.size()

In [None]:
batch_size = 16

# Bundle (input ids, attention mask, label) per example for each split.
train_dataset = TensorDataset(train_input_ids, train_att_masks, train_y)
valid_dataset = TensorDataset(valid_input_ids, valid_att_masks, valid_y)
test_dataset = TensorDataset(test_input_ids, test_att_masks, test_y)

# Training batches are drawn in random order; validation and test keep dataset
# order, so predictions line up row-for-row with valid_df / test_df.
train_sampler = RandomSampler(train_dataset)
valid_sampler = SequentialSampler(valid_dataset)
test_sampler = SequentialSampler(test_dataset)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, sampler=valid_sampler)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, sampler=test_sampler)

#### BERT for sequence classification

In [None]:
# One output unit per distinct label value found in the training split.
distinct_train_labels = train_df['sentiment'].unique()
num_labels = len(distinct_train_labels)

# Load the pretrained encoder with a classification head; the outputs carry
# neither attention maps nor hidden states.
model_options = dict(
    num_labels=num_labels,
    output_attentions=False,
    output_hidden_states=False,
)
model = BertForSequenceClassification.from_pretrained(pretrained_model, **model_options)

In [None]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The training and evaluation loops move every batch to `device`; the model has
# to live on the same device, otherwise the forward pass fails with a device
# mismatch as soon as a GPU is available. Done before the optimizer is created.
model.to(device)

epochs = 3
lr = 2e-5

# torch.optim.AdamW replaces the deprecated transformers.AdamW.
# NOTE(review): eps and weight_decay are pinned to what are believed to be the
# old transformers.AdamW defaults (1e-6 / 0.0) so the update rule is unchanged;
# confirm against the installed transformers version.
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, eps=1e-6, weight_decay=0.0)

# Linear decay from `lr` to 0 over all optimisation steps, without warm-up.
total_training_steps = len(train_dataloader) * epochs
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps=0,
                                            num_training_steps=total_training_steps)

In [None]:
# Mean loss of every epoch, collected for the learning-curve plot.
train_loss_per_epoch = []
val_loss_per_epoch = []

# The number of batches per pass is fixed by the loaders; compute it once and use
# it for averaging instead of relying on a loop index that leaks out of the loop.
num_train_batches = len(train_dataloader)
num_valid_batches = len(valid_dataloader)

for epoch in range(epochs):
    print(f'epoch: {epoch+1}')

    # training
    model.train()
    train_loss = 0
    for batch_data in tqdm(train_dataloader, desc='training'):
        # `tensor`, not `data`, as the loop name: `data` is the notebook's DataFrame.
        input_ids, att_mask, labels = [tensor.to(device) for tensor in batch_data]
        output = model(input_ids=input_ids, attention_mask=att_mask, labels=labels)

        loss = output.loss
        train_loss += loss.item()

        model.zero_grad()
        loss.backward()

        # Clip the global gradient norm to 1.0 before the parameter update.
        clip_grad_norm_(parameters=model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()   # the schedule advances once per batch, not per epoch

    train_loss_per_epoch.append(train_loss / num_train_batches)

    # validation
    model.eval()
    valid_loss = 0
    valid_pred = []
    with torch.no_grad():
        for batch_data in tqdm(valid_dataloader, desc='validation'):
            input_ids, att_mask, labels = [tensor.to(device) for tensor in batch_data]
            output = model(input_ids=input_ids, attention_mask=att_mask, labels=labels)

            valid_loss += output.loss.item()
            valid_pred.append(np.argmax(output.logits.cpu().detach().numpy(), axis=-1))

    val_loss_per_epoch.append(valid_loss / num_valid_batches)
    # After the loop, valid_pred holds the predictions of the final epoch, in valid_df row order.
    valid_pred = np.concatenate(valid_pred)

    print(f"{num_train_batches}/{math.ceil(len(train_df) / batch_size)} train loss: {train_loss / num_train_batches}")
    # The expected batch count comes from the validation split (it used to read the test split).
    print(f"{num_valid_batches}/{math.ceil(len(valid_df) / batch_size)} val loss: {valid_loss / num_valid_batches}")
 

In [None]:
# The x axis gets its own name: rebinding `epochs` (an int) to a range would make
# both this cell and the training cell fail when they are run a second time.
# Its length follows the recorded losses, so the plot matches what was trained.
epoch_axis = range(1, len(train_loss_per_epoch) + 1)

fig, ax = plt.subplots()
ax.plot(epoch_axis, train_loss_per_epoch, label='training loss')
ax.plot(epoch_axis, val_loss_per_epoch, label='validation loss')
ax.set_title('Training and Validation loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

#### evaluation

In [None]:
print('classification report')
# classification_report expects the ground truth first and the predictions second;
# passing them the other way round swaps precision and recall in the report.
print(classification_report(y_true=valid_df['sentiment'].to_numpy(),
                            y_pred=valid_pred,
                            target_names=class_names))

In [None]:
def plot_confusion_matrix(y_preds, y_true, labels=None):
    """Plot a row-normalised confusion matrix.

    Parameters
    ----------
    y_preds : array-like
        Predicted class ids.
    y_true : array-like
        Ground-truth class ids.
    labels : list of str, optional
        Display names for the classes.
    """
    # normalize="true" divides every row by the number of true samples of that class.
    normalized_cm = confusion_matrix(y_true, y_preds, normalize="true")
    _, axis = plt.subplots(figsize=(6, 6))
    display = ConfusionMatrixDisplay(confusion_matrix=normalized_cm, display_labels=labels)
    display.plot(cmap="Blues", values_format=".2f", ax=axis, colorbar=False)
    plt.title("Confusion matrix")
    plt.show()
  
# Confusion matrix of the validation predictions.
plot_confusion_matrix(
    y_preds=valid_pred,
    y_true=valid_df['sentiment'].to_numpy(),
    labels=class_names,
)

##### prediction on test set

In [None]:
model.eval()
test_pred = []
test_loss = 0
with torch.no_grad():
    # tqdm wraps the loader directly so it knows the number of batches and can
    # show real progress; wrapped around enumerate() it has no total to display.
    for batch_data in tqdm(test_dataloader, desc='testing'):
        input_ids, att_mask, labels = [tensor.to(device) for tensor in batch_data]
        output = model(input_ids=input_ids, attention_mask=att_mask, labels=labels)

        test_loss += output.loss.item()
        test_pred.append(np.argmax(output.logits.cpu().detach().numpy(), axis=-1))
# The loader iterates sequentially, so predictions are in test_df row order.
test_pred = np.concatenate(test_pred)
test_true = test_df['sentiment'].to_numpy()

# Report the mean loss that the loop accumulates.
print(f"test loss: {test_loss / len(test_dataloader)}")
# Ground truth first, predictions second; otherwise precision and recall are swapped.
print(classification_report(y_true=test_true, y_pred=test_pred, target_names=class_names))
plot_confusion_matrix(test_pred, test_true, labels=class_names)

##### error analysis


In [None]:
# Attach the predictions as a new column. assign() returns a new frame, so this
# neither writes into a slice of `data` (no SettingWithCopyWarning) nor breaks on
# re-run. The index is deliberately left untouched: the previous reset_index call
# discarded its result and therefore never changed anything.
test_df = test_df.assign(pred=test_pred)

# Rows where the predicted label differs from the true label.
misclassified = test_df[test_df['sentiment'] != test_df['pred']]
print(misclassified.shape)
misclassified[['text', 'sentiment', 'pred']].head(5)