In [None]:
import nltk   

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import SVC

import seaborn as sns
import nltk
from nltk.corpus import treebank
from collections import defaultdict, Counter

# **TASK 1 : POS Tagging on the NLTK Treebank corpus**

**Step 1: Load and preprocess the treebank corpus**

In [None]:
# Fetch the NLTK resources this notebook relies on:
#   - 'treebank': the tagged corpus used to estimate the HMM in Task 1.
#   - 'punkt' / 'punkt_tab': tokenizer models needed by `word_tokenize`, which
#     Task 3 calls. Which of the two is required depends on the installed NLTK
#     version, so both are requested; by default `nltk.download` reports an
#     unknown resource instead of raising, so the extra name is harmless.
for resource in ('treebank', 'punkt', 'punkt_tab'):
    nltk.download(resource)

# Sentence-level view of the corpus: each sentence is a sequence of (word, tag) pairs.
tagged_sentences = treebank.tagged_sents()

# Flatten the sentences into one long list of (word, tag) pairs, in corpus order.
train_data = [
    tagged_word
    for sentence in tagged_sentences
    for tagged_word in sentence
]

In [None]:
# Parallel views of the flattened corpus: words[i] was tagged with tags[i].
words = [token for token, _pos in train_data]
tags = [pos for _token, pos in train_data]

# Frequency tables: word -> occurrences and tag -> occurrences.
word_counts = Counter(words)
tag_counts = Counter(tags)

In [None]:
# Build the tag and word inventories in sorted order. Iterating a set of strings
# has no stable order from one interpreter run to the next, and `unique_tags`
# fixes the row order (and therefore the tie-breaking) inside `viterbi_algorithm`,
# so sorting keeps results and the printed sample reproducible.
unique_tags = sorted(set(tags))
print(len(unique_tags), unique_tags, sep='\n\n', end='\n\n')

unique_words = sorted(set(words))
print(len(unique_words), unique_words[:50], sep='\n\n')

 **Step 2: Calculate Transition and Emission Probabilities**

In [None]:
# Single-slot memo holding the bigram statistics of the most recently used tag
# sequence, so repeated probability lookups do not rescan the corpus.
_TRANSITION_STATS_CACHE = {}


def _transition_stats(tag_sequence):
    """Return ``(bigram_counts, predecessor_counts, tagset_size)`` for ``tag_sequence``.

    The statistics are computed once and reused for as long as the very same
    sequence object (with an unchanged length) is passed in again.
    """
    cached = _TRANSITION_STATS_CACHE.get('entry')
    if cached is None or cached[0] is not tag_sequence or cached[1] != len(tag_sequence):
        # (previous tag, next tag) pairs for every adjacent position.
        bigram_counts = Counter(zip(tag_sequence, tag_sequence[1:]))
        # A tag only counts as a predecessor when something follows it,
        # so the final position is excluded.
        predecessor_counts = Counter(tag_sequence[:-1])
        tagset_size = len(set(tag_sequence))
        cached = (tag_sequence, len(tag_sequence), bigram_counts, predecessor_counts, tagset_size)
        _TRANSITION_STATS_CACHE['entry'] = cached
    return cached[2], cached[3], cached[4]


# P(tag_2|tag_1) aka Transition Probability
def transition_probability(tag_2, tag_1, tag_sequence=None):
    """Estimate P(tag_2 | tag_1) with add-one (Laplace) smoothing.

    Parameters
    ----------
    tag_2 : str
        The tag whose probability is requested.
    tag_1 : str
        The tag immediately preceding ``tag_2``.
    tag_sequence : sequence of str, optional
        Tag stream to count bigrams in. Defaults to the module-level ``tags``
        list. Must support slicing.

    Returns
    -------
    float
        ``(count(tag_1 followed by tag_2) + 1) / (count(tag_1 with a successor) + T)``
        where ``T`` is the number of distinct tags in ``tag_sequence``.

    Raises
    ------
    ZeroDivisionError
        If ``tag_sequence`` is empty (both terms of the denominator are zero).

    Notes
    -----
    Counts are memoised per sequence object; mutating the list in place without
    changing its length will not refresh them.
    """
    if tag_sequence is None:
        tag_sequence = tags

    bigram_counts, predecessor_counts, tagset_size = _transition_stats(tag_sequence)

    # Add-one Laplace smoothing; a Counter yields 0 for unseen keys.
    return (bigram_counts[(tag_1, tag_2)] + 1) / (predecessor_counts[tag_1] + tagset_size)

