In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
import re
import string
from nltk.corpus import stopwords
from nltk.tokenize import TweetTokenizer
from nltk.stem import WordNetLemmatizer
from textblob import TextBlob
from unidecode import unidecode
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.metrics import classification_report
from sklearn.preprocessing import MaxAbsScaler
from sklearn.svm import LinearSVC
from gensim.models import Word2Vec
import contractions

from transformers import BertTokenizerFast, RobertaTokenizerFast, TFRobertaModel, TFBertModel


import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Bidirectional,Embedding, Dropout,BatchNormalization, GlobalMaxPooling1D
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping

from transformers import BertTokenizerFast, RobertaTokenizerFast, TFRobertaModel, TFBertModel,AutoTokenizer, TFAutoModel

In [None]:
data = pd.read_csv(r'../../data/Corona_NLP_train.csv', encoding= 'ISO-8859-1')

In [None]:
data.info()

In [None]:
# filter the data to only conbtain the tweets and the sentiment
data = data[['OriginalTweet', 'Sentiment']]
data.head()

In [None]:
# plot the sentiment distribution using plotly
px.histogram(data, x='Sentiment', title='Sentiment Distribution')

In [None]:
# The data seems balanced, we can now proceed to clean the data

In [None]:
# Sentiment Column Analysis
data['Sentiment'].value_counts()

In [None]:
# convert the sentiments to only three categories|
data['Sentiment'] = data['Sentiment'].map({'Extremely Negative':0,'Negative':0,'Neutral':1,'Positive':2,'Extremely Positive':2})

In [None]:
data['Sentiment'].value_counts()

In [None]:
# Split the data into X and y
X = data['OriginalTweet']
y = data['Sentiment']

In [None]:
# Custom Transformer for Preprocessing and Tokenization
class BertTransformer(BaseEstimator, TransformerMixin):
    """Clean raw tweets and tokenize them for a Hugging Face transformer model.

    The transformer is stateless with respect to the data (``fit`` learns
    nothing); it loads the tokenizer and the pretrained model for
    ``model_name`` at construction time and exposes them as ``self.tokenizer``
    and ``self.model``.
    """

    def __init__(self, model_name='roberta-base', tokenizer_type='roberta-base', max_len=128):
        """
        Args:
        - model_name: Hugging Face model name (e.g., 'bert-base-uncased', 'roberta-base').
          Both the tokenizer and the model are loaded from this name.
        - tokenizer_type: Stored on the instance only; the tokenizer itself is
          always resolved from ``model_name``.
        - max_len: Maximum token length for input sequences. Every sequence is
          padded or truncated to exactly this length.
        """
        self.model_name = model_name
        self.tokenizer_type = tokenizer_type
        self.max_len = max_len
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = TFAutoModel.from_pretrained(model_name)

    def preprocess_text(self, text):
        """Normalize a single tweet before tokenization.

        Missing values (None / NaN / pd.NA) become an empty string and other
        non-string values are converted with ``str`` so that one bad row does
        not abort the whole transform.

        Returns:
            The cleaned, lower-cased text with URLs, mentions, punctuation and
            digits removed and whitespace collapsed.
        """
        if not isinstance(text, str):
            text = '' if pd.isna(text) else str(text)
        text = unidecode(text)  # Normalize Unicode
        text = text.lower()  # Convert to lowercase
        text = re.sub(r'http\S+|www\S+|https\S+', '', text)  # Remove URLs
        text = re.sub(r'@\w+', '', text)  # Remove mentions
        text = re.sub(r'#', ' ', text)  # Replace hashtags with space
        text = text.translate(str.maketrans('', '', string.punctuation))  # Remove punctuation
        text = re.sub(r'\d+', '', text)  # Remove digits
        text = re.sub(r'\s+', ' ', text).strip()  # Remove extra spaces
        return text

    def fit(self, X, y=None):
        """No-op fit; present so the class can be used as a scikit-learn transformer."""
        return self

    def transform(self, X):
        """Preprocess and tokenize a collection of texts.

        Args:
            X: Any one-dimensional sequence of texts (pandas Series, list,
               tuple, NumPy array). Elements are visited in iteration order.

        Returns:
            Tuple ``(input_ids, attention_masks)`` of NumPy integer arrays,
            each of shape ``(len(X), max_len)``.
        """
        # Iterate over the values rather than using positional .iloc so that
        # plain lists and arrays work as well as pandas objects.
        texts = [self.preprocess_text(text) for text in X]

        # Nothing to tokenize: return correctly shaped empty arrays instead of
        # handing an empty batch to the tokenizer.
        if not texts:
            empty_ids = np.zeros((0, self.max_len), dtype=int)
            return empty_ids, empty_ids.copy()

        # One batched call replaces the per-row (deprecated) encode_plus loop;
        # padding to max_length keeps every row the same width.
        encoded = self.tokenizer(
            texts,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
        )
        return np.array(encoded['input_ids']), np.array(encoded['attention_mask'])

