<a href="https://colab.research.google.com/github/HereBeCode/StanceDetectionML-Brockport/blob/main/MultiClass_YT_Comment_Sentiment_Classification_BERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install / Import Modules

In [None]:
!pip install datasets

In [None]:
!pip install transformers

In [None]:
import re
from datasets import Dataset
import pandas as pd
from datasets import load_dataset

# Prepare Dataset

In [None]:
# Read only the comment text and its label from the CSV export.
filename = "./DataAbortionNLPAugBalanced.csv"
wanted_columns = ['commentTextDisplay', 'label']
df = pd.read_csv(filename, usecols=wanted_columns, encoding='utf-8')
print(df)

# Keep rows whose label is one of the three class codes (0, 1, 2), then
# store the label column as integers.
has_known_label = df['label'].isin([0, 1, 2])
df = df[has_known_label].astype({'label': int})
print(df)

In [None]:
def remove_html_and_other(text):
    """Strip HTML remnants and normalise a few tokens in a comment string.

    Steps, in order:
      * every ``<a href ... /a>`` anchor is replaced by a single space
        (non-greedy, so text between two separate links is kept);
      * ``<br>``, ``<br >``, ``<br/>`` and ``<br />`` are replaced by a space
        (the whole tag, so no stray ``>`` is left behind);
      * ``<b>`` / ``</b>`` are replaced by a space;
      * the entities ``&#39;``, ``&amp;`` and ``&quot;`` are decoded;
      * the ellipsis character (U+2026) is replaced by a space;
      * ``1st`` / ``2nd`` / ``3rd`` / ``100%`` are spelled out, but only when
        they are not the tail of a longer number (``21st`` stays ``21st``).

    Args:
        text: The comment text (a ``str``).

    Returns:
        The cleaned string.
    """
    # `.*?` instead of `.*`: stop at the first closing tag rather than the last.
    new_text = re.sub(r'<a href.*?/a>', ' ', text)
    # One pattern for every line-break spelling, consuming the closing `>`.
    new_text = re.sub(r'<br\s*/?\s*>', ' ', new_text)
    new_text = (new_text.replace('<b>', ' ').
                replace('</b>', ' ').
                replace('&#39;', "\u0027").
                replace('&amp;', '&').
                replace('\u2026', ' ').
                replace('&quot;', '\u0022')
    )
    # The negative lookbehind keeps e.g. "21st" or "1100%" untouched.
    spelled_out = (
        (r'(?<!\d)1st', 'first '),
        (r'(?<!\d)2nd', 'second '),
        (r'(?<!\d)3rd', 'third '),
        (r'(?<!\d)100%', 'one hundred percent '),
    )
    for pattern, replacement in spelled_out:
        new_text = re.sub(pattern, replacement, new_text)
    return new_text

def cleanTxt(text):
    """Lower-case `text`, then run it through `remove_html_and_other`."""
    lowered = text.lower()
    return remove_html_and_other(lowered)

In [None]:
# Clean every comment in place (lower-casing + HTML/entity stripping).
cleaned_comments = df['commentTextDisplay'].map(cleanTxt)
df['commentTextDisplay'] = cleaned_comments
print(df)

In [None]:
from sklearn.model_selection import train_test_split

# 80% train; the remaining 20% is halved into validation and test.
# The same seed is used for both splits so the partition is reproducible.
SPLIT_SEED = 1000
train_df, remaining_df = train_test_split(
    df, test_size=0.2, random_state=SPLIT_SEED
)
validation_df, test_df = train_test_split(
    remaining_df, test_size=0.5, random_state=SPLIT_SEED
)
for split_frame in (train_df, validation_df, test_df):
    print(split_frame)

In [None]:
# Class distribution of the full frame, then of each split.
for labelled_frame in (df, train_df, validation_df, test_df):
    print(labelled_frame['label'].value_counts())

In [None]:
# Wrap each pandas split in a `datasets.Dataset`.
train_dataset, validation_dataset, test_dataset = [
    Dataset.from_pandas(split_frame)
    for split_frame in (train_df, validation_df, test_df)
]
for split_dataset in (train_dataset, validation_dataset, test_dataset):
    print(split_dataset)

# Column names / types of the training split.
print(train_dataset.features)

# Pre-Processing Dataset
The following steps use the tokenizer associated with your pre-trained model to tokenize the dataset and prepare it in the form that transformer models require. That is, the tokenized datasets will contain tensors with the following information: attention_mask, input_ids, and label.

In [None]:
from transformers import AutoTokenizer

# Tokenizer of the checkpoint used as the base model.
tokenizer_checkpoint = 'microsoft/deberta-base'
tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)

In [None]:
def preprocess_function(examples):
    """Tokenize the `commentTextDisplay` field of `examples` with truncation.

    Uses the module-level `tokenizer` and returns whatever it produces.
    """
    comment_texts = examples["commentTextDisplay"]
    encoded = tokenizer(comment_texts, truncation=True)
    return encoded

In [None]:
# Tokenize each split in batches with `preprocess_function`.
tokenized_train_dataset, tokenized_validation_dataset, tokenized_test_dataset = [
    split_dataset.map(preprocess_function, batched=True)
    for split_dataset in (train_dataset, validation_dataset, test_dataset)
]

