<a href="https://colab.research.google.com/github/StephenSheng1101/RS4U_System/blob/main/RS4UModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, TFBertForSequenceClassification, AdamW
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import tensorflow as tf
from tensorflow.keras.losses import SparseCategoricalCrossentropy
import tf2onnx
import onnx

In [None]:
# Path to the Yelp reviews CSV file (change this if your copy of the dataset lives elsewhere)
filename = 'yelp_review.csv'

In [None]:
# Read the CSV file; malformed lines are skipped instead of aborting the load
df = pd.read_csv(filename, encoding='utf-8', on_bad_lines="skip", engine="python")
# Limit the dataset size to the first 10,000 rows
df = df.head(10000)

# The dataset is expected to have 'stars' as the rating and 'text' as the review text.
# Coerce ratings to numbers and keep only rows that have review text and a rating in (0, 5]:
# any other row would receive a NaN sentiment from pd.cut below, and a NaN label cannot be
# looked up in the string -> class-id mapping used when batching.
stars = pd.to_numeric(df['stars'], errors='coerce')
has_label = df['text'].notna() & stars.gt(0) & stars.le(5)
data = {'text': df.loc[has_label, 'text'].values, 'stars': stars[has_label].values}

# Map star ratings to sentiment classes: (0, 2] -> negative, (2, 3] -> neutral, (3, 5] -> positive
data['sentiment'] = pd.cut(data['stars'], bins=[0, 2, 3, 5], labels=['negative', 'neutral', 'positive'])

# Convert the dictionary to a Pandas DataFrame
df_data = pd.DataFrame(data)

In [None]:
# Hold out 20% of the rows, then cut that hold-out in half to obtain the
# validation and test sets (80% / 10% / 10% overall). Fixed seeds keep the split reproducible.
train_data, holdout_data = train_test_split(df_data, test_size=0.2, random_state=42)
valid_data, test_data = train_test_split(holdout_data, test_size=0.5, random_state=42)


