In [None]:
! pip install -U accelerate
! pip install -U transformers

In [None]:
!pip install transformers datasets

In [None]:
import torch, os
import pandas as pd
from transformers import pipeline, BertForSequenceClassification, BertTokenizerFast
from torch.utils.data import Dataset
from torch import cuda
# Run on the GPU when torch can see one; otherwise fall back to the CPU.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

In [None]:
# Location of the tweet CSV.
# NOTE(review): looks like a Colab Google Drive mount — confirm the drive is mounted before this cell runs.
path = "/content/drive/MyDrive/Dataset/tweets.csv"

# Read the whole file with pandas' default parsing options (no shuffling is applied).
df_org = pd.read_csv(filepath_or_buffer=path)

# Preview the first rows as a quick sanity check of the loaded columns.
df_org.head()

In [None]:
# Distinct sentiment classes, in order of first appearance in the data.
# Surrounding whitespace is stripped *before* de-duplicating so that the keys
# derived from this list match the `x.strip()` lookup used when the rows are
# encoded; otherwise a value such as " positive" would be registered with its
# padding and the stripped lookup would raise KeyError.
labels = df_org['sentiment'].str.strip().unique().tolist()
labels

In [None]:
# Number of distinct classes the classification head has to predict.
NUM_LABELS = len(labels)

# Integer id -> label name, numbered by position in `labels`.
id2label = dict(enumerate(labels))

# Label name -> integer id (the inverse lookup).
label2id = {name: idx for idx, name in id2label.items()}

label2id

In [None]:
# Encode every row's sentiment as its integer class id; the raw value is
# stripped of surrounding whitespace before it is looked up in `label2id`.
df_org["labels"] = df_org["sentiment"].apply(lambda raw: label2id[raw.strip()])
df_org.head()


In [None]:
# Pie chart of how many rows carry each sentiment value.
df_org["sentiment"].value_counts().plot.pie(figsize=(5, 5))

In [None]:
import torch.nn as nn
from transformers import (
    AutoConfig,
    AutoModel,
    AutoModelForSequenceClassification,
    AutoTokenizer,
    BertModel,
    DistilBertModel,
    DistilBertTokenizer,
)
from transformers.modeling_outputs import TokenClassifierOutput
# Hyper-parameters read by the model classes defined in this cell.
fchidden = 256  # NOTE(review): not referenced by any cell visible here — confirm before removing
hiddendim_lstm = 256  # hidden size handed to nn.LSTM
embeddim = 768  # input feature size handed to nn.LSTM; has to match the encoder's hidden size
numlayers = 5  # stored on the models as `self.numlayers`; not passed to nn.LSTM in the visible code
checkpoint='roberta-base'  # pre-trained checkpoint name used for both the tokenizer and the model

class MyTaskSpecificCustomModel(nn.Module):
    """
    Pre-trained transformer encoder with a dropout + linear classification head.

    The hidden state of the first token of every sequence is passed through
    the linear layer to produce one score per label.
    """
    def __init__(self, checkpoint, num_labels ):
        """
        Args:
            checkpoint (str): The name of the pre-trained model or path to the model weights.
            num_labels (int): The number of output labels in the final classification layer.
        """
        super(MyTaskSpecificCustomModel, self).__init__()
        self.num_labels = num_labels

        # The config flags are spelled in the plural (`output_attentions`,
        # `output_hidden_states`); the singular spellings used previously are
        # not recognised and left both outputs disabled.
        # NOTE(review): with these enabled the model returns extra tensors next
        # to the logits — if this class is trained through `Trainer`, confirm
        # `compute_metrics` still receives plain logits.
        config = AutoConfig.from_pretrained(
            checkpoint,
            output_attentions=True,
            output_hidden_states=True,
        )
        self.model = AutoModel.from_pretrained(checkpoint, config=config)

        # New layers
        self.dropout = nn.Dropout(0.1)
        # Size the head from the loaded encoder instead of hard-coding 768, so
        # checkpoints with a different hidden size work as well.
        self.classifier = nn.Linear(self.model.config.hidden_size, self.num_labels)

    def forward(self, input_ids = None, attention_mask=None, labels = None ):
        """
        Forward pass for the model.

        Args:
            input_ids (torch.Tensor, optional): Tensor of input IDs. Defaults to None.
            attention_mask (torch.Tensor, optional): Tensor for attention masks. Defaults to None.
            labels (torch.Tensor, optional): Tensor of class ids. Defaults to None.

        Returns:
            TokenClassifierOutput: always returned, with or without labels.
            - loss: cross-entropy loss when `labels` is given, otherwise None.
            - logits: classification scores before softmax, one row per sequence.
            - hidden_states: forwarded unchanged from the encoder output.
            - attentions: forwarded unchanged from the encoder output.
        """
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)

        # First element of the encoder output is the last hidden state.
        last_hidden_state = outputs[0]
        sequence_outputs = self.dropout(last_hidden_state)

        # Classify from the first token's representation of every sequence.
        logits = self.classifier(sequence_outputs[:, 0, :])

        loss = None
        if labels is not None:
            loss_func = nn.CrossEntropyLoss()
            loss = loss_func(logits.view(-1, self.num_labels), labels.view(-1))

        # Return outside the `labels` branch so inference calls (no labels)
        # get their logits instead of None.
        return TokenClassifierOutput(loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions)


