In [None]:
from google.colab import drive
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, RobertaTokenizer, RobertaForSequenceClassification
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from collections import defaultdict

In [None]:
# Make the Drive-hosted project folder visible to this Colab runtime
drive.mount('/content/drive')

# Read both corpora and give their columns fixed names.
# assumes each CSV has exactly two columns (text, label) — TODO confirm;
# any other column count makes set_axis raise a length-mismatch ValueError.
df = pd.read_csv('/content/drive/MyDrive/TTS/combined_emotion.csv').set_axis(
    ['text', 'emotion'], axis=1
)
df2 = pd.read_csv('/content/drive/MyDrive/TTS/combined_sentiment_data.csv').set_axis(
    ['text', 'sentiment'], axis=1
)

In [None]:
# Preprocessing: report the per-column null counts, then keep only complete rows
print("Missing values:")
print(df.isna().sum())
df = df.dropna()

print("Missing values:")
print(df2.isna().sum())
df2 = df2.dropna()


def _plot_label_distribution(frame, label_column, title):
    """Show a count plot of `label_column`, with bars in `value_counts()` order."""
    bar_order = frame[label_column].value_counts().index
    plt.figure(figsize=(10,5))
    sns.countplot(data=frame, x=label_column, order=bar_order)
    plt.xticks(rotation=45)
    plt.title(title)
    plt.show()


# Class balance of the emotion and sentiment corpora
_plot_label_distribution(df, 'emotion', "Emotion Distribution")
_plot_label_distribution(df2, 'sentiment', "Sentiment Distribution")

In [None]:
# VADER word-level sentiment scoring
# Fetch the lexicon resource requested below, then build the analyzer that the
# rest of the notebook shares under the name `sid`.
nltk.download('vader_lexicon')
sid = SentimentIntensityAnalyzer()

# Define and apply Singlish sentiment lexicon
# Keys are Singlish words/phrases, values are hand-assigned valence scores
# (negative = unfavourable, positive = favourable).
# NOTE(review): several keys contain spaces ("steady lah", "bo jio", ...).
# VADER appears to look up whitespace-separated tokens one at a time, so these
# multi-word entries may never match — confirm against the nltk implementation.
# NOTE(review): the scores here stay within roughly +/-1.5; verify that this is
# the scale the stock VADER lexicon uses before mixing the two.
singlish_lexicon = {
    "shiok": 1.2, "sian": -1.0, "aiyah": -0.3, "aiyo": -0.6, "sabo": -1.2,
    "steady": 0.8, "steady lah": 1.0, "relac": 0.6, "chiong": 0.3, "lepak": 0.7,
    "ang moh": 0.0, "chio": 0.9, "makan": 0.3, "bo jio": -0.6, "paiseh": -0.2,
    "alamak": -0.8, "wah lao": -0.6, "wah piang": -0.6, "lah": 0.1, "leh": 0.1,
    "lor": 0.05, "meh": -0.1, "hor": 0.1, "can lah": 0.5, "no need lah": -0.3,
    "okay lah": 0.5, "don’t play play": 0.3, "jialat": -1.2, "jialat sia": -1.5,
    "heng ah": 0.8, "wah so good": 1.0, "wah so bad": -1.0, "buay tahan": -0.9,
    "shiok sia": 1.0, "win liao lor": 1.0, "confirm plus chop": 0.9, "bo chup": -0.4,
    "act blur": -0.6, "blur like sotong": -0.9, "lim kopi": 0.5, "lim teh": 0.3,
    "catch no ball": -0.3, "don’t anyhow": -0.3, "so stress sia": -1.2,
    "ownself check ownself": -0.5, "talk cock": -0.6, "chio bu": 0.9,
    "ah beng": -0.3, "ah lian": -0.3, "guai kia": 0.6, "kancheong spider": -0.5,
    "spoil market": -0.4, "last warning ah": -0.8
}
# Merge into the analyzer's lexicon in place: every later sid.polarity_scores()
# call in this notebook sees these entries (existing keys are overwritten).
sid.lexicon.update(singlish_lexicon)

# Save the Singlish lexicon to CSV for future updating
# The score column is named 'avg_sentiment_score' although the values written
# here are the hand-assigned scores above, not averages.
singlish_df = pd.DataFrame(list(singlish_lexicon.items()), columns=['word', 'avg_sentiment_score'])
singlish_df.to_csv('/content/drive/MyDrive/TTS/singlish_sentiment_lexicon.csv', index=False)