In [None]:
# Load the tokenizer and the sequence-classification model from the same cased BERT checkpoint
checkpoint = 'bert-base-cased'
tokenizer = BertTokenizer.from_pretrained(checkpoint)
# num_labels=3 -> one output logit per sentiment class: negative, neutral, positive
model = TFBertForSequenceClassification.from_pretrained(checkpoint, num_labels=3)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Define a custom dataset
class CustomDataset(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves tokenized batches of labelled review texts.

    Item ``idx`` is a dict with three entries for the ``idx``-th batch:
    ``input_ids`` and ``attention_mask`` (as produced by the tokenizer, padded or
    truncated to ``max_length``) and ``label`` (int32 class ids).
    """

    def __init__(self, texts, labels, tokenizer, max_length=128, batch_size=8):
        """Store the samples and batching configuration.

        Args:
            texts: Iterable of review strings.
            labels: Iterable of sentiment names ('negative', 'neutral' or 'positive'),
                aligned with ``texts``.
            tokenizer: Tokenizer exposing ``batch_encode_plus``.
            max_length: Sequence length every sample is padded/truncated to.
            batch_size: Number of samples per batch; must be positive.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        # Initialise the Keras base class (recent Keras versions are understood to warn if this is skipped).
        super().__init__()
        if batch_size <= 0:
            raise ValueError(f'batch_size must be positive, got {batch_size}')
        # Materialise as plain lists so batching is purely positional, whatever
        # container (list, array, pandas Series with a shuffled index) was passed in.
        self.texts = list(texts)
        self.labels = list(labels)
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.batch_size = batch_size
        self.label_mapping = {'negative': 0, 'neutral': 1, 'positive': 2}

    def __len__(self):
        """Return the number of batches, counting a final partial batch."""
        return (len(self.texts) + self.batch_size - 1) // self.batch_size

    def __getitem__(self, idx):
        """Tokenize and return batch ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``. Without this, an
                out-of-range index would silently tokenize an empty batch, and
                index-based iteration over the dataset would never terminate.
            KeyError: If a label is not one of the names in ``label_mapping``.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f'batch index {idx} out of range for {len(self)} batches')

        start = idx * self.batch_size
        stop = start + self.batch_size
        batch_texts = self.texts[start:stop]

        # Convert string labels to numerical values
        batch_labels = [self.label_mapping[label] for label in self.labels[start:stop]]

        # Tokenize the batch of texts
        tokens = self.tokenizer.batch_encode_plus(
            batch_texts,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='tf'
        )

        return {
            'input_ids': tokens['input_ids'],
            'attention_mask': tokens['attention_mask'],
            'label': tf.convert_to_tensor(batch_labels, dtype=tf.int32)
        }


In [None]:
# Tokenize and create DataLoader
def create_dataloader(data, tokenizer, max_length=128, batch_size=8):
    """Wrap a data split in a ``tf.data.Dataset`` of tokenized batches.

    Args:
        data: Mapping/DataFrame with a 'text' column (review strings) and a
            'sentiment' column (sentiment names understood by ``CustomDataset``).
        tokenizer: Tokenizer forwarded to ``CustomDataset``.
        max_length: Token length each sample is padded/truncated to.
        batch_size: Number of samples per batch.

    Returns:
        A ``tf.data.Dataset`` yielding dicts with 'input_ids', 'attention_mask'
        and 'label' tensors, whose cardinality equals the number of batches.
    """
    dataset = CustomDataset(texts=data['text'], labels=data['sentiment'], tokenizer=tokenizer, max_length=max_length, batch_size=batch_size)
    num_batches = len(dataset)

    def batch_generator():
        # Iterate by explicit index so exactly `num_batches` batches are produced,
        # independent of how the Sequence base class implements iteration.
        for batch_index in range(num_batches):
            yield dataset[batch_index]

    dataloader = tf.data.Dataset.from_generator(batch_generator, output_signature={
        'input_ids': tf.TensorSpec(shape=(None, max_length), dtype=tf.int32),
        'attention_mask': tf.TensorSpec(shape=(None, max_length), dtype=tf.int32),
        'label': tf.TensorSpec(shape=(None,), dtype=tf.int32)
    })
    # A generator-backed dataset reports an unknown cardinality; declare the known
    # batch count so callers that query the cardinality get the real number of batches.
    return dataloader.apply(tf.data.experimental.assert_cardinality(num_batches))

In [None]:
# Batching configuration shared by all three splits
max_length = 128
batch_size = 8

# Build the train / validation / test loaders in one pass with identical settings
train_dataloader, valid_dataloader, test_dataloader = (
    create_dataloader(split, tokenizer, max_length=max_length, batch_size=batch_size)
    for split in (train_data, valid_data, test_data)
)


In [None]:
# Loss and optimizer for fine-tuning.
# from_logits=True: the loss is fed raw logits, not softmax probabilities.
criterion = SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam(learning_rate=2e-5)




In [None]:
# Fine-tune for a fixed number of epochs, reporting the average training loss and
# the validation accuracy after each epoch.
num_epochs = 3
for epoch in range(num_epochs):
    total_loss = 0.0
    # Count batches while iterating: a generator-backed tf.data.Dataset can report an
    # unknown cardinality (a negative sentinel), which would corrupt the average below.
    num_batches = 0

    for batch in train_dataloader:
        inputs = {'input_ids': batch['input_ids'], 'attention_mask': batch['attention_mask']}
        labels = batch['label']

        # Forward pass under the tape so gradients can be taken w.r.t. the model weights
        with tf.GradientTape() as tape:
            outputs = model(inputs, training=True)
            loss = criterion(labels, outputs.logits)

        total_loss += float(loss.numpy())
        num_batches += 1

        # Backward pass and optimization
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # Calculate average training loss (NaN if the training set produced no batches)
    avg_train_loss = total_loss / num_batches if num_batches else float('nan')

    # Validation
    all_preds = []
    all_labels = []
    for batch in valid_dataloader:
        inputs = {'input_ids': batch['input_ids'], 'attention_mask': batch['attention_mask']}
        labels = batch['label']

        outputs = model(inputs, training=False)
        # Predicted class = index of the largest logit
        preds = tf.argmax(outputs.logits, axis=1)
        all_preds.extend(preds.numpy())
        all_labels.extend(labels.numpy())

    # Calculate accuracy on validation set
    accuracy_valid = accuracy_score(all_labels, all_preds)

    print(f'Epoch {epoch + 1}/{num_epochs}, Avg Train Loss: {avg_train_loss:.4f}, Validation Accuracy: {accuracy_valid:.4f}')


In [None]:
# Evaluate the fine-tuned (original, unconverted) model on the held-out test set
all_preds_test = []
all_labels_test = []

for batch in test_dataloader:
    inputs = {'input_ids': batch['input_ids'], 'attention_mask': batch['attention_mask']}
    labels = batch['label']

    outputs = model(inputs, training=False)
    # Predicted class = index of the largest logit
    preds = tf.argmax(outputs.logits, axis=1)
    all_preds_test.extend(preds.numpy())
    all_labels_test.extend(labels.numpy())

# Calculate accuracy, precision, recall, and f1 score on the test set
# ('weighted' averages the per-class scores by each class's support)
accuracy_test = accuracy_score(all_labels_test, all_preds_test)
precision = precision_score(all_labels_test, all_preds_test, average='weighted')
recall = recall_score(all_labels_test, all_preds_test, average='weighted')
f1 = f1_score(all_labels_test, all_preds_test, average='weighted')
print(f'Accuracy (Original Model): {accuracy_test:.4f}')
print(f'Precision (Original Model): {precision:.4f}')
print(f'Recall (Original Model): {recall:.4f}')
print(f'F1 Score (Original Model): {f1:.4f}')

# Confusion matrix on the test set.
# Class ids follow CustomDataset.label_mapping: 0 = negative, 1 = neutral, 2 = positive.
# Passing `labels` pins the matrix to 3x3 in that order even if a class never occurs,
# and all three rows/columns are printed with their real class names.
class_names = ['negative', 'neutral', 'positive']
conf_matrix = confusion_matrix(all_labels_test, all_preds_test, labels=[0, 1, 2])
print('Confusion Matrix (Original Model):')
print(pd.DataFrame(
    conf_matrix,
    index=[f'Actual {name}' for name in class_names],
    columns=[f'Predicted {name}' for name in class_names],
))


In [None]:
# Save the trained model via its `save_pretrained` method into a directory
# relative to the notebook's working directory
output_dir = 'colab_tf_model'
model.save_pretrained(output_dir)