class Bert_LSTM(nn.Module):
    """BERT encoder followed by a bidirectional LSTM and a linear classification head."""

    def __init__(self, checkpoint, num_labels):
        """
        Args:
            checkpoint (str): Name or path of the pre-trained BERT weights.
            num_labels (int): Number of classes produced by the final linear layer.
        """
        super(Bert_LSTM, self).__init__()
        self.numclasses = num_labels
        self.embeddim = embeddim
        self.numlayers = numlayers
        self.hiddendim_lstm = hiddendim_lstm

        self.model = BertModel.from_pretrained(checkpoint, output_hidden_states=True, output_attentions=False)
        print("BERT Model Loaded")

        self.lstm = nn.LSTM(self.embeddim, self.hiddendim_lstm, batch_first=True, bidirectional=True) # noqa
        # A bidirectional LSTM emits forward and backward states concatenated,
        # so the head consumes 2 * hiddendim_lstm features (not embeddim).
        self.classifier = nn.Linear(self.hiddendim_lstm * 2, self.numclasses)

    def forward(self, input_ids = None, attention_mask=None, labels = None ):
        """
        Encode the batch, run the LSTM over the token states and classify.

        Args:
            input_ids (torch.Tensor, optional): Tensor of input IDs. Defaults to None.
            attention_mask (torch.Tensor, optional): Tensor for attention masks. Defaults to None.
            labels (torch.Tensor, optional): Tensor of class ids. Defaults to None.

        Returns:
            TokenClassifierOutput: always returned; `loss` is None when no
            labels are supplied.
        """
        outputs = self.model(input_ids = input_ids, attention_mask = attention_mask)
        sequence_outputs = outputs[0]

        # nn.LSTM returns (output, (h_n, c_n)); only the per-step output is used.
        lstm_outputs, _ = self.lstm(sequence_outputs)

        # Classify from the LSTM output at the last sequence position.
        # NOTE(review): for sequences shorter than the padded batch length this
        # position is padding — confirm that is intended.
        logits = self.classifier(lstm_outputs[:, -1])

        loss = None
        if labels is not None:
            loss_func = nn.CrossEntropyLoss()
            loss = loss_func(logits.view(-1, self.numclasses), labels.view(-1))

        # Return outside the `labels` branch so inference calls get an output.
        return TokenClassifierOutput(loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions)