# Containers for scores and word counts
word_scores = defaultdict(float)  # token -> compound score summed over its occurrences
word_counts = defaultdict(int)    # token -> number of occurrences in df2['text']

# Pass 1: count whitespace-separated tokens. Tokens are used verbatim, so
# "Good", "good" and "good," are tracked as separate keys.
for text in df2['text']:
    words = text.split()
    for word in words:
        word_counts[word] += 1

# Pass 2: score every distinct token exactly once. The compound score depends
# only on the token string, so calling polarity_scores() again for each repeat
# occurrence returned the same number every time; the per-token "average" is
# simply that score. Scoring once avoids the repeated analyzer calls and the
# float drift introduced by summing and re-dividing identical values.
average_word_scores = {word: sid.polarity_scores(word)['compound'] for word in word_counts}
for word, score in average_word_scores.items():
    word_scores[word] = score * word_counts[word]

# One row per token, most positive first, persisted next to the input data.
word_sentiment_df = pd.DataFrame.from_dict(average_word_scores, orient='index', columns=['avg_sentiment_score'])
word_sentiment_df = word_sentiment_df.sort_values(by='avg_sentiment_score', ascending=False)
word_sentiment_df.to_csv('/content/drive/MyDrive/TTS/word_sentiment_scores.csv')

print("Top Positive Words:")
print(word_sentiment_df.head(10))
print("\nTop Negative Words:")
print(word_sentiment_df.tail(10))

In [None]:
def get_sentiment_label(text):
    """Map `text` to a sentiment class id using the shared VADER analyzer `sid`.

    Returns:
        0 (negative) when the compound score is <= -0.05,
        2 (positive) when it is >= 0.05,
        1 (neutral) for anything in between.
    """
    compound = sid.polarity_scores(text)['compound']
    if compound <= -0.05:
        return 0  # negative
    if compound >= 0.05:
        return 2  # positive
    return 1  # neutral

# Encode labels: give each emotion string an integer id in order of first appearance
emotion_classes = df['emotion'].unique()
label_mapping_emotion = dict(zip(emotion_classes, range(len(emotion_classes))))
df['emotion'] = df['emotion'].map(label_mapping_emotion)

# The sentiment labels loaded from the CSV are not used: the column is replaced
# with ids derived from VADER (0 = negative, 1 = neutral, 2 = positive).
df2['sentiment'] = df2['text'].apply(get_sentiment_label)

# Split data
# Keep only the first `min_len` rows of each corpus so both have the same size.
min_len = min(len(df), len(df2))
df = df.iloc[:min_len]
df2 = df2.iloc[:min_len]


def _split_train_val_test(frame, label_column):
    """Stratified split of `frame` into train / validation / test lists.

    20% of the rows are held out first; 10% of that holdout becomes the test
    set and the remaining 90% the validation set. Both steps use
    random_state=42 and stratify on the labels.

    Returns:
        (train_texts, val_texts, test_texts, train_labels, val_labels, test_labels)
    """
    fit_texts, holdout_texts, fit_labels, holdout_labels = train_test_split(
        frame['text'].tolist(), frame[label_column].tolist(),
        test_size=0.2, random_state=42, stratify=frame[label_column]
    )
    dev_texts, final_texts, dev_labels, final_labels = train_test_split(
        holdout_texts, holdout_labels, test_size=0.1, random_state=42, stratify=holdout_labels
    )
    return fit_texts, dev_texts, final_texts, fit_labels, dev_labels, final_labels


# Emotion task (df) and sentiment task (df2) are split independently.
(train_texts, val_texts, test_texts,
 train_labels, val_labels, test_labels) = _split_train_val_test(df, 'emotion')

(train_texts_2, val_texts_2, test_texts_2,
 train_labels_2, val_labels_2, test_labels_2) = _split_train_val_test(df2, 'sentiment')