# Single-slot memo holding the emission statistics of the most recently used
# (words, tags) corpus, so repeated probability lookups do not rescan it.
_EMISSION_STATS_CACHE = {}


def _emission_stats(word_sequence, tag_sequence):
    """Return ``(pair_counts, tag_totals, vocabulary_size)`` for a tagged corpus.

    ``word_sequence`` and ``tag_sequence`` are parallel sequences. The statistics
    are computed once and reused while the very same two objects (with unchanged
    lengths) are passed in again.
    """
    lengths = (len(word_sequence), len(tag_sequence))
    cached = _EMISSION_STATS_CACHE.get('entry')
    if (
        cached is None
        or cached[0] is not word_sequence
        or cached[1] is not tag_sequence
        or cached[2] != lengths
    ):
        pair_counts = Counter(zip(word_sequence, tag_sequence))
        tag_totals = Counter(tag_sequence)
        vocabulary_size = len(set(word_sequence))
        cached = (word_sequence, tag_sequence, lengths, pair_counts, tag_totals, vocabulary_size)
        _EMISSION_STATS_CACHE['entry'] = cached
    return cached[3], cached[4], cached[5]


# P(word|tag) aka Emission Probability
def emission_probability(word, tag, word_sequence=None, tag_sequence=None):
    """Estimate P(word | tag) with add-one (Laplace) smoothing.

    Parameters
    ----------
    word : str
        The observed token.
    tag : str
        The tag assumed to have emitted ``word``.
    word_sequence, tag_sequence : sequence of str, optional
        Parallel token and tag streams of the tagged corpus. They default to the
        module-level ``words`` and ``tags`` lists.

    Returns
    -------
    float
        ``(count(word tagged as tag) + 1) / (count(tag) + V)`` where ``V`` is the
        number of distinct words in ``word_sequence``.

    Raises
    ------
    ZeroDivisionError
        If the corpus is empty (both terms of the denominator are zero).

    Notes
    -----
    The counts come from ``words`` / ``tags`` rather than from ``train_data``:
    ``train_data`` is rebound to the emotion dataset in Task 2, while this
    function is still called (through ``viterbi_algorithm``) in Task 3.
    Counts are memoised per corpus object; mutating the lists in place without
    changing their length will not refresh them.
    """
    if word_sequence is None:
        word_sequence = words
    if tag_sequence is None:
        tag_sequence = tags

    pair_counts, tag_totals, vocabulary_size = _emission_stats(word_sequence, tag_sequence)

    # Add-one Laplace smoothing; a Counter yields 0 for unseen keys.
    return (pair_counts[(word, tag)] + 1) / (tag_totals[tag] + vocabulary_size)


In [None]:
# Spot-check one smoothed estimate from each table.
jj_after_dt = transition_probability('JJ', 'DT')  # P(JJ | DT)
print(jj_after_dt)

a_given_dt = emission_probability('a', 'DT')  # P('a' | DT)
print(a_given_dt)