class BertClassifier(nn.Module):
    """Transformer encoder + bidirectional LSTM + linear head for sequence classification."""

    def __init__(self, checkpoint, num_labels):
        """
        Args:
            checkpoint (str): Name or path of the pre-trained encoder weights.
            num_labels (int): Number of classes produced by the final linear layer.
        """
        super(BertClassifier, self).__init__()
        self.numclasses = num_labels
        self.embeddim = embeddim
        self.numlayers = numlayers
        self.hiddendim_lstm = hiddendim_lstm

        # Pre-trained encoder
        self.model = AutoModel.from_pretrained(checkpoint)

        self.lstm = nn.LSTM(self.embeddim, self.hiddendim_lstm, batch_first=True, bidirectional=True)
        # Bidirectional LSTM output concatenates both directions: 2 * hidden.
        self.linear = nn.Linear(self.hiddendim_lstm*2, self.numclasses)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input_ids = None, attention_mask=None, labels = None ):
        """
        Encode the batch, run the LSTM over the token states and classify.

        Args:
            input_ids (torch.Tensor, optional): Tensor of input IDs. Defaults to None.
            attention_mask (torch.Tensor, optional): Tensor for attention masks. Defaults to None.
            labels (torch.Tensor, optional): Tensor of class ids. Defaults to None.

        Returns:
            TokenClassifierOutput: always returned. `logits` holds the
            log-softmax of the class scores; `loss` is None when no labels
            are supplied.
        """
        # Feed input to the encoder
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)

        sequence_output = outputs[0]
        sequence_output, _ = self.lstm(sequence_output)

        # Classify from the LSTM output at the last sequence position.
        # NOTE(review): for sequences shorter than the padded batch length this
        # position is padding — confirm that is intended.
        logits = self.linear(sequence_output[:, -1])
        # Log-softmax is idempotent, so the log-softmax that CrossEntropyLoss
        # applies internally leaves these values (and the loss) unchanged.
        logits = self.softmax(logits)

        loss = None
        if labels is not None:
            loss_func = nn.CrossEntropyLoss()
            loss = loss_func(logits.view(-1, self.numclasses), labels.view(-1))

        # Return outside the `labels` branch: previously a call without labels
        # (plain inference) fell through and returned None.
        return TokenClassifierOutput(loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions)






In [None]:
# `model_max_length` is the tokenizer option that caps the sequence length
# used by `truncation=True`; a bare `max_length` passed at load time is not
# a tokenizer construction option and has no such effect.
tokenizer = AutoTokenizer.from_pretrained(checkpoint, model_max_length=512)

# Encoder + LSTM classifier with one output per sentiment class.
model = BertClassifier(checkpoint, NUM_LABELS)

# Move the weights to the selected device (the cell output shows the module layout).
model.to(device)

In [None]:
SIZE = df_org.shape[0]

# Row positions at which the train slice (90%) and the validation slice (95%) end.
_train_end = (9 * SIZE) // 10
_val_end = (95 * SIZE) // 100

# Consecutive, unshuffled slices: first 90% train, next 5% validation, last 5% test.
train_texts = list(df_org["review"].iloc[:_train_end])
val_texts = list(df_org["review"].iloc[_train_end:_val_end])
test_texts = list(df_org["review"].iloc[_val_end:])

# Integer class ids cut at exactly the same row positions as the texts.
train_labels = list(df_org["labels"].iloc[:_train_end])
val_labels = list(df_org["labels"].iloc[_train_end:_val_end])
test_labels = list(df_org["labels"].iloc[_val_end:])

len(train_texts), len(val_texts), len(test_texts)

In [None]:
# Tokenize every split with identical settings: truncate over-long texts and
# pad each split to a common length.
train_encodings, val_encodings, test_encodings = (
    tokenizer(split_texts, truncation=True, padding=True)
    for split_texts in (train_texts, val_texts, test_texts)
)

In [None]:
class DataLoader(Dataset):
    """
    Map-style dataset pairing tokenizer output with integer labels.

    Despite its name this is a `torch.utils.data.Dataset` subclass: it serves
    single examples by index and does no batching itself.
    """
    def __init__(self, encodings, labels):
        """
        Store the tokenized inputs and their labels.

        Args:
            encodings (dict): Mapping from field name (e.g. 'input_ids',
                              'attention_mask') to a per-example sequence of values.
            labels (list): One integer label per example.
        """
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """
        Build the example stored at position `idx`.

        Args:
            idx (int): Position of the example to fetch.

        Returns:
            item (dict): Every encoding field converted to a tensor, plus the
                         label tensor under the key 'labels'.
        """
        item = {}
        # Convert each tokenizer field of this example to a tensor
        for field, values in self.encodings.items():
            item[field] = torch.tensor(values[idx])
        # Attach the target under the key the model's forward() expects
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        """
        Number of examples, taken from the length of the label list.

        Returns:
            (int): The number of data items in the dataset.
        """
        return len(self.labels)