In [None]:
# Tokenizers
# One tokenizer per encoder family. The checkpoint names are the same ones the
# models are instantiated from later in the notebook ('bert-base-uncased' for
# the emotion classifier, 'roberta-base' for the sentiment classifiers).
bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
roberta_tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Encode texts
def encode_texts(texts, tokenizer, max_length=128):
    """Tokenize `texts` in a single call and return the tokenizer's output.

    Args:
        texts: the batch of strings handed to `tokenizer` unchanged.
        tokenizer: callable invoked as ``tokenizer(texts, **options)``.
        max_length: truncation limit forwarded to the tokenizer.

    The tokenizer is asked to pad, to truncate to `max_length`, and to return
    PyTorch tensors (``return_tensors="pt"``).
    """
    tokenizer_options = {
        "padding": True,
        "truncation": True,
        "max_length": max_length,
        "return_tensors": "pt",
    }
    return tokenizer(texts, **tokenizer_options)

# Emotion texts go through the BERT tokenizer, sentiment texts through the
# RoBERTa tokenizer; train is encoded before validation in both cases.
train_encodings_bert, val_encodings_bert = (
    encode_texts(split_texts, bert_tokenizer) for split_texts in (train_texts, val_texts)
)
train_encodings_roberta, val_encodings_roberta = (
    encode_texts(split_texts, roberta_tokenizer) for split_texts in (train_texts_2, val_texts_2)
)

In [None]:
# Reuse the RoBERTa encodings already computed for train_texts_2 / val_texts_2.
# Calling encode_texts() again with the same texts and the same tokenizer only
# repeats the tokenization and keeps a second copy of every tensor in memory.
# The encodings are only read downstream, so sharing them is safe.
train_encodings_roberta2 = train_encodings_roberta
val_encodings_roberta2 = val_encodings_roberta

In [None]:
# Dataset class
class SentimentDataset(Dataset):
    """Dataset that pairs tokenizer encodings with one label per example.

    Args:
        encodings: mapping of field name -> indexable collection of per-example
            values (e.g. 'input_ids', 'attention_mask').
        labels: indexable collection of labels; its length defines the dataset size.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Number of examples, taken from the labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return every encoding field at `idx` plus the label under 'labels'."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = values[idx]
        sample["labels"] = torch.tensor(self.labels[idx])
        return sample

In [None]:
# Datasets and loaders for the sentiment-only RoBERTa run
train_dataset_roberta2 = SentimentDataset(
    encodings=train_encodings_roberta2, labels=train_labels_2
)
val_dataset_roberta2 = SentimentDataset(
    encodings=val_encodings_roberta2, labels=val_labels_2
)

# Only the training batches are shuffled; validation keeps dataset order.
train_loader_roberta2 = DataLoader(dataset=train_dataset_roberta2, shuffle=True, batch_size=16)
val_loader_roberta2 = DataLoader(dataset=val_dataset_roberta2, shuffle=False, batch_size=16)

In [None]:
# Use the GPU when torch can see one, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# RoBERTa with a 3-class head (negative / neutral / positive), moved to `device`
roberta_model2 = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=3)
roberta_model2 = roberta_model2.to(device)

optimizer2 = AdamW(roberta_model2.parameters(), lr=2e-5)

In [None]:
def train_roberta_model(model, loader, optimizer, epochs=3):
    """Fine-tune `model` on `loader`, printing mean loss and accuracy after each epoch.

    Args:
        model: classifier called as ``model(input_ids, attention_mask=..., labels=...)``
            whose output exposes ``.loss`` and ``.logits``.
        loader: iterable of batches holding 'input_ids', 'attention_mask' and 'labels'.
        optimizer: optimizer stepped once per batch.
        epochs: number of passes over `loader`.

    Batch tensors are moved to the module-level `device` before the forward pass.
    """
    model.train()
    for epoch_idx in range(epochs):
        running_loss, n_correct, n_seen = 0, 0, 0

        for batch in tqdm(loader):
            optimizer.zero_grad()

            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            # Passing labels makes the model compute the loss itself.
            result = model(ids, attention_mask=mask, labels=targets)
            batch_loss = result.loss
            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()
            predicted = result.logits.argmax(dim=1)
            n_correct += predicted.eq(targets).sum().item()
            n_seen += targets.shape[0]

        accuracy = n_correct / n_seen
        mean_loss = running_loss / len(loader)
        print(f"Epoch {epoch_idx+1}: Loss = {mean_loss:.4f}, Accuracy = {accuracy:.4f}")

def evaluate_roberta_model(model, loader):
    """Evaluate a 3-class sentiment classifier and display its metrics.

    Prints a classification report and shows a confusion-matrix heatmap for the
    predictions made over `loader`.

    Args:
        model: classifier called as ``model(input_ids, attention_mask=...)`` whose
            output exposes ``.logits``.
        loader: iterable of batches holding 'input_ids', 'attention_mask' and 'labels'.

    Inputs are moved to the module-level `device`.
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            outputs = model(input_ids, attention_mask=attention_mask)
            preds = torch.argmax(outputs.logits, dim=1)
            all_preds.extend(preds.cpu().tolist())
            # Labels are only collected, never fed to the model, so they do not
            # need a round trip through the device.
            all_labels.extend(batch['labels'].tolist())

    target_names = ["negative", "neutral", "positive"]
    # Pin the class ids explicitly. Without `labels=`, scikit-learn infers the
    # classes from the data: if one class appears in neither the labels nor the
    # predictions, classification_report raises because the class count no longer
    # matches target_names, and confusion_matrix shrinks so the tick labels
    # would be attached to the wrong rows/columns.
    class_ids = list(range(len(target_names)))

    print("Classification Report:")
    print(classification_report(all_labels, all_preds, labels=class_ids, target_names=target_names))

    cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=target_names, yticklabels=target_names)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title("Sentiment Classification - Confusion Matrix")
    plt.show()