In [None]:
def viterbi_algorithm(sentence, tagset=None, transition_fn=None, emission_fn=None):
    """Find the most probable tag sequence for ``sentence`` with the Viterbi algorithm.

    Parameters
    ----------
    sentence : sequence of str
        The tokens to tag.
    tagset : sequence of str, optional
        Candidate tags; row ``i`` of the trellis corresponds to ``tagset[i]``.
        Defaults to the module-level ``unique_tags``.
    transition_fn : callable, optional
        ``transition_fn(tag, prev_tag)`` returning P(tag | prev_tag).
        Defaults to ``transition_probability``.
    emission_fn : callable, optional
        ``emission_fn(word, tag)`` returning P(word | tag).
        Defaults to ``emission_probability``.

    Returns
    -------
    (list of str, float)
        The best tag sequence (one tag per token) and the probability of that
        path. For an empty sentence this is ``([], 1.0)`` (the empty product).
        The probability may round to ``0.0`` for very long sentences, but the
        returned path is still the best one because scoring is done in log space.

    Raises
    ------
    ValueError
        If the sentence is non-empty but the tag set is empty.

    Notes
    -----
    No start-of-sentence prior is used: the first column of the trellis is the
    emission probability alone. Ties are resolved in favour of the tag that
    appears first in ``tagset``.
    """
    tagset = unique_tags if tagset is None else list(tagset)
    transition_fn = transition_probability if transition_fn is None else transition_fn
    emission_fn = emission_probability if emission_fn is None else emission_fn

    num_tags = len(tagset)
    num_words = len(sentence)
    if num_words == 0:
        return [], 1.0
    if num_tags == 0:
        raise ValueError("viterbi_algorithm needs a non-empty tag set")

    # Look every model probability up exactly once and move to log space, so the
    # long products below become sums that cannot underflow to zero.
    # log(0) = -inf is a legitimate "impossible" score, hence the silenced warning.
    with np.errstate(divide='ignore'):
        # log_emission[i, t] = log P(sentence[t] | tagset[i])
        log_emission = np.log(np.array(
            [[emission_fn(word, tag) for word in sentence] for tag in tagset],
            dtype=float,
        ))
        if num_words > 1:
            # log_transition[j, i] = log P(tagset[i] | tagset[j])
            log_transition = np.log(np.array(
                [[transition_fn(tag, prev_tag) for tag in tagset] for prev_tag in tagset],
                dtype=float,
            ))

    log_scores = np.empty((num_tags, num_words))
    backpointers = np.zeros((num_tags, num_words), dtype=int)

    # Initialization step: no backpointer is needed for the first word.
    log_scores[:, 0] = log_emission[:, 0]

    # Recursion step: for every current tag i pick the best previous tag j.
    tag_indices = np.arange(num_tags)
    for t in range(1, num_words):
        # candidates[j, i] = best score ending in tag j at t-1, then moving j -> i.
        candidates = log_scores[:, t - 1][:, np.newaxis] + log_transition
        best_previous = np.argmax(candidates, axis=0)
        backpointers[:, t] = best_previous
        log_scores[:, t] = candidates[best_previous, tag_indices] + log_emission[:, t]

    # Termination step.
    best_last_tag = int(np.argmax(log_scores[:, -1]))
    best_path_prob = np.exp(log_scores[best_last_tag, -1])

    # Backtracking: follow the backpointers from the last word to the first.
    best_path = [best_last_tag]
    for t in range(num_words - 1, 0, -1):
        best_path.append(int(backpointers[best_path[-1], t]))
    best_path.reverse()

    # Convert tag indices to tag names.
    best_tag_sequence = [tagset[i] for i in best_path]

    return best_tag_sequence, best_path_prob

# # Test the Viterbi Algorithm
# test_sentence = ['The', 'dog', 'barked']
# predicted_tags, probability = viterbi_algorithm(test_sentence)

# print(f"Test Sentence: {test_sentence}")
# print(f"Predicted Tags: {predicted_tags}")
# print(f"Probability of the Best Path: {probability:.4f}")

# **TASK 2 : Vanilla Emotion Recognizer**

**Step 1 :  Load and preprocess the corpus**

In [None]:
from datasets import load_dataset