In [None]:
# Preview only the first few encoded labels; printing the full list floods
# the notebook output with one entry per training row.
train_labels[:20]

In [None]:
# Wrap each split's encodings and labels in the dataset class defined above.
# (The `*_dataloader` names hold Dataset objects, not torch DataLoaders.)
train_dataloader, val_dataloader, test_dataset = (
    DataLoader(split_encodings, split_labels)
    for split_encodings, split_labels in (
        (train_encodings, train_labels),
        (val_encodings, val_labels),
        (test_encodings, test_labels),
    )
)

In [None]:
from transformers import TrainingArguments, Trainer

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(pred):
    """
    Score a batch of predictions with accuracy and macro-averaged P/R/F1.

    Args:
        pred (obj): Object exposing two attributes:
            - label_ids (array-like): True class ids.
            - predictions (array-like): Per-class scores; the predicted class
              is the index of the largest score along the last axis.

    Returns:
        dict: Metrics keyed by name, in this order:
            - Accuracy (float): Fraction of predictions equal to the true label.
            - F1 (float): Macro-averaged F1 score.
            - Precision (float): Macro-averaged precision.
            - Recall (float): Macro-averaged recall.
    """
    y_true = pred.label_ids

    # Predicted class = position of the highest score in each row
    y_pred = pred.predictions.argmax(-1)

    # Macro averaging scores every class separately and then averages them;
    # the fourth return value (support) is not needed here.
    macro_precision, macro_recall, macro_f1, _support = precision_recall_fscore_support(
        y_true, y_pred, average='macro'
    )

    return dict(
        Accuracy=accuracy_score(y_true, y_pred),
        F1=macro_f1,
        Precision=macro_precision,
        Recall=macro_recall,
    )


In [None]:
# The evaluation-schedule argument was renamed from `evaluation_strategy` to
# `eval_strategy` in newer transformers releases. This notebook installs the
# latest release, so pick whichever name the installed TrainingArguments
# dataclass actually declares.
_eval_strategy_arg = (
    "eval_strategy"
    if "eval_strategy" in TrainingArguments.__dataclass_fields__
    else "evaluation_strategy"
)

training_args = TrainingArguments(
    # The output directory where the model predictions and checkpoints will be written
    output_dir='./SentimentAnaTwitter',
    do_train=True,
    do_eval=True,
    #  The number of epochs, defaults to 3.0
    num_train_epochs=5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    # Number of steps used for a linear warmup
    warmup_steps=100,
    weight_decay=0.01,
    learning_rate=1e-5,
    logging_strategy='steps',
    # TensorBoard log directory
    logging_dir='./multi-class-logs',
    logging_steps=50,
    # Evaluate and checkpoint once per epoch; the two schedules have to match
    # for `load_best_model_at_end`. (`eval_steps` is omitted because it is
    # only consulted when evaluating every N steps.)
    save_strategy="epoch",
    # Mixed precision needs a GPU; follow the device chosen at the top of the
    # notebook so the CPU fallback does not fail here.
    fp16=(device == 'cuda'),
    load_best_model_at_end=True,
    **{_eval_strategy_arg: "epoch"},
)

In [None]:
# Hand the model, the run configuration, the train/validation datasets and
# the metric callback to the Hugging Face Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataloader,
    eval_dataset=val_dataloader,
)


In [None]:
# Run fine-tuning on `train_dataloader` with the settings held in `training_args`.
trainer.train()

In [None]:
# Evaluate the trained model on every split, in train / val / test order.
q = []
for split_dataset in (train_dataloader, val_dataloader, test_dataset):
    q.append(trainer.evaluate(eval_dataset=split_dataset))

# One row per split; keep only the first five columns of the metric dicts.
pd.DataFrame(q, index=["train", "val", "test"]).iloc[:, :5]