In [None]:
# Train and evaluate
# Fine-tune the sentiment-only RoBERTa classifier for 8 epochs on the training
# loader, then print its report and confusion matrix for the validation loader.
train_roberta_model(roberta_model2, train_loader_roberta2, optimizer2, epochs=8)
evaluate_roberta_model(roberta_model2, val_loader_roberta2)

In [None]:
# Create multitask dataset
class MultiTaskDataset(Dataset):
    """Index-aligned pairing of a BERT/emotion example with a RoBERTa/sentiment example.

    Item `idx` combines row `idx` of the BERT encodings and emotion labels with
    row `idx` of the RoBERTa encodings and sentiment labels.

    Args:
        encodings_bert: mapping with 'input_ids' and 'attention_mask' for the emotion texts.
        labels_emotion: indexable emotion labels; its length defines the dataset size.
        encodings_roberta: mapping with 'input_ids' and 'attention_mask' for the sentiment texts.
        labels_sentiment: indexable sentiment labels.
    """

    def __init__(self, encodings_bert, labels_emotion, encodings_roberta, labels_sentiment):
        self.encodings_bert = encodings_bert
        self.labels_emotion = labels_emotion
        self.encodings_roberta = encodings_roberta
        self.labels_sentiment = labels_sentiment

    def __len__(self):
        """Number of examples, taken from the emotion labels."""
        return len(self.labels_emotion)

    def __getitem__(self, idx):
        """Return the six tensors for example `idx`, keyed by task-specific names.

        torch.as_tensor is used instead of torch.tensor because the encodings
        may already be tensors (the tokenizer is called with
        return_tensors="pt"): torch.tensor(existing_tensor) emits a UserWarning
        and copies on every access, whereas as_tensor passes tensors through
        and still converts plain lists / ints. The returned encoding tensors
        may therefore share storage with the dataset; treat them as read-only.
        """
        item = {
            'input_ids_bert': torch.as_tensor(self.encodings_bert['input_ids'][idx]),
            'attention_mask_bert': torch.as_tensor(self.encodings_bert['attention_mask'][idx]),
            'labels_emotion': torch.as_tensor(self.labels_emotion[idx]),
            'input_ids_roberta': torch.as_tensor(self.encodings_roberta['input_ids'][idx]),
            'attention_mask_roberta': torch.as_tensor(self.encodings_roberta['attention_mask'][idx]),
            'labels_sentiment': torch.as_tensor(self.labels_sentiment[idx]),
        }
        return item

In [None]:
# Create DataLoader
# Each dataset pairs the BERT/emotion split with the RoBERTa/sentiment split by index.
train_dataset = MultiTaskDataset(
    encodings_bert=train_encodings_bert,
    labels_emotion=train_labels,
    encodings_roberta=train_encodings_roberta,
    labels_sentiment=train_labels_2,
)
val_dataset = MultiTaskDataset(
    encodings_bert=val_encodings_bert,
    labels_emotion=val_labels,
    encodings_roberta=val_encodings_roberta,
    labels_sentiment=val_labels_2,
)