# The second argument selects the dataset configuration; the 'train',
# 'validation' and 'test' splits of the result are indexed further down.
dataset = load_dataset("dair-ai/emotion", name="split")

In [None]:
# Note: `train_data` held the flattened treebank (word, tag) pairs in Task 1;
# from this point on the name refers to the emotion training split.
train_data, validation_data, test_data = (
    dataset[split_name] for split_name in ("train", "validation", "test")
)

In [None]:
# Function to preprocess data and extract texts and labels
def preprocess_data(data):
    """Split an iterable of records into parallel text and label lists.

    Each record is indexed with the keys ``"text"`` and ``"label"``
    (``"label"`` is assumed to hold the emotion class).

    Returns a ``(texts, labels)`` tuple of lists, in input order.
    """
    texts = []
    for record in data:
        texts.append(record["text"])

    labels = []
    for record in data:
        labels.append(record["label"])

    return texts, labels

# Preprocess the datasets: one (texts, labels) pair per split.
(
    (train_texts, train_labels),
    (validation_texts, validation_labels),
    (test_texts, test_labels),
) = (preprocess_data(split) for split in (train_data, validation_data, test_data))


**Step 2 : Sentence embeddings**

In [None]:
# Learn the TF-IDF vocabulary (capped at 5000 features) from the training texts
# only, then reuse the fitted vectorizer unchanged for validation and test.
vectorizer = TfidfVectorizer(max_features=5000)
train_embeddings = vectorizer.fit_transform(train_texts)
validation_embeddings, test_embeddings = (
    vectorizer.transform(texts) for texts in (validation_texts, test_texts)
)

**Step 3.1 : Naive Bayes for emotion recognition**

In [None]:
# Train Naive Bayes Classifier on the TF-IDF training matrix
# (`fit` returns the fitted estimator itself).
nb_classifier = MultinomialNB().fit(train_embeddings, train_labels)

# Predict both held-out splits with the fitted model.
validation_preds, test_preds = (
    nb_classifier.predict(features)
    for features in (validation_embeddings, test_embeddings)
)

In [None]:
# Evaluate Naive Bayes Classifier: accuracy and per-class report,
# validation split first, then test split.
# zero_division=1 makes undefined precision/recall count as 1 instead of warning.
nb_validation_accuracy = accuracy_score(validation_labels, validation_preds)
nb_validation_report = classification_report(validation_labels, validation_preds, zero_division=1)
print("Validation Accuracy (Naive Bayes):", nb_validation_accuracy)
print("Validation Classification Report (Naive Bayes):\n", nb_validation_report)

nb_test_accuracy = accuracy_score(test_labels, test_preds)
nb_test_report = classification_report(test_labels, test_preds, zero_division=1)
print("Test Accuracy (Naive Bayes):", nb_test_accuracy)
print("Test Classification Report (Naive Bayes):\n", nb_test_report)


In [None]:
# Confusion Matrix for Naive Bayes (test split).
# Rows and columns follow the classifier's own class order.
nb_class_labels = nb_classifier.classes_
cm = confusion_matrix(test_labels, test_preds, labels=nb_class_labels)

sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=nb_class_labels,
    yticklabels=nb_class_labels,
)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

**Step 3.2 : SVM for emotion recognition**

In [None]:
# Train SVM Classifier with class weights: a linear-kernel SVC whose
# class_weight='balanced' setting compensates for uneven class frequencies.
svm_classifier = SVC(kernel='linear', class_weight='balanced').fit(train_embeddings, train_labels)

# Predict both held-out splits with the fitted model.
validation_preds_svm, test_preds_svm = (
    svm_classifier.predict(features)
    for features in (validation_embeddings, test_embeddings)
)