In [None]:
# create a function to build the model
def build_bert_model(bert_model, max_len, num_classes):
    """Attach a softmax classification head to a pretrained encoder and compile it.

    Args:
        bert_model: Callable encoder that accepts ``[input_ids, attention_masks]``.
            Element ``[1]`` of its output feeds the classification head
            (presumably the pooled sentence representation for Hugging Face
            BERT-style models - confirm for the checkpoint in use).
        max_len: Length of each token-id / attention-mask row.
        num_classes: Number of output classes of the softmax layer.

    Returns:
        A compiled Keras model taking ``[input_ids, attention_masks]`` and
        producing per-class probabilities.
    """
    sequence_shape = (max_len,)
    ids_in = tf.keras.layers.Input(shape=sequence_shape, dtype=tf.int32, name='input_ids')
    mask_in = tf.keras.layers.Input(shape=sequence_shape, dtype=tf.int32, name='attention_masks')

    # Second element of the encoder output is used as the sentence embedding.
    encoder_outputs = bert_model([ids_in, mask_in])
    sentence_embedding = encoder_outputs[1]

    classification_head = tf.keras.layers.Dense(num_classes, activation='softmax')
    probabilities = classification_head(sentence_embedding)

    classifier = tf.keras.models.Model(inputs=[ids_in, mask_in], outputs=probabilities)
    classifier.compile(
        optimizer=tf.optimizers.Adam(learning_rate=1e-5),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

In [None]:
# Train, Test Split and Pipeline Integration
def create_pipeline(data, labels, model_name='roberta-base', tokenizer_type='roberta-base', max_len=128, batch_size=32, epochs=4):
    """Fine-tune a transformer classifier and print metrics on a held-out split.

    Workflow: 80/20 train/hold-out split (``random_state=42``), tokenization
    with ``BertTransformer``, model construction with ``build_bert_model``,
    training with early stopping on the validation loss, and finally a
    classification report for the held-out 20%.

    Args:
        data: Texts to train on.
        labels: Integer class labels aligned with ``data``; the number of
            distinct values determines the size of the output layer.
        model_name: Hugging Face model name passed to ``BertTransformer``.
        tokenizer_type: Passed through to ``BertTransformer``.
        max_len: Token length used for tokenization and the model inputs.
        batch_size: Training batch size.
        epochs: Maximum number of training epochs.

    Returns:
        Tuple ``(model, transformer, history)``: the trained Keras model, the
        fitted ``BertTransformer`` and the Keras training history.
    """
    # Hold out 20% of the rows; they serve both as validation data during
    # training and as the evaluation set for the report printed below.
    train_texts, holdout_texts, train_labels, holdout_labels = train_test_split(
        data, labels, test_size=0.2, random_state=42
    )

    # Tokenize both splits with the same transformer instance.
    transformer = BertTransformer(model_name=model_name, tokenizer_type=tokenizer_type, max_len=max_len)
    train_ids, train_masks = transformer.fit_transform(train_texts)
    holdout_ids, holdout_masks = transformer.transform(holdout_texts)

    # One output unit per distinct label value.
    model = build_bert_model(transformer.model, max_len, len(np.unique(labels)))

    # Stop once val_loss has not improved for 2 epochs and roll back to the best weights.
    fit_options = dict(
        validation_data=([holdout_ids, holdout_masks], holdout_labels),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[EarlyStopping(monitor='val_loss', patience=2, restore_best_weights=True)],
    )
    with tf.device('/GPU:0'):
        history = model.fit([train_ids, train_masks], train_labels, **fit_options)

    # Predicted class = index of the highest probability per row.
    class_scores = model.predict([holdout_ids, holdout_masks])
    holdout_predictions = np.argmax(class_scores, axis=1)
    print("Classification Report:\n", classification_report(holdout_labels, holdout_predictions))
    return model, transformer, history

In [None]:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # Set memory growth or limit memory allocation
        tf.config.set_logical_device_configuration(
            gpus[0],
            [tf.config.LogicalDeviceConfiguration(memory_limit=4096)]  # Limit to 4GB
        )
    except RuntimeError as e:
        print(e)

In [None]:
model, transformer, history = create_pipeline(X, y, model_name='roberta-base', tokenizer_type='roberta-base', max_len=128, batch_size=2, epochs=10)

In [None]:
# Create a function to evaluate the model
def evaluate_model(model, transformer, X_test, y_test):
    """Print a classification report for ``model`` on the given test set.

    Args:
        model: Trained model whose ``predict`` returns per-class scores.
        transformer: Object whose ``transform`` turns texts into
            ``(input_ids, attention_masks)``.
        X_test: Texts to evaluate on.
        y_test: True class labels aligned with ``X_test``.
    """
    token_ids, masks = transformer.transform(X_test)
    class_scores = model.predict([token_ids, masks])
    # Predicted class = index of the highest score in each row.
    predicted_labels = np.argmax(class_scores, axis=1)
    print("Classification Report:\n", classification_report(y_test, predicted_labels))

In [None]:
# import the testing data
test_data = pd.read_csv(r'../../data/Corona_NLP_test.csv', encoding= 'ISO-8859-1')

In [None]:
# filter the data to only conbtain the tweets and the sentiment
test_data = test_data[['OriginalTweet', 'Sentiment']]

In [None]:
# split the data into X and y
X_test = test_data['OriginalTweet']
y_test = test_data['Sentiment']

In [None]:
# convert the sentiments to only three categories|
y_test = y_test.map({'Extremely Negative':0,'Negative':0,'Neutral':1,'Positive':2,'Extremely Positive':2})

In [None]:
evaluate_model(model, transformer, X_test, y_test)