# Topic: Comparative Analysis of BERT and RoBERTa for Sentiment Classification of COVID-19 Tweets: Optimising Transfer Learning Strategies for Crisis Communication

### This work investigates the comparative performance of fine-tuned BERT and RoBERTa models for sentiment analysis on COVID-19 tweets. It explores the impact of pre-processing techniques, specifically focusing on the removal of URLs, trailing hashtags, and punctuation, to evaluate their influence on model accuracy.


In [None]:
# Imports
import numpy as np
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
import seaborn as sns

# Text Preprocessing
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Data Balancing
from imblearn.over_sampling import RandomOverSampler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder


# Machine Learning Models (Naive Bayes)
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB

# Transformers (BERT & RoBERTa)
from transformers import BertTokenizerFast, TFBertModel
from transformers import RobertaTokenizerFast, TFRobertaModel

# Deep Learning (Keras)
from tensorflow import keras

# Evaluation Metrics
from sklearn.metrics import accuracy_score, f1_score

# Reproducibility & Visualization Setup
seed = 42

# Apply the seed to the random number generators used by this notebook.
# Declaring `seed` alone does not make NumPy- or TensorFlow-based steps repeatable.
np.random.seed(seed)
tf.random.set_seed(seed)

# Set a Seaborn style
sns.set_style("whitegrid")
sns.despine()

# Set a valid Matplotlib style from the available list
plt.style.use("ggplot")  # Replace "ggplot" with any valid style from plt.style.available

# Additional configuration: automatic layout, bold axis labels and titles
plt.rc("figure", autolayout=True)
plt.rc("axes", labelweight="bold", labelsize="large", titleweight="bold", titlepad=10)


# Loading the data

In [None]:
df = pd.read_csv('Corona_train.csv',encoding='ISO-8859-1')
df_test = pd.read_csv('Corona_test.csv')

In [None]:
df.head()

In [None]:
#Sentiment Distribution
df.groupby('Sentiment').size().plot(kind='barh', color=sns.palettes.mpl_palette('Dark2'))
plt.gca().spines[['top', 'right',]].set_visible(False)

In [None]:
df['Sentiment'].value_counts()

In [None]:
df.info()

In [None]:
df['TweetAt'] = pd.to_datetime(df['TweetAt'], format='%d-%m-%Y')

# Check for Duplicate tweets

In [None]:
df.drop_duplicates(subset='OriginalTweet',inplace=True)

In [None]:
df.info()

# Tweets Count by Date

In [None]:
tweets_per_day = df['TweetAt'].dt.strftime('%m-%d').value_counts().sort_index().reset_index(name='counts')

In [None]:
# Bar chart of the number of tweets per day, using the table built in the previous cell.
plt.figure(figsize=(20, 5))
ax = sns.barplot(
    x='TweetAt',  # Use the correct column name for dates
    y='counts',
    data=tweets_per_day,
    edgecolor='black',
    errorbar=None,  # no error bars; replaces the deprecated ci=None
    palette='Blues_r',
)
plt.title('Tweets count by date')
plt.yticks([])  # the exact counts are printed on the bars instead
ax.bar_label(ax.containers[0])
plt.ylabel('count')
plt.xlabel('')
plt.show()

# Tweets per country and city

In [None]:
tweets_per_country = df['Location'].value_counts().loc[lambda x : x > 100].reset_index(name='counts')

In [None]:
# Bar chart of tweet counts for every location with more than 100 tweets.
plt.figure(figsize=(15, 6))
ax = sns.barplot(
    x='Location',
    y='counts',
    data=tweets_per_country,
    edgecolor='black',
    errorbar=None,  # disable error bars explicitly instead of passing ('ci', False)
    palette='Spectral',
)
plt.title('Tweets count by country')
plt.xticks(rotation=70)  # Rotate x-axis labels for better readability
plt.yticks([])  # the exact counts are printed on the bars instead
ax.bar_label(ax.containers[0])
plt.ylabel('count')
plt.xlabel('')
plt.show()

In [None]:
#Distribution of Sentiments Over Time
# Ensure 'TweetAt' is in datetime format
df['TweetAt'] = pd.to_datetime(df['TweetAt'])

# Use a serif font for better readability.
# Set before the figure is built so the title, labels and legend below are created with it.
plt.rcParams['font.family'] = 'serif'

# Group by date and sentiment, count occurrences (one row per day, one column per sentiment)
sentiment_over_time = df.groupby([df['TweetAt'].dt.date, 'Sentiment']).size().unstack(fill_value=0)

# Convert each day's counts into percentages of that day's total
sentiment_over_time_pct = sentiment_over_time.div(sentiment_over_time.sum(axis=1), axis=0) * 100

# Plot one line per sentiment
plt.figure(figsize=(15, 8))
for column in sentiment_over_time_pct.columns:
    plt.plot(sentiment_over_time_pct.index, sentiment_over_time_pct[column], label=column, linewidth=2)

plt.title('Distribution of Sentiments Over Time', fontsize=16)
plt.xlabel('Date', fontsize=12)
plt.ylabel('Percentage of Tweets', fontsize=12)
plt.legend(title='Sentiment', title_fontsize='13', fontsize='11')
plt.grid(True, linestyle='--', alpha=0.7)

# Rotate and align the tick labels so they look better
plt.gcf().autofmt_xdate()

# Add a tight layout to prevent clipping of labels
plt.tight_layout()

# Show the plot
plt.show()

# Data Preprocessing for Sentiment Analysis
To focus on the sentiment analysis task, we will perform targeted data cleaning on the raw tweet text within the provided DataFrame. This process will involve selecting the relevant columns: 'OriginalTweet', containing the raw tweets, and 'Sentiment', representing the target sentiment labels. This selection streamlines the analysis by focusing on the essential information for the task.

In [None]:
df = df[['OriginalTweet','Sentiment']]

In [None]:
df_test = df_test[['OriginalTweet','Sentiment']]

In [None]:
##CUSTOM DEFINED FUNCTIONS TO CLEAN THE TWEETS

def strip_emoji(text):
    """Delete every run of characters that is not an ASCII letter, digit, whitespace or underscore.

    Emojis disappear as a consequence, but so do punctuation marks and
    symbols such as ``@``, ``#``, ``:`` and ``/``.
    """
    disallowed_run = re.compile(r"[^a-zA-Z0-9\s_]+")
    return disallowed_run.sub("", text)

#Remove punctuation, links, mentions and \r\n new line characters
def strip_all_entities(text):
    """Lower-case ``text`` and strip line breaks, links, mentions, non-ASCII characters and punctuation.

    Args:
        text: Raw tweet text.

    Returns:
        The cleaned string. Removed items are not replaced by anything, so
        doubled spaces can remain where a link or mention used to be.
    """
    # Imported locally: the visible import cell of the notebook does not import `string`,
    # so referencing string.punctuation would otherwise raise NameError.
    import string

    text = text.replace('\r', '').replace('\n', ' ').lower()  # drop \r, turn \n into a space, lowercase
    text = re.sub(r"(?:\@|https?\://)\S+", "", text)  # remove links and mentions
    text = re.sub(r'[^\x00-\x7f]', r'', text)  # remove non-ASCII characters such as '\x9a\x91\x97\x9a\x97'
    banned_list = string.punctuation + 'Ã'+'±'+'ã'+'¼'+'â'+'»'+'§'
    table = str.maketrans('', '', banned_list)
    return text.translate(table)

#clean hashtags at the end of the sentence, and keep those in the middle of the sentence by removing just the # symbol
def clean_hashtags(tweet):
    """Drop trailing hashtags and turn in-sentence hashtags into plain words.

    Args:
        tweet: Tweet text that may contain ``#tags``.

    Returns:
        The tweet without the block of hashtags that closes it. Hashtags that
        remain (including the literal ``#hashtag``, which the pattern excludes
        from removal) lose their ``#``, and underscores become spaces.
    """
    # Raw strings are required here: in a normal literal "\b" is a backspace character,
    # which silently disabled the word-boundary check of the "#hashtag" exclusion.
    trailing_tags = r'#(?!(?:hashtag)\b)[\w-]+(?=(?:\s+#[\w-]+)*\s*$)'
    new_tweet = " ".join(word.strip() for word in re.split(trailing_tags, tweet))  # remove last hashtags
    # Remove the hashtag symbol (and underscores) from words in the middle of the sentence
    new_tweet2 = " ".join(word.strip() for word in re.split(r'#|_', new_tweet))
    return new_tweet2

#Blank out words that contain special characters such as & and $
def filter_chars(a):
    """Replace each space-separated word containing ``$`` or ``&`` with an empty string.

    The separating spaces are kept, so a removed word leaves a doubled space behind.
    """
    kept = ['' if ('$' in token or '&' in token) else token for token in a.split(' ')]
    return ' '.join(kept)

def remove_mult_spaces(text):
    """Replace every run of two or more whitespace characters with a single space.

    A single whitespace character (including a lone newline) is left untouched.
    """
    # Raw string: "\s" in a plain literal is an invalid escape sequence and warns on recent Python versions.
    return re.sub(r"\s\s+", " ", text)

In [None]:
# Define functions for text pre-processing (replace with your actual function definitions)
def remove_mult_spaces(text):
  """Return ``text`` with each whitespace run squeezed to one space and both ends trimmed."""
  words = text.split()
  return ' '.join(words)

def filter_chars(text):
  # Identity placeholder: returns the input unchanged.
  # NOTE(review): this reuses the name of the `filter_chars` defined in the previous cell,
  # so running this cell replaces that implementation with a no-op. Confirm this is intended.
  return text

def clean_hashtags(text):
  # Identity placeholder: returns the input unchanged, so no hashtag is removed here.
  # NOTE(review): this reuses the name of the `clean_hashtags` defined in the previous cell,
  # so running this cell replaces that implementation with a no-op. Confirm this is intended.
  return text

def strip_all_entities(text):
  """Strip @mentions, http(s) links, non-ASCII characters and punctuation from ``text``.

  Letter case and line breaks are left as they are.
  """
  import string  # imported locally, inside the function

  without_links = re.sub(r"(?:\@|https?\://)\S+", "", text)  # links and mentions
  ascii_only = re.sub(r'[^\x00-\x7f]', r'', without_links)  # anything outside 7-bit ASCII
  banned_list = string.punctuation + 'Ã'+'±'+'ã'+'¼'+'â'+'»'+'§'
  return ascii_only.translate(str.maketrans('', '', banned_list))

def strip_emoji(text):
  # Identity placeholder: returns the input unchanged, so no emoji is removed here.
  # NOTE(review): this reuses the name of the `strip_emoji` defined in the previous cell,
  # so running this cell replaces that implementation with a no-op. Confirm this is intended.
  return text

# Cleaning chain applied to one tweet, innermost step first:
# strip_emoji -> strip_all_entities -> clean_hashtags -> filter_chars -> remove_mult_spaces
def _clean_tweet(raw):
    return remove_mult_spaces(filter_chars(clean_hashtags(strip_all_entities(strip_emoji(raw)))))

# Clean the tweets of the main and of the test dataframe
texts_new = [_clean_tweet(t) for t in df.OriginalTweet]
texts_new_test = [_clean_tweet(t) for t in df_test.OriginalTweet]

# Store the cleaned text next to the original tweets
df['text_clean'] = texts_new
df_test['text_clean'] = texts_new_test


### Now we can create a new column, for both train and test sets, to host the cleaned version of the tweets' text.

In [None]:
df['text_clean'] = texts_new
df_test['text_clean'] = texts_new_test

In [None]:
df['text_clean'].head()

In [None]:
df_test['text_clean'].head()

In [None]:
df['text_clean'][1:8].values

### To assess the impact of text cleaning on tweet length, we will introduce a new column containing the length of the cleaned text. This will allow us to verify if the cleaning process removes a significant portion of the content or preserves the core message of the tweet.

In [None]:
# Number of whitespace-separated words in each cleaned training tweet
text_len = [len(text.split()) for text in df.text_clean]

In [None]:
df['text_len'] = text_len

In [None]:
# Number of whitespace-separated words in each cleaned test tweet
text_len_test = [len(text.split()) for text in df_test.text_clean]

In [None]:
df_test['text_len'] = text_len_test

In [None]:
plt.figure(figsize=(7,5))
ax = sns.countplot(x='text_len', data=df[df['text_len']<10], palette='mako')
plt.title('Training tweets with less than 10 words')
plt.yticks([])
ax.bar_label(ax.containers[0])
plt.ylabel('count')
plt.xlabel('')
plt.show()

In [None]:
plt.figure(figsize=(7,5))
ax = sns.countplot(x='text_len', data=df_test[df_test['text_len']<10], palette='mako')
plt.title('Test tweets with less than 10 words')
plt.yticks([])
ax.bar_label(ax.containers[0])
plt.ylabel('count')
plt.xlabel('')
plt.show()

In [None]:
# Distribution of the character length of the cleaned training tweets.
# histplot with a KDE overlay replaces the deprecated sns.distplot.
sns.histplot(df['text_clean'].str.len(), kde=True, stat='density')
plt.xlabel('Text Length')
plt.ylabel('Frequency')
plt.title('Distribution of Text Length')
plt.show()


# Data Cleaning Impact on Tweet Length
Our cleaning process has resulted in a significant number of tweets with zero words. This is because some tweets originally consisted solely of mentions (e.g., "@username"), hashtags, and URLs, all of which were removed during cleaning. To ensure the remaining data is suitable for further analysis, we will keep only tweets that contain at least five words, which also removes the empty ones.

In [None]:
print(f" DF SHAPE: {df.shape}")
print(f" DF TEST SHAPE: {df_test.shape}")

In [None]:
df = df[df['text_len'] > 4]

In [None]:
df_test = df_test[df_test['text_len'] > 4]

In [None]:
print(f" DF SHAPE: {df.shape}")
print(f" DF TEST SHAPE: {df_test.shape}")

# Enhancing Training Data Quality: Tokenizer Version Check

In this step, we perform a more rigorous cleaning of the training data based on how the tweets are tokenized. To achieve this, we begin by importing the BERT tokenizer, which is then used to measure the number of tokens in every cleaned tweet.

In [None]:
from transformers import BertTokenizerFast
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', force_download=True)

In [None]:
# Token count of every cleaned training tweet (encoding truncated at 512 tokens)
token_lens = [
    len(tokenizer.encode(txt, max_length=512, truncation=True))
    for txt in df['text_clean'].values
]

max_len = np.max(token_lens)

In [None]:
print(f"MAX TOKENIZED SENTENCE LENGTH: {max_len}")

### We will now analyze sentences exceeding 80 tokens after tokenization.

In [None]:
# Recompute the token counts and print every training tweet longer than 80 tokens
token_lens = []

for i, txt in enumerate(df['text_clean'].values):
    n_tokens = len(tokenizer.encode(txt, max_length=512, truncation=True))
    token_lens.append(n_tokens)
    if n_tokens > 80:
        print(f"INDEX: {i}, TEXT: {txt}")

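### The longest tokenized tweets are inspected next. After sorting by token count, the rows at the top (which appear not to be English) are dropped by position so that they are excluded from further analysis.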

In [None]:
df['token_lens'] = token_lens

In [None]:
df = df.sort_values(by='token_lens', ascending=False)
df.head(20)

In [None]:
df = df.iloc[12:]
df.head()

### Data cleaning has been completed. To mitigate potential biases, the data will be shuffled, and the index will be reset before proceeding.

In [None]:
# Shuffle the rows and renumber the index; random_state makes the shuffle repeatable across runs
df = df.sample(frac=1, random_state=seed).reset_index(drop=True)

# Enhanced Test Data Cleaning
### Leveraging tokenization, we will conduct a more comprehensive data cleaning process on the test set.

In [None]:
# Token count of every cleaned test tweet (encoding truncated at 512 tokens)
token_lens_test = [
    len(tokenizer.encode(txt, max_length=512, truncation=True))
    for txt in df_test['text_clean'].values
]

max_len = np.max(token_lens_test)

In [None]:
print(f"MAX TOKENIZED SENTENCE LENGTH: {max_len}")

In [None]:
# Recompute the token counts and print every test tweet longer than 80 tokens
token_lens_test = []

for i, txt in enumerate(df_test['text_clean'].values):
    n_tokens = len(tokenizer.encode(txt, max_length=512, truncation=True))
    token_lens_test.append(n_tokens)
    if n_tokens > 80:
        print(f"INDEX: {i}, TEXT: {txt}")

In [None]:
df_test['token_lens'] = token_lens_test

In [None]:
df_test = df_test.sort_values(by='token_lens', ascending=False)
df_test.head(10)

In [None]:
df_test = df_test.iloc[5:]
df_test.head(3)

In [None]:
# Shuffle the rows and renumber the index; random_state makes the shuffle repeatable across runs
df_test = df_test.sample(frac=1, random_state=seed).reset_index(drop=True)

# Analyzing Sentiment Labels
We now turn our attention to the 'Sentiment' column, which contains the target labels for our sentiment classification task

In [None]:
df['Sentiment'].value_counts()

Our initial step involves encoding the categorical labels with numerical representations. Additionally, we will consolidate the emotional categories into three primary classes: positive, neutral, and negative.


In [None]:
df['Sentiment'] = df['Sentiment'].map({'Extremely Negative':0,'Negative':0,'Neutral':1,'Positive':2,'Extremely Positive':2})

In [None]:
df_test['Sentiment'] = df_test['Sentiment'].map({'Extremely Negative':0,'Negative':0,'Neutral':1,'Positive':2,'Extremely Positive':2})

In [None]:
df['Sentiment'].value_counts()

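Our analysis reveals a class imbalance within the data. To mitigate potential bias towards the majority classes, we will employ an oversampling technique on the training data only; data used for evaluation keeps its original class distribution so that the reported metrics reflect the real label frequencies.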

# Random Oversampling Technique for Imbalanced Data

In [None]:
# Randomly repeat tweets of the smaller classes until all sentiment classes are equally frequent.
# random_state makes the resampling repeatable.
ros = RandomOverSampler(random_state=seed)
train_x, train_y = ros.fit_resample(
    np.array(df['text_clean']).reshape(-1, 1),  # features passed as a single-column 2-D array
    np.array(df['Sentiment']),                  # labels passed as a 1-D vector
)
# NOTE(review): in the visible notebook `train_os` is only used to display class counts.
train_os = pd.DataFrame({'text_clean': train_x.ravel(), 'Sentiment': np.ravel(train_y)})

In [None]:
train_os['Sentiment'].value_counts()

# Train - Validation - Test split

In [None]:
from imblearn.over_sampling import RandomOverSampler
from sklearn.model_selection import train_test_split
import numpy as np

# Set random seed for reproducibility
seed = 42  # make sure this matches your previous seed value

# Features as a single-column 2-D array (the shape the oversampler is given), labels as a 1-D array
X = df['text_clean'].values.reshape(-1, 1)
y = df['Sentiment'].values

# 1) Hold out 20% of the data as the test set
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=seed)

# 2) Split off the validation set BEFORE oversampling. Oversampling first would place copies
#    of the same tweet in both the training and the validation set, which inflates validation
#    scores and misleads early stopping.
X_train_raw, X_valid, y_train_raw, y_valid = train_test_split(X_train_val, y_train_val,
                                                              test_size=0.1, stratify=y_train_val,
                                                              random_state=seed)

# 3) Balance the classes of the training portion only
oversampler = RandomOverSampler(random_state=seed)
X_train, y_train = oversampler.fit_resample(X_train_raw, y_train_raw)

# Print shapes to verify
print("Train set shape:", X_train.shape)
print("Validation set shape:", X_valid.shape)
print("Test set shape:", X_test.shape)

# One Hot Encoding
Our exploration of different encoding methods for the target variable revealed that one-hot encoding yielded superior accuracy compared to label encoding. Consequently, we will adopt one-hot encoding for this project.

In [None]:
print("Shape of original dataset:", df.shape)
print("Shape of X after splitting:", X_train.shape, X_test.shape)
print("Shape of y after splitting:", y_train.shape, y_test.shape)

In [None]:
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.preprocessing import LabelEncoder
from sklearn.naive_bayes import MultinomialNB

# X_train and y_train are expected to exist already (they come from the split cell)

# Shapes of the raw inputs
print("Shape of X_train:", X_train.shape)
print("Shape of y_train:", y_train.shape)

# Bag-of-words counts. The vectorizer is given a flat 1-D array of strings,
# so the training array is flattened first.
cv = CountVectorizer()
X_train_reshaped = X_train.ravel()
X_train_cv = cv.fit_transform(X_train_reshaped)
print("Shape of X_train_cv:", X_train_cv.shape)

# Re-weight the counts with TF-IDF (fitted on the training counts)
tf_transformer = TfidfTransformer(use_idf=True)
X_train_tf = tf_transformer.fit_transform(X_train_cv)
print("Shape of X_train_tf:", X_train_tf.shape)

# Encode the labels; `le` is kept so other splits can be transformed with the same mapping
le = LabelEncoder()
y_train_le = le.fit_transform(y_train)
print("Shape of y_train_le:", y_train_le.shape)

# Features and labels must describe the same number of samples
assert X_train_tf.shape[0] == y_train_le.shape[0], "Mismatch in number of samples between features and labels"

# Fit the Naive Bayes classifier on the TF-IDF features
nb_clf = MultinomialNB()
nb_clf.fit(X_train_tf, y_train_le)
print("Model fitted successfully!")


In [None]:
y_valid_le = le.transform(y_valid)
y_test_le = le.transform(y_test)


In [None]:
print(f"TRAINING DATA: {X_train.shape[0]}\nVALIDATION DATA: {X_valid.shape[0]}\nTESTING DATA: {X_test.shape[0]}" )

# Evaluation Benchmark: Naive Bayes Classifier

To establish a performance benchmark, we begin by implementing a Naive Bayes classifier. This simple yet effective model will serve as a baseline for comparison with the subsequent BERT-based approach. Prior to classification, tweets will be preprocessed using CountVectorizer for tokenization.

In [None]:
from sklearn.feature_extraction.text import TfidfTransformer

# Vectorize the test tweets with `cv`, the CountVectorizer fitted on the training tweets
X_test_cv = cv.transform(X_test.ravel())

# Fit the TF-IDF weights on the training counts only, then apply them to both splits
tf_transformer = TfidfTransformer(use_idf=True)
tf_transformer.fit(X_train_cv)
X_train_tf = tf_transformer.transform(X_train_cv)
X_test_tf = tf_transformer.transform(X_test_cv)

Next, we will instantiate the Naive Bayes classifier model

In [None]:
nb_clf = MultinomialNB()

In [None]:
nb_clf.fit(X_train_tf, y_train_le)

In [None]:
nb_pred = nb_clf.predict(X_test_tf)

In [None]:
from sklearn.metrics import classification_report

print('\tClassification Report for Naive Bayes:\n\n',classification_report(y_test_le,nb_pred, target_names=['Negative', 'Neutral', 'Positive']))

# BERT Sentiment Analysis
Having completed the initial sentiment analysis with tokenized sentences, we now proceed to define a custom tokenizer function specifically designed for BERT. Subsequently, we will leverage the encode_plus method of the BERT tokenizer to process the data

In [None]:
from transformers import BertTokenizer, TFBertModel

# Load the pre-trained BERT model (base-uncased) from Hugging Face Transformers
bert_model = TFBertModel.from_pretrained('bert-base-uncased')

# Load the corresponding tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
MAX_LEN=128
def create_model(bert_model, max_len=MAX_LEN):
    """
    Build and compile a 3-class softmax classifier on top of a BERT encoder.

    Args:
        bert_model: Encoder that is called as ``bert_model([input_ids, attention_masks])``.
            Element ``[1]`` of its output is fed to the classification layer
            (presumably the pooled output of a ``TFBertModel``; confirm against the caller).
        max_len (int, optional): Length of both input sequences. Defaults to MAX_LEN.

    Returns:
        tf.keras.Model: The compiled model, taking ``[input_ids, attention_masks]``
        and returning class probabilities for the 3 classes.
    """

    # Define optimizer, loss function, and accuracy metric (labels are expected one-hot encoded)
    opt = tf.keras.optimizers.Adam(learning_rate=1e-5)
    loss = tf.keras.losses.CategoricalCrossentropy()
    accuracy = tf.keras.metrics.CategoricalAccuracy()

    # Create input layers for token IDs and attention masks
    input_ids = tf.keras.Input(shape=(max_len,), dtype='int32')
    attention_masks = tf.keras.Input(shape=(max_len,), dtype='int32')

    # Run the encoder inside a Lambda layer and keep element [1] of its output.
    # NOTE(review): Keras documents that variables used inside a Lambda layer may not be
    # tracked by the outer model, so the encoder may not be fine-tuned. Confirm before training.
    embeddings = tf.keras.layers.Lambda(lambda x: bert_model(x)[1])([input_ids, attention_masks])

    # Add a Dense layer with softmax activation for multi-class classification (3 classes: Negative, Neutral, Positive)
    output = tf.keras.layers.Dense(3, activation="softmax")(embeddings)

    # Create the Keras model with inputs and outputs
    model = tf.keras.models.Model(inputs=[input_ids, attention_masks], outputs=output)

    # Compile the model; `metrics` is passed as a list, the documented form for compile()
    model.compile(optimizer=opt, loss=loss, metrics=[accuracy])

    return model

In [None]:
import tensorflow as tf
model = create_model(bert_model)
model.summary()

In [None]:
print(X_train[:5])

In [None]:
from transformers import BertTokenizer, TFBertForSequenceClassification
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras import regularizers
import tensorflow as tf
import matplotlib.pyplot as plt

# Define constants
max_length = 128
num_classes = 3
batch_size = 16
epochs = 5
checkpoint_path = "training_checkpoints/cp-{epoch:04d}.ckpt"

# Load BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Tokenize the input data
def tokenize_data(texts):
    """Encode an array whose rows each hold one string, using the module-level ``tokenizer``.

    Sequences are padded, truncated to ``max_length`` and returned as TensorFlow tensors.
    """
    sentences = [row[0] for row in texts.tolist()]  # unwrap the single string of every row
    encode_kwargs = dict(padding=True, truncation=True, max_length=max_length, return_tensors="tf")
    return tokenizer(sentences, **encode_kwargs)

# Tokenize your data
X_train_tokenized = tokenize_data(X_train)
X_valid_tokenized = tokenize_data(X_valid)

# Convert labels to categorical if not already done
y_train_encoded = tf.keras.utils.to_categorical(y_train, num_classes=num_classes)
y_valid_encoded = tf.keras.utils.to_categorical(y_valid, num_classes=num_classes)

# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((dict(X_train_tokenized), y_train_encoded))
train_dataset = train_dataset.shuffle(len(X_train)).batch(batch_size)

valid_dataset = tf.data.Dataset.from_tensor_slices((dict(X_valid_tokenized), y_valid_encoded))
valid_dataset = valid_dataset.batch(batch_size)

# Load pre-trained BERT model
model = TFBertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_classes)

# Compile the model
optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5)
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

# Adjust callbacks
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    verbose=1,
    save_freq='epoch'
)

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6)

# Load the latest checkpoint
latest_checkpoint = tf.train.latest_checkpoint('training_checkpoints')
if latest_checkpoint:
    print(f"Restoring from checkpoint: {latest_checkpoint}")
    model.load_weights(latest_checkpoint)

# Train the model
history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=[checkpoint_callback, early_stopping, lr_scheduler]
)


In [None]:

# Define constants
max_length = 128
num_classes = 3
batch_size = 16
epochs = 10
checkpoint_path = "training_checkpoints/cp-{epoch:0005}.ckpt"

# Load BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Tokenize the input data
def tokenize_data(texts):
    """Encode an array whose rows each hold one string, using the module-level ``tokenizer``.

    Sequences are padded, truncated to ``max_length`` and returned as TensorFlow tensors.
    """
    sentences = [row[0] for row in texts.tolist()]  # unwrap the single string of every row
    encode_kwargs = dict(padding=True, truncation=True, max_length=max_length, return_tensors="tf")
    return tokenizer(sentences, **encode_kwargs)

# Tokenize your data
X_train_tokenized = tokenize_data(X_train)
X_valid_tokenized = tokenize_data(X_valid)
X_test_tokenized = tokenize_data(X_test)  # Add this for test data

# Convert labels to categorical if not already done
y_train_encoded = tf.keras.utils.to_categorical(y_train, num_classes=num_classes)
y_valid_encoded = tf.keras.utils.to_categorical(y_valid, num_classes=num_classes)
y_test_encoded = tf.keras.utils.to_categorical(y_test, num_classes=num_classes)  # Add this for test data

# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((dict(X_train_tokenized), y_train_encoded))
train_dataset = train_dataset.shuffle(len(X_train)).batch(batch_size)

valid_dataset = tf.data.Dataset.from_tensor_slices((dict(X_valid_tokenized), y_valid_encoded))
valid_dataset = valid_dataset.batch(batch_size)

# Load pre-trained BERT model
model = TFBertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_classes)

# Compile the model
optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5)
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

# Adjust callbacks
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    verbose=1,
    save_freq='epoch'
)

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6)

# Load the latest checkpoint
latest_checkpoint = tf.train.latest_checkpoint('training_checkpoints')
if latest_checkpoint:
    print(f"Restoring from checkpoint: {latest_checkpoint}")
    model.load_weights(latest_checkpoint)

# Train the model
history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=[checkpoint_callback, early_stopping, lr_scheduler]
)

# Predict using the trained model
result_bert = model.predict(dict(X_test_tokenized))

# Convert predicted probabilities to one-hot encoded predictions
y_pred_probs = tf.nn.softmax(result_bert.logits, axis=-1).numpy()
y_pred_bert = np.zeros_like(y_pred_probs)
y_pred_bert[np.arange(len(y_pred_probs)), y_pred_probs.argmax(1)] = 1

# Define the conf_matrix function
def conf_matrix(y_true, y_pred, title):
    """Plot a confusion-matrix heat map for one-hot encoded labels.

    Args:
        y_true: 2-D array of one-hot true labels; reduced to class indices with ``argmax(1)``.
        y_pred: 2-D array of one-hot predictions; reduced the same way.
        title: Title of the figure.
    """
    # Imported here because no earlier cell brings confusion_matrix into scope;
    # without it this function raises NameError when the cells are run in order.
    from sklearn.metrics import confusion_matrix

    cm = confusion_matrix(y_true.argmax(1), y_pred.argmax(1))
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title(title)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

# Calculate and display the confusion matrix
conf_matrix(y_test_encoded, y_pred_bert, 'BERT Sentiment Analysis\nConfusion Matrix')


# BERT Results

In [None]:
from sklearn.metrics import classification_report

# Predict using the trained model
result_bert = model.predict(dict(X_test_tokenized))

# Convert predicted probabilities to class labels
y_pred_probs = tf.nn.softmax(result_bert.logits, axis=-1).numpy()
y_pred_labels = np.argmax(y_pred_probs, axis=1)

# Convert true labels to class labels
y_true_labels = np.argmax(y_test_encoded, axis=1)

# Print classification report
print(classification_report(y_true_labels, y_pred_labels, target_names=['Class 0', 'Class 1', 'Class 2']))


# RoBERTa for Sentiment Analysis
Similar to BERT, we first import the RoBERTa tokenizer.

In [None]:
# Load the pre-trained RoBERTa tokenizer
tokenizer_roberta = RobertaTokenizerFast.from_pretrained("roberta-base")


In [None]:
from transformers import RobertaTokenizer, TFRobertaForSequenceClassification
from sklearn.metrics import confusion_matrix

# Define constants
max_length = 128
num_classes = 3
batch_size = 16
epochs = 10
checkpoint_path = "training_checkpoints/cp-{epoch:04d}.ckpt"

# Load RoBERTa tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Function to clean and flatten the data
def preprocess_data(data):
    """Return ``data`` as a flat list of strings.

    Args:
        data: A sequence of strings, or a sequence whose elements are one-element
            containers holding a string (a nested list, or the rows of a
            single-column NumPy array as produced by the split cell).

    Returns:
        list: One string per element, with any leading/trailing ``[``, ``]``
        and ``'`` characters stripped. An empty input gives an empty list.
    """
    flat = []
    for item in data:
        # Unwrap one-element rows; iterating a 2-D array yields arrays, which have no .strip()
        if isinstance(item, (list, tuple, np.ndarray)):
            item = item[0]
        # Remove surrounding brackets and quotes if present
        flat.append(item.strip("[]'"))
    return flat

# Convert and preprocess your data
X_train = preprocess_data(X_train)
X_valid = preprocess_data(X_valid)
X_test = preprocess_data(X_test)

# Print out the first few elements to check
print("First few elements of X_train:", X_train[:5])
print("First few elements of X_valid:", X_valid[:5])
print("First few elements of X_test:", X_test[:5])

# Tokenize the input data
def tokenize_data(texts):
    """Encode ``texts`` with the module-level ``tokenizer``.

    Sequences are padded, truncated to ``max_length`` and returned as TensorFlow tensors.
    """
    encode_kwargs = dict(padding=True, truncation=True, max_length=max_length, return_tensors="tf")
    return tokenizer(texts, **encode_kwargs)

# Tokenize your data
X_train_tokenized = tokenize_data(X_train)
X_valid_tokenized = tokenize_data(X_valid)
X_test_tokenized = tokenize_data(X_test)  # Add this for test data

# Convert labels to categorical if not already done
y_train_encoded = tf.keras.utils.to_categorical(y_train, num_classes=num_classes)
y_valid_encoded = tf.keras.utils.to_categorical(y_valid, num_classes=num_classes)
y_test_encoded = tf.keras.utils.to_categorical(y_test, num_classes=num_classes)  # Add this for test data

# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((dict(X_train_tokenized), y_train_encoded))
train_dataset = train_dataset.shuffle(len(X_train)).batch(batch_size)

valid_dataset = tf.data.Dataset.from_tensor_slices((dict(X_valid_tokenized), y_valid_encoded))
valid_dataset = valid_dataset.batch(batch_size)

# Load pre-trained RoBERTa model
model = TFRobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=num_classes)

# Compile the model
optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5)
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

# Adjust callbacks
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    verbose=1,
    save_freq='epoch'
)

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6)


# Train the model
history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=[checkpoint_callback, early_stopping, lr_scheduler]
)

# Predict using the trained model
result_roberta = model.predict(dict(X_test_tokenized))

# Convert predicted probabilities to class labels
y_pred_probs = tf.nn.softmax(result_roberta.logits, axis=-1).numpy()
y_pred_labels = np.argmax(y_pred_probs, axis=1)

# Convert true labels to class labels
y_true_labels = np.argmax(y_test_encoded, axis=1)

# Define the conf_matrix function
def conf_matrix(y_true, y_pred, title):
    """Draw the confusion matrix of true versus predicted class labels as an annotated heat map."""
    counts = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(counts, cmap='Blues', fmt='d', annot=True)
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(title)
    plt.show()

# Calculate and display the confusion matrix
conf_matrix(y_true_labels, y_pred_labels, 'RoBERTa Sentiment Analysis\nConfusion Matrix')


# RoBERTa Modeling

In [None]:
from transformers import RobertaTokenizer, TFRobertaForSequenceClassification


# Define constants
max_length = 128
num_classes = 3

# Load the RoBERTa tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# The classifier fine-tuned in the previous cell (`model`) is reused as it is.
# Reloading 'roberta-base' here would overwrite it with a freshly loaded model that has
# not been through model.fit, while the code below predicts "using the trained model".


# Tokenize test data
def tokenize_data(texts):
    """Encode ``texts`` (assumed to be a list of strings) with the module-level ``tokenizer``.

    Sequences are padded, truncated to ``max_length`` and returned as TensorFlow tensors.
    """
    encode_kwargs = dict(padding=True, truncation=True, max_length=max_length, return_tensors="tf")
    return tokenizer(texts, **encode_kwargs)

# Tokenize your test data
X_test_tokenized = tokenize_data(X_test)

# Predict using the trained model
def predict(model, X_test_tokenized):
    """Return, for every encoded example, the index of the class with the largest logit."""
    outputs = model.predict(dict(X_test_tokenized))
    class_ids = tf.argmax(outputs.logits, axis=-1)
    return class_ids.numpy()

y_pred = predict(model, X_test_tokenized)

# Generate classification report
def generate_classification_report(y_true, y_pred):
    """Return the classification report text, with classes named 'Class 0' .. and 4 digits."""
    class_names = [f'Class {i}' for i in range(num_classes)]
    return classification_report(y_true, y_pred, target_names=class_names, digits=4)

# Print classification report
print(generate_classification_report(y_test, y_pred))


In [None]:
from transformers import RobertaTokenizer, TFRobertaForSequenceClassification
from sklearn.metrics import classification_report, confusion_matrix


# Define constants
max_length = 128
num_classes = 3
batch_size = 16
epochs = 10
checkpoint_path = "training_checkpoints/cp-{epoch:04d}.ckpt"

# Load the pre-trained RoBERTa model and tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = TFRobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=num_classes)

# Tokenize the data
def tokenize_data(texts):
    """Encode ``texts`` (assumed to be a list of strings) with the module-level ``tokenizer``.

    Sequences are padded, truncated to ``max_length`` and returned as TensorFlow tensors.
    """
    encode_kwargs = dict(padding=True, truncation=True, max_length=max_length, return_tensors="tf")
    return tokenizer(texts, **encode_kwargs)

# Tokenize your training, validation, and test data
X_train_tokenized = tokenize_data(X_train)
X_valid_tokenized = tokenize_data(X_valid)
X_test_tokenized = tokenize_data(X_test)

# Convert labels to categorical if not already done
y_train_encoded = tf.keras.utils.to_categorical(y_train, num_classes=num_classes)
y_valid_encoded = tf.keras.utils.to_categorical(y_valid, num_classes=num_classes)
y_test_encoded = tf.keras.utils.to_categorical(y_test, num_classes=num_classes)

# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((dict(X_train_tokenized), y_train_encoded)).shuffle(len(X_train)).batch(batch_size)
valid_dataset = tf.data.Dataset.from_tensor_slices((dict(X_valid_tokenized), y_valid_encoded)).batch(batch_size)

# Compile the model
optimizer = tf.keras.optimizers.Adam(learning_rate=3e-5)
loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

# Define callbacks
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    verbose=1,
    save_freq='epoch'
)

early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)
lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6)

# Load the latest checkpoint if exists
latest_checkpoint = tf.train.latest_checkpoint('training_checkpoints')
if latest_checkpoint:
    print(f"Restoring from checkpoint: {latest_checkpoint}")
    model.load_weights(latest_checkpoint)

# Train the model
history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=epochs,
    callbacks=[checkpoint_callback, early_stopping, lr_scheduler]
)

# Predict using the trained model
def predict(model, X_test_tokenized):
    """Return, for every encoded example, the index of the class with the largest logit."""
    outputs = model.predict(dict(X_test_tokenized))
    class_ids = tf.argmax(outputs.logits, axis=-1)
    return class_ids.numpy()

y_pred = predict(model, X_test_tokenized)

# Generate classification report
def generate_classification_report(y_true, y_pred):
    """Return the classification report text, with classes named 'Class 0' .. and 4 digits."""
    class_names = [f'Class {i}' for i in range(num_classes)]
    return classification_report(y_true, y_pred, target_names=class_names, digits=4)

# Print classification report
print(generate_classification_report(np.argmax(y_test_encoded, axis=1), y_pred))

# Generate and plot confusion matrix
def plot_confusion_matrix(y_true, y_pred, labels):
    """Plot the confusion matrix of ``y_true`` versus ``y_pred``, using ``labels`` on both axes."""
    counts = confusion_matrix(y_true, y_pred)
    heatmap_kwargs = dict(annot=True, cmap='Blues', fmt='g', cbar=False,
                          xticklabels=labels, yticklabels=labels)
    plt.figure(figsize=(8, 6))
    sns.heatmap(counts, **heatmap_kwargs)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.show()

# Plot confusion matrix
plot_confusion_matrix(np.argmax(y_test_encoded, axis=1), y_pred, [f'Class {i}' for i in range(num_classes)])