In [None]:
# Evaluate SVM Classifier with class weights: accuracy and per-class report,
# validation split first, then test split.
svm_validation_accuracy = accuracy_score(validation_labels, validation_preds_svm)
svm_validation_report = classification_report(validation_labels, validation_preds_svm, zero_division=1)
print("Validation Accuracy (SVM):", svm_validation_accuracy)
print("Validation Classification Report (SVM):\n", svm_validation_report)

svm_test_accuracy = accuracy_score(test_labels, test_preds_svm)
svm_test_report = classification_report(test_labels, test_preds_svm, zero_division=1)
print("Test Accuracy (SVM):", svm_test_accuracy)
print("Test Classification Report (SVM):\n", svm_test_report)

In [None]:
# Generate the confusion matrix for the SVM test predictions;
# rows and columns follow the classifier's own class order.
svm_class_labels = svm_classifier.classes_
cm_svm = confusion_matrix(test_labels, test_preds_svm, labels=svm_class_labels)

# Visualize the confusion matrix using a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm_svm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=svm_class_labels,
    yticklabels=svm_class_labels,
)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix for SVM Classifier')
plt.show()

# **TASK 3 : Improved Emotion Recognizer**

In [None]:
from sklearn.pipeline import FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import classification_report
from nltk.tokenize import word_tokenize

# Custom transformer to extract POS tags and represent as features
class POSTaggerTransformer(BaseEstimator, TransformerMixin):
    """Stateless transformer that maps each text to a space-joined string of POS tags.

    Tokenisation is done with ``word_tokenize`` and tagging with
    ``viterbi_algorithm``; nothing is learned from the data.
    """

    def fit(self, X, y=None):
        """No-op; returns ``self`` so the transformer can be used where fitting is expected."""
        return self

    def transform(self, X, y=None):
        """Return a list with one ``"TAG TAG ..."`` string per text in ``X``.

        Only the tag sequence returned by ``viterbi_algorithm`` is used; the
        path probability is discarded.
        """
        return [
            ' '.join(viterbi_algorithm(word_tokenize(text))[0])
            for text in X
        ]

# Combine TF-IDF embeddings with POS tag features
class CombinedFeatures(BaseEstimator, TransformerMixin):
    """Concatenate TF-IDF features of the raw texts with TF-IDF features of their POS tags.

    One ``TfidfVectorizer`` (at most 5000 features) is fitted on the texts
    together with their POS tag strings. Each sample is then represented by the
    vectorised text followed by the vectorised tag string, as one dense row.

    Attributes
    ----------
    tfidf_vectorizer : TfidfVectorizer
        Shared vectorizer for texts and tag strings.
    pos_tagger : POSTaggerTransformer
        Produces the tag string of each text.
    """
    # NOTE(review): texts and tag strings share one vocabulary, so with the
    # vectorizer's default settings a tag and a word that normalise to the same
    # token would end up in the same column — confirm this is intended.
    # NOTE(review): the output is a dense array; for large inputs its memory
    # footprint may be significant.

    def __init__(self):
        self.tfidf_vectorizer = TfidfVectorizer(max_features=5000)
        self.pos_tagger = POSTaggerTransformer()

    def _vectorize(self, texts, pos_tagged_data):
        """Vectorise texts and their tag strings and stack the two blocks side by side."""
        tfidf_features = self.tfidf_vectorizer.transform(texts).toarray()
        pos_tfidf_features = self.tfidf_vectorizer.transform(pos_tagged_data).toarray()
        return np.hstack((tfidf_features, pos_tfidf_features))

    def fit(self, X, y=None):
        """Fit the shared vectorizer on the texts plus their POS tag strings; returns ``self``."""
        # Copy into a list so any iterable of strings can be concatenated below.
        texts = list(X)
        pos_tagged_data = list(self.pos_tagger.transform(texts))
        self.tfidf_vectorizer.fit(texts + pos_tagged_data)
        return self

    def transform(self, X, y=None):
        """Return a dense array: text TF-IDF columns followed by tag TF-IDF columns."""
        texts = list(X)
        return self._vectorize(texts, self.pos_tagger.transform(texts))

    def fit_transform(self, X, y=None, **fit_params):
        """Fit and transform ``X`` while POS-tagging it only once.

        The inherited ``fit_transform`` would call ``fit`` and then ``transform``,
        each of which runs the tagger over all of ``X``. The result is the same;
        only the duplicated tagging pass is avoided.
        """
        texts = list(X)
        pos_tagged_data = list(self.pos_tagger.transform(texts))
        self.tfidf_vectorizer.fit(texts + pos_tagged_data)
        return self._vectorize(texts, pos_tagged_data)