train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=16)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=16)

# Define models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The emotion head has one output per entry in label_mapping_emotion; the
# sentiment head has 3 outputs.
bert_model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased', num_labels=len(label_mapping_emotion)
).to(device)
roberta_model = RobertaForSequenceClassification.from_pretrained(
    'roberta-base', num_labels=3
).to(device)

# A single optimizer updates the parameters of both models.
optimizer = AdamW([*bert_model.parameters(), *roberta_model.parameters()], lr=2e-5)
loss_fn = CrossEntropyLoss()

In [None]:
# Training loop
def train_model(bert_model, roberta_model, train_loader, val_loader, epochs=8):
    """Train the emotion (BERT) and sentiment (RoBERTa) classifiers in lockstep.

    For every batch both models run a forward and a backward pass, then the
    module-level `optimizer` takes one step. After each epoch the summed loss
    of both tasks (divided by the number of batches) and each model's training
    accuracy are printed.

    Args:
        bert_model: emotion classifier; output must expose ``.loss`` and ``.logits``.
        roberta_model: sentiment classifier with the same output contract.
        train_loader: iterable of batches with the ``*_bert`` / ``*_roberta`` input
            keys plus 'labels_emotion' and 'labels_sentiment'.
        val_loader: accepted for interface compatibility; not read by this function.
        epochs: number of passes over `train_loader`.

    Relies on the module-level names `optimizer` and `device`.
    """

    def _task_step(model, batch, ids_key, mask_key, labels_key):
        """Forward + backward for one task; returns (loss value, #correct, #examples)."""
        ids = batch[ids_key].to(device)
        mask = batch[mask_key].to(device)
        targets = batch[labels_key].to(device)

        result = model(ids, attention_mask=mask, labels=targets)
        loss = result.loss
        loss_value = loss.item()
        loss.backward()

        hits = (result.logits.argmax(dim=1) == targets).sum().item()
        return loss_value, hits, targets.size(0)

    bert_model.train()
    roberta_model.train()

    for epoch in range(epochs):
        print(f"Epoch {epoch+1}")
        total_loss = 0
        bert_hits = bert_seen = 0
        roberta_hits = roberta_seen = 0

        for batch in tqdm(train_loader):
            optimizer.zero_grad()

            # Emotion task first, so its loss is added to the running total first.
            loss_value, hits, seen = _task_step(
                bert_model, batch, 'input_ids_bert', 'attention_mask_bert', 'labels_emotion'
            )
            total_loss += loss_value
            bert_hits += hits
            bert_seen += seen

            # Sentiment task second.
            loss_value, hits, seen = _task_step(
                roberta_model, batch, 'input_ids_roberta', 'attention_mask_roberta', 'labels_sentiment'
            )
            total_loss += loss_value
            roberta_hits += hits
            roberta_seen += seen

            # One step applies the gradients accumulated by both backward passes.
            optimizer.step()

        accuracy_bert = bert_hits / bert_seen
        accuracy_roberta = roberta_hits / roberta_seen
        mean_loss = total_loss / len(train_loader)
        print(f"Training Loss: {mean_loss} | BERT Accuracy: {accuracy_bert:.4f} | RoBERTa Accuracy: {accuracy_roberta:.4f}")

# Train the model
# Runs with train_model's default of epochs=8. val_loader is passed to satisfy
# the signature, but train_model does not read it.
train_model(bert_model, roberta_model, train_loader, val_loader)

In [None]:
# Save the models
# Each model and its tokenizer are written to the same directory, so the pair
# can be loaded back from one path.
# NOTE(review): these are relative paths, unlike the CSV outputs written under
# /content/drive earlier in the notebook — confirm the saved models are meant to
# live in the runtime's working directory rather than on Drive.
bert_model.save_pretrained("emotion_bert_model")
roberta_model.save_pretrained("sentiment_roberta_model")
bert_tokenizer.save_pretrained("emotion_bert_model")
roberta_tokenizer.save_pretrained("sentiment_roberta_model")