In [None]:
# Inspect the first tokenized training example (shown as the cell's output).
tokenized_train_dataset[0]

In [None]:
from transformers import DataCollatorWithPadding

# Collator built from the tokenizer above, configured to return
# TensorFlow tensors ("tf").
data_collator = DataCollatorWithPadding(
    tokenizer=tokenizer,
    return_tensors="tf",
)

In [None]:
def _to_tf_batches(tokenized_split, shuffle):
    """Convert a tokenized split into a batched TF dataset (batch size 4)."""
    return tokenized_split.to_tf_dataset(
        columns=["attention_mask", "input_ids", "label"],
        shuffle=shuffle,
        batch_size=4,
        collate_fn=data_collator,
    )


# Only the training split is shuffled; validation and test keep their order.
tf_train_set = _to_tf_batches(tokenized_train_dataset, shuffle=True)
tf_validation_set = _to_tf_batches(tokenized_validation_dataset, shuffle=False)
tf_test_set = _to_tf_batches(tokenized_test_dataset, shuffle=False)

In [None]:
# Sanity-check the batch structure. Only the first batch is printed:
# iterating the whole dataset would dump every training batch into the output.
for element in tf_train_set.take(1):
  print(element)

# Prepare Model

In [None]:
from transformers import create_optimizer
import tensorflow as tf

# Training hyper-parameters.
batch_size = 4
num_epochs = 3

# Total number of optimizer steps, derived from the number of whole batches
# per epoch; it is handed to `create_optimizer` as `num_train_steps`.
batches_per_epoch = len(tokenized_train_dataset) // batch_size
total_train_steps = int(batches_per_epoch * num_epochs)

optimizer, schedule = create_optimizer(
    init_lr=2e-5,
    num_warmup_steps=0,
    num_train_steps=total_train_steps,
)

In [None]:
from transformers import TFAutoModelForSequenceClassification

# Load a 3-label sequence classifier from a directory under Google Drive.
# NOTE(review): this path only resolves if Drive is already mounted and a model
# was saved there earlier; no mount step is visible in this notebook.
saved_model_dir = "drive/MyDrive/Models/TF_Abortion_Stance_Detect_DeBERTa"
model = TFAutoModelForSequenceClassification.from_pretrained(saved_model_dir, num_labels=3)
# Alternative kept for reference: start from the base checkpoint instead.
#model = TFAutoModelForSequenceClassification.from_pretrained("microsoft/deberta-base", num_labels=3)

In [None]:
import tensorflow as tf
import sklearn

# Compile with the optimizer created earlier and track accuracy.
# NOTE(review): no `loss` is passed; presumably the model falls back to its own
# built-in loss. Confirm against the transformers documentation.
model.compile(
    optimizer=optimizer,
    metrics=['accuracy'],
)

In [None]:
# Show the summary of the compiled model.
model.summary()

# Fit/Fine-Tune Pre-Trained Model with Dataset

In [None]:
# Fine-tune on the training batches, validating after each epoch.
# `num_epochs` is the same value used to size the optimizer's learning-rate
# schedule, so the epoch count and the schedule cannot drift apart.
history = model.fit(x=tf_train_set,
          validation_data=tf_validation_set,
          epochs=num_epochs)

# Predict Sentiment with Single User Input

In [None]:
# Single comment to classify, wrapped in a list.
# NOTE(review): this name shadows Python's built-in `input()`. Later cells read
# it as `input`, so any rename has to be done in all of them together.
input = ["abortion should never be allowed except in the case of rape."]

In [None]:
# Apply the same cleaning (lower-casing, HTML/entity stripping) that the
# training comments received, so the model sees text in the form it was
# trained on.
cleaned_input = [cleanTxt(comment) for comment in input]
input_df = pd.DataFrame(cleaned_input, columns=['commentTextDisplay'])
input_dataset = Dataset.from_pandas(input_df)
tokenized_input = input_dataset.map(preprocess_function, batched=True)
# Batches of one example each; no label column is requested for inference.
input_ds = tokenized_input.to_tf_dataset(
    columns=["attention_mask", "input_ids"],
    batch_size = 1
)

In [None]:
import numpy as np
import scipy
from scipy.special import softmax

prediction = model.predict(input_ds)
# Normalise each row of logits on its own. Without `axis`, scipy's softmax
# normalises over the whole array, which is only correct for a single row.
probabilities = softmax(prediction.logits, axis=-1)
# Index of the largest logit per row = predicted class code.
prediction_label = np.argmax(prediction.logits, axis=1)
print(probabilities)
print(prediction.logits)

In [None]:
def convert_label_to_string(label):
  """Map a class code to its display name.

  0 -> "Neutral", 1 -> "Positive", anything else -> "Negative".
  """
  for code, display_name in ((0, "Neutral"), (1, "Positive")):
    if (label == code):
      return display_name
  return "Negative"

# Report the input sentence next to the name of its predicted class.
predicted_name = convert_label_to_string(prediction_label)
print("Your input: " + input[0] + "\tPrediction: " + predicted_name)