In [None]:
from datasets import load_dataset

# Load the Twitter messages dataset
dataset = load_dataset("dair-ai/emotion","split")

# Extract text and labels as plain Python lists: `CombinedFeatures.fit`
# concatenates its input with a list using `+`, which requires a list operand.
X_train, y_train = list(dataset['train']['text']), list(dataset['train']['label'])
X_val, y_val = list(dataset['validation']['text']), list(dataset['validation']['label'])
X_test, y_test = list(dataset['test']['text']), list(dataset['test']['label'])

# Example usage: fit the combined extractor on the training texts only, then
# apply the fitted extractor to the validation and test texts.
combined_features_extractor = CombinedFeatures()
X_train_combined = combined_features_extractor.fit_transform(X_train)
X_val_combined = combined_features_extractor.transform(X_val)
X_test_combined = combined_features_extractor.transform(X_test)

# Train Naive Bayes Classifier on the combined features
# (this rebinds `nb_classifier`, replacing the Task 2 model).
nb_classifier = MultinomialNB().fit(X_train_combined, y_train)
nb_val_preds, nb_test_preds = (
    nb_classifier.predict(features) for features in (X_val_combined, X_test_combined)
)

# Accuracy and per-class report, validation split first, then test split.
nb_val_accuracy = accuracy_score(y_val, nb_val_preds)
nb_val_report = classification_report(y_val, nb_val_preds, zero_division=1)
print("Naive Bayes Validation Accuracy:", nb_val_accuracy)
print("Naive Bayes Validation Classification Report:\n", nb_val_report)

nb_test_accuracy = accuracy_score(y_test, nb_test_preds)
nb_test_report = classification_report(y_test, nb_test_preds, zero_division=1)
print("Naive Bayes Test Accuracy:", nb_test_accuracy)
print("Naive Bayes Test Classification Report:\n", nb_test_report)

# Train SVM Classifier with class weights on the combined features
# (this rebinds `svm_classifier`, replacing the Task 2 model).
svm_classifier = SVC(kernel='linear', class_weight='balanced').fit(X_train_combined, y_train)
svm_val_preds, svm_test_preds = (
    svm_classifier.predict(features) for features in (X_val_combined, X_test_combined)
)

# Accuracy and per-class report, validation split first, then test split.
svm_val_accuracy = accuracy_score(y_val, svm_val_preds)
svm_val_report = classification_report(y_val, svm_val_preds, zero_division=1)
print("SVM Validation Accuracy:", svm_val_accuracy)
print("SVM Validation Classification Report:\n", svm_val_report)

svm_test_accuracy = accuracy_score(y_test, svm_test_preds)
svm_test_report = classification_report(y_test, svm_test_preds, zero_division=1)
print("SVM Test Accuracy:", svm_test_accuracy)
print("SVM Test Classification Report:\n", svm_test_report)

# Generate confusion matrix for SVM on the test split;
# rows and columns follow the classifier's own class order.
svm_class_labels = svm_classifier.classes_
cm_svm = confusion_matrix(y_test, svm_test_preds, labels=svm_class_labels)

# Visualize the confusion matrix using a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(
    cm_svm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=svm_class_labels,
    yticklabels=svm_class_labels,
)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix for SVM Classifier')
plt.show()