In [None]:
# Evaluation function
def _show_confusion_matrix(y_true, y_pred, class_ids, class_names, title):
    """Show a confusion-matrix heatmap with one row/column per id in `class_ids`."""
    # labels= keeps the matrix shape fixed even when a class is absent from the
    # data, so the tick labels always line up with the right rows/columns.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.title(title)
    plt.show()


def evaluate_model(bert_model, roberta_model, val_loader):
    """Evaluate the emotion (BERT) and sentiment (RoBERTa) classifiers on `val_loader`.

    Prints a classification report per task, then shows a confusion-matrix
    heatmap per task.

    Args:
        bert_model: emotion classifier; output must expose ``.logits``.
        roberta_model: sentiment classifier; output must expose ``.logits``.
        val_loader: iterable of batches with the ``*_bert`` / ``*_roberta`` input
            keys plus 'labels_emotion' and 'labels_sentiment'.

    Emotion class names come from the module-level `label_mapping_emotion` when
    it is usable; otherwise the ids seen in the labels and predictions are used
    as names. Inputs are moved to the module-level `device`.
    """
    bert_model.eval()
    roberta_model.eval()

    all_preds_bert, all_labels_bert = [], []
    all_preds_roberta, all_labels_roberta = [], []

    with torch.no_grad():
        for batch in tqdm(val_loader, desc="Evaluating"):
            # BERT part
            input_ids_bert = batch['input_ids_bert'].to(device)
            attention_mask_bert = batch['attention_mask_bert'].to(device)
            outputs_bert = bert_model(input_ids_bert, attention_mask=attention_mask_bert)
            preds_bert = torch.argmax(outputs_bert.logits, dim=1)
            all_preds_bert.extend(preds_bert.cpu().tolist())
            # Labels are only collected, so they never need to visit the device.
            all_labels_bert.extend(batch['labels_emotion'].tolist())

            # RoBERTa part
            input_ids_roberta = batch['input_ids_roberta'].to(device)
            attention_mask_roberta = batch['attention_mask_roberta'].to(device)
            outputs_roberta = roberta_model(input_ids_roberta, attention_mask=attention_mask_roberta)
            preds_roberta = torch.argmax(outputs_roberta.logits, dim=1)
            all_preds_roberta.extend(preds_roberta.cpu().tolist())
            all_labels_roberta.extend(batch['labels_sentiment'].tolist())

    # Emotion labels
    try:
        ordered = sorted(label_mapping_emotion.items(), key=lambda pair: pair[1])
        emotion_ids = [idx for _, idx in ordered]
        target_names_emotion = [str(label) for label, _ in ordered]
    except Exception:
        # Best-effort fallback (e.g. the mapping is not defined in this kernel):
        # name classes by id, covering ids that occur only in the predictions.
        # `Exception` rather than a bare except so Ctrl-C is not swallowed.
        emotion_ids = sorted(set(all_labels_bert) | set(all_preds_bert))
        target_names_emotion = [str(idx) for idx in emotion_ids]

    target_names_sentiment = ["negative", "neutral", "positive"]
    sentiment_ids = list(range(len(target_names_sentiment)))

    # Explicit labels= keeps the reports working when a class is missing from
    # this evaluation set; otherwise scikit-learn raises because the number of
    # classes it finds does not match the number of target names.
    print("\n📊 BERT Classification Report (Emotion):")
    print(classification_report(all_labels_bert, all_preds_bert,
                                labels=emotion_ids, target_names=target_names_emotion))

    print("\n📊 RoBERTa Classification Report (Sentiment):")
    print(classification_report(all_labels_roberta, all_preds_roberta,
                                labels=sentiment_ids, target_names=target_names_sentiment))

    # Confusion Matrix for BERT
    _show_confusion_matrix(all_labels_bert, all_preds_bert, emotion_ids,
                           target_names_emotion, "Emotion Classification - Confusion Matrix")

    # Confusion Matrix for RoBERTa
    _show_confusion_matrix(all_labels_roberta, all_preds_roberta, sentiment_ids,
                           target_names_sentiment, "Sentiment Classification - Confusion Matrix")

# Evaluate the model
# Scores both jointly trained classifiers on the validation loader; the call
# prints two classification reports and shows two confusion-matrix heatmaps.
evaluate_model(bert_model, roberta_model, val_loader)