Your input: abortion should never be allowed except in the case of rape.	Prediction: Neutral


# Model Testing And Analysis

In [None]:
# Evaluate the model on the test batches; the result is shown as the cell's output.
model.evaluate(tf_test_set)

In [None]:
predictions = model.predict(tf_test_set)
# Class probabilities of the first test example (a single row of logits).
print(softmax(predictions.logits[0]))
# Per-example probabilities: normalise along the class axis. Without `axis`,
# scipy's softmax would normalise across every test example jointly.
probabilities = softmax(predictions.logits, axis=-1)

In [None]:
# Per-example class probabilities, one softmax per row of logits.
probabilities = [
    softmax(predictions.logits[row]) for row in range(len(predictions['logits']))
]
# Predicted class = position of the highest probability.
predictions_labels = [np.argmax(row_probabilities) for row_probabilities in probabilities]
# Ground-truth labels in test-set row order.
actual_labels = [test_df['label'].iloc[row] for row in range(len(test_df))]

Create and gather confusion matrix data.

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix as compute_confusion_matrix

# Raw confusion counts (rows: actual, columns: predicted).
confusion_matrix = compute_confusion_matrix(actual_labels, predictions_labels)

# Plot the same comparison with readable class names.
class_names = ['Neutral', 'Pro-Choice', 'Pro-Life']
confusion_matrix_display = ConfusionMatrixDisplay.from_predictions(
    actual_labels,
    predictions_labels,
    display_labels = class_names,
    cmap='Greens',
)

In [None]:
# Positional indices (into test_df) of correctly classified comments.
true_positive_indices = []
true_negative_indices = []
true_neutral_indices = []

# Positional indices of misclassified comments, grouped by actual -> predicted.
positive_predicted_as_neutral = []
positive_predicted_as_negative = []

negative_predicted_as_neutral = []
negative_predicted_as_positive = []

neutral_predicted_as_positive = []
neutral_predicted_as_negative = []

# (actual, predicted) -> destination list.
# Label codes as used throughout this cell: 0 = neutral, 1 = positive, 2 = negative.
index_buckets = {
  (0, 0): true_neutral_indices,
  (1, 1): true_positive_indices,
  (2, 2): true_negative_indices,
  (0, 1): neutral_predicted_as_positive,
  (0, 2): neutral_predicted_as_negative,
  (1, 0): positive_predicted_as_neutral,
  (1, 2): positive_predicted_as_negative,
  (2, 0): negative_predicted_as_neutral,
  (2, 1): negative_predicted_as_positive,
}

for x in range(len(test_df)):
  actual = int(test_df['label'].iloc[x])
  # Read from `predictions_labels` (the test-set predictions). The previous
  # code read `prediction_labels` here, which is the single-sentence
  # prediction from the demo cell and has nothing to do with the test set.
  predicted = int(predictions_labels[x])
  bucket = index_buckets.get((actual, predicted))
  if bucket is not None:
    bucket.append(x)

print("True positive indices:")
print(true_positive_indices)
print("True negative indices:")
print(true_negative_indices)
print("True neutral indices:")
print(true_neutral_indices)
print("Positive comments predicted as neutral indices: ")
print(positive_predicted_as_neutral)
print("Positive comments predicted as negative indices: ")
print(positive_predicted_as_negative)
print("Negative comments predicted as neutral indices: ")
print(negative_predicted_as_neutral)
print("Negative comments predicted as positive indices: ")
print(negative_predicted_as_positive)
print("Neutral comments predicted as positive indices: ")
print(neutral_predicted_as_positive)
print("Neutral comments predicted as negative indices: ")
print(neutral_predicted_as_negative)


In [None]:
# Print the text of every correctly classified neutral comment.
for neutral_index in true_neutral_indices:
  comment_text = test_df['commentTextDisplay'].iloc[neutral_index]
  print("Index " + str(neutral_index) + ": " + comment_text)

Compute accuracy, precision, recall, and F1 score.

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.metrics import f1_score as compute_f1_score

# Overall accuracy plus macro-averaged precision / recall / F1.
accuracy = accuracy_score(actual_labels, predictions_labels)
macro_average = {"average": "macro"}
precision = precision_score(actual_labels, predictions_labels, **macro_average)
recall = recall_score(actual_labels, predictions_labels, **macro_average)
f1_score = compute_f1_score(actual_labels, predictions_labels, **macro_average)

Print dataset and model statistics including breakdown of comments, and model accuracy, precision, recall, and F1 score.

In [None]:
# Report the metrics computed in the previous cell.
# (The first label previously read "Accurary".)
print("Accuracy: " + str(accuracy))
print("Precision: " + str(precision))
print("Recall: " + str(recall))
print("F1 Score: " + str(f1_score))

# Save Model

In [None]:
# Write the fine-tuned model to the same Drive directory it was loaded from
# at the start of the "Prepare Model" section, overwriting what is there.
# NOTE(review): confirm that `save_format` is an argument `save_pretrained`
# actually honours in the installed transformers version.
model.save_pretrained('drive/MyDrive/Models/TF_Abortion_Stance_Detect_DeBERTa', save_format='tf')