<h1 style="text-align:center; font-family:Georgia; font-weight:bold; ">HMM for Emotion Classification</h1>

## Imports

In [None]:
import re
import nltk
import json
import spacy
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
from hmmlearn import hmm
from collections import Counter
import gensim.downloader as api
import matplotlib.pyplot as plt
from gensim.models import Word2Vec
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# Seed NumPy's legacy global RNG so that later np.random.* calls in this
# notebook give the same result after a kernel restart.
np.random.seed(42)

## Load Preprocessed Data

In [None]:
# Load the preprocessed corpus from disk.
preprocessedData = pd.read_csv('./Preprocessed Data/preprocessed_text.csv')

# Report the table size and the available columns before previewing rows.
print(
    f"Loaded preprocessed data with shape: {preprocessedData.shape}",
    f"Columns: {preprocessedData.columns.tolist()}",
    sep="\n",
)

# Last expression of the cell: rich preview of the first rows.
preprocessedData.head()

## Constants and Global Variables

In [None]:
# --- Tunable constants ------------------------------------------------------
EMOTIONS = ['happiness', 'neutral', 'sadness', 'anger', 'fear']
MAX_WORDS = 10000            # max_features cap for the CountVectorizer
MAX_SEQUENCE_LENGTH = 100    # token positions kept per text
EMBEDDING_DIM = 300          # Word2Vec vector size
NUM_COMPONENTS = 5           # hidden states per GaussianHMM
RANDOM_STATE = 42

# Assign classes_ directly (instead of calling fit) so the integer code of each
# emotion is its position in EMOTIONS.
encoder = LabelEncoder()
encoder.classes_ = np.asarray(EMOTIONS)

# Only derive integer labels when the loaded table does not already carry them.
if 'label' not in preprocessedData.columns:
    preprocessedData['label'] = encoder.transform(preprocessedData['Emotion'])

# Class balance: textual summary followed by a bar chart.
emotionCounts = preprocessedData['Emotion'].value_counts()
print("Emotion distribution in dataset:")
print(emotionCounts)

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x=emotionCounts.index, y=emotionCounts.values, ax=ax)
ax.set_title('Emotion Distribution in Dataset')
ax.set_xlabel('Emotion')
ax.set_ylabel('Count')
plt.setp(ax.get_xticklabels(), rotation=45)
fig.tight_layout()
plt.show()

## Text Preprocessing

In [None]:
def cleanText(text):
    """Lower-case ``text`` and split it into purely alphabetic tokens.

    Every character that is not an ASCII letter or whitespace is replaced by a
    space, so digits and punctuation act as token separators.

    Parameters
    ----------
    text : object
        Raw text. Anything that is not a ``str`` (for example a NaN cell read
        from a CSV) is treated as an empty document.

    Returns
    -------
    list of str
        The lower-cased tokens; an empty list when there is nothing to tokenize.
    """
    if not isinstance(text, str):
        # Always hand back a list so callers get one consistent return type
        # (the string "" used to be returned here).
        return []

    text = text.lower()
    text = re.sub(r'[^a-zA-Z\s]', ' ', text)

    # split() with no arguments already collapses whitespace runs and ignores
    # leading/trailing whitespace, so no separate normalisation pass is needed.
    return text.split()

print("Tokenizing texts...")
tokenizedTexts = list(map(cleanText, tqdm(preprocessedData['Text'])))

# Token counts per document, used to judge how much MAX_SEQUENCE_LENGTH truncates.
seqLengths = list(map(len, tokenizedTexts))
print(f"Average sequence length: {np.mean(seqLengths):.2f}")
print(f"Max sequence length: {np.max(seqLengths)}")
print(f"90th percentile sequence length: {np.percentile(seqLengths, 90):.2f}")

# Histogram of document lengths with the truncation threshold marked in red.
fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(seqLengths, bins=50)
ax.set_title('Distribution of Text Length')
ax.set_xlabel('Number of Tokens')
ax.set_ylabel('Count')
ax.axvline(x=MAX_SEQUENCE_LENGTH, color='r', linestyle='--', label=f'Max Length: {MAX_SEQUENCE_LENGTH}')
ax.legend()
fig.tight_layout()
plt.show()

## Word2Vec Embeddings

In [None]:
print("Training Word2Vec model on our dataset...")
# NOTE(review): the embeddings are trained on every row of the dataset, before
# any train/validation/test split is made - confirm that is acceptable.
# NOTE(review): with workers=4 the training is presumably not bit-for-bit
# reproducible between runs - verify against the gensim documentation.
w2vModel = Word2Vec(
    sentences=tokenizedTexts,
    vector_size=EMBEDDING_DIM,
    window=5,
    min_count=1,
    workers=4,
)
print(f"Word2Vec model trained. Vocabulary size: {len(w2vModel.wv.key_to_index)}")

# Bag-of-words counts over the same tokens, limited to MAX_WORDS features.
vectorizer = CountVectorizer(max_features=MAX_WORDS)
bowMatrix = vectorizer.fit_transform(map(' '.join, tokenizedTexts))
feature_names = vectorizer.get_feature_names_out()

print(f"Bag of Words matrix shape: {bowMatrix.shape}")

## Text to Sequence Transformation

In [None]:
class TextToSequenceTransformer(BaseEstimator, TransformerMixin):
    """Scikit-learn style transformer: raw texts -> fixed-length embedding sequences.

    Parameters
    ----------
    wordVectors : object
        Word-vector lookup that exposes ``vector_size`` and supports
        ``token in wordVectors`` and ``wordVectors[token]``.
    sequenceLength : int
        Number of token positions kept per text. Longer texts are truncated;
        unused positions stay zero.
    """

    def __init__(self, wordVectors, sequenceLength=MAX_SEQUENCE_LENGTH):
        self.wordVectors = wordVectors
        self.sequenceLength = sequenceLength
        self.vectorSize = wordVectors.vector_size

    def fit(self, X, y=None):
        """Nothing is learned here; return the transformer unchanged."""
        return self

    def transform(self, X):
        """Tokenize each text with ``cleanText`` and embed it.

        Returns an array of shape ``(len(X), sequenceLength, vectorSize)``.
        Positions of tokens missing from ``wordVectors`` are left as zeros.
        """
        embedded = np.zeros((len(X), self.sequenceLength, self.vectorSize))

        for row, rawText in enumerate(X):
            # zip() against range() stops after sequenceLength tokens.
            for col, word in zip(range(self.sequenceLength), cleanText(rawText)):
                if word in self.wordVectors:
                    embedded[row, col] = self.wordVectors[word]

        return embedded

def textToSequence(texts, wordVectors, maxLength=MAX_SEQUENCE_LENGTH):
    """Embed already-tokenized texts into a zero-padded 3-D array.

    Parameters
    ----------
    texts : sequence of iterables of str
        One iterable of tokens per document.
    wordVectors : object
        Lookup exposing ``vector_size`` and supporting ``token in wordVectors``
        and ``wordVectors[token]``.
    maxLength : int
        Token positions kept per document; extra tokens are dropped.

    Returns
    -------
    numpy.ndarray
        Shape ``(len(texts), maxLength, wordVectors.vector_size)``. Padding
        positions and tokens absent from ``wordVectors`` are zero vectors.
    """
    embedded = np.zeros((len(texts), maxLength, wordVectors.vector_size))

    for row, tokenList in enumerate(texts):
        # zip() against range() caps the walk at maxLength tokens.
        for col, word in zip(range(maxLength), tokenList):
            if word in wordVectors:
                embedded[row, col] = wordVectors[word]

    return embedded

# Embed every tokenized document. The result is a dense array of shape
# (documents, MAX_SEQUENCE_LENGTH, vector_size), which can be large in memory.
textSequences = textToSequence(texts=tokenizedTexts, wordVectors=w2vModel.wv)
print(f"Text sequences shape: {textSequences.shape}")

# Average over the token axis. Zero-padded positions are part of the average,
# so the divisor is always MAX_SEQUENCE_LENGTH.
seqMeans = textSequences.mean(axis=1)
print(f"Sequence means shape: {seqMeans.shape}")

## Data Preparation

In [None]:
labels = preprocessedData['label'].values

# Step 1: hold out 20% of all rows as the test set (stratified by label).
X_trainval, X_test, y_trainval, y_test = train_test_split(
    seqMeans,
    labels,
    test_size=0.2,
    stratify=labels,
    random_state=RANDOM_STATE,
)

# Step 2: take 20% of the remaining rows as the validation set.
X_train, X_val, y_train, y_val = train_test_split(
    X_trainval,
    y_trainval,
    test_size=0.2,
    stratify=y_trainval,
    random_state=RANDOM_STATE,
)

# Standardize using statistics from the training rows only.
scaler = StandardScaler().fit(X_train)
X_train_scaled, X_val_scaled, X_test_scaled = (
    scaler.transform(part) for part in (X_train, X_val, X_test)
)

print(f"Training set shape: {X_train.shape}")
print(f"Validation set shape: {X_val.shape}")
print(f"Test set shape: {X_test.shape}")

## HMM Model Building

In [None]:
# One GaussianHMM per emotion, each fitted only on that emotion's training rows.
hmmModels = {}

# Settings shared by every per-emotion model.
hmmParams = dict(
    n_components=NUM_COMPONENTS,
    covariance_type="full",
    n_iter=100,
    random_state=RANDOM_STATE,
)

for emotion in EMOTIONS:
    emotionIdx = list(encoder.classes_).index(emotion)
    X_emotion = X_train_scaled[y_train == emotionIdx]

    # Nothing to fit for a class with no training rows.
    if len(X_emotion) == 0:
        print(f"No samples for {emotion}")
        continue

    # NOTE(review): fit() is called without `lengths`, so presumably all rows
    # are treated as one observation sequence, while predictWithHMM scores each
    # row on its own - confirm this is the intended modelling choice.
    model = hmm.GaussianHMM(**hmmParams)
    model.fit(X_emotion)
    hmmModels[emotion] = model
    print(f"Trained HMM for {emotion} on {len(X_emotion)} samples")

## Model Evaluation

In [None]:
def predictWithHMM(X, models):
    """Label every row of ``X`` with the emotion whose model scores it highest.

    Parameters
    ----------
    X : numpy.ndarray
        2-D array with one feature vector per row.
    models : dict
        Maps an emotion name to an object with a ``score(x)`` method, where
        ``x`` has shape ``(1, n_features)`` and a larger score is better.

    Returns
    -------
    list
        One emotion name per row of ``X``. A row that no model could score
        falls back to ``EMOTIONS[0]``.

    Notes
    -----
    A model whose ``score`` raises is skipped for that row instead of aborting
    the run, and the failure is reported once per model after the loop.
    Only ``Exception`` subclasses are tolerated, so ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate.
    """
    results = []
    failures = {}  # emotion -> first exception seen while scoring with it

    for x in X:
        sample = x.reshape(1, -1)
        best_score = -np.inf
        best_label = None

        for emotion, model in models.items():
            try:
                score = model.score(sample)
            except Exception as exc:
                failures.setdefault(emotion, exc)
                continue

            if score > best_score:
                best_score = score
                best_label = emotion

        results.append(best_label if best_label is not None else EMOTIONS[0])

    # Report scoring problems once per model rather than hiding them.
    for emotion, exc in failures.items():
        print(f"Warning: model for '{emotion}' failed to score at least one sample: {exc!r}")

    return results

def evaluateModels(X, y_true, models):
    """Score the per-emotion models on a labelled set and display the results.

    Prints the accuracy and a per-class classification report, then draws a
    confusion-matrix heatmap.

    Parameters
    ----------
    X : numpy.ndarray
        Feature matrix, one row per sample (passed to ``predictWithHMM``).
    y_true : array-like of int
        True labels in the integer encoding of the module-level ``encoder``.
    models : dict
        Emotion name -> fitted model, as expected by ``predictWithHMM``.

    Returns
    -------
    tuple
        ``(accuracy, y_pred_encoded)``: the accuracy and the integer-encoded
        predictions.
    """
    y_pred = predictWithHMM(X, models)
    y_pred_encoded = encoder.transform(y_pred)
    y_true_names = encoder.inverse_transform(y_true)

    accuracy = accuracy_score(y_true, y_pred_encoded)
    print(f"Accuracy: {accuracy:.4f}")

    print("\nClassification Report:")
    # Pass the class order through `labels`. With string labels scikit-learn
    # otherwise sorts the classes alphabetically, and `target_names=EMOTIONS`
    # (which is not alphabetical) would attach the wrong name to each row.
    print(classification_report(y_true_names, y_pred, labels=EMOTIONS))

    # Pin the label set so the matrix always has one row/column per emotion, in
    # the same order as the tick labels, even if a class is absent from X.
    cm = confusion_matrix(y_true, y_pred_encoded, labels=np.arange(len(EMOTIONS)))
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=EMOTIONS, yticklabels=EMOTIONS)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.tight_layout()
    plt.show()

    return accuracy, y_pred_encoded

# Validation split: used to inspect the models before looking at the test set.
print("Evaluating on validation set...")
val_accuracy, val_preds = evaluateModels(X=X_val_scaled, y_true=y_val, models=hmmModels)

# Held-out test split: these numbers are the ones reported further down.
print("\nEvaluating on test set...")
test_accuracy, test_preds = evaluateModels(X=X_test_scaled, y_true=y_test, models=hmmModels)

## Lexicon Score Analysis

In [None]:
scoreColumns = [col for col in preprocessedData.columns if col.endswith('Score')]

# Row positions of the held-out test set. The first split in "Data Preparation"
# returned only the split arrays, so repeat it here on the row positions with
# the same test_size, stratify and random_state to recover the matching rows.
# (A random sample of rows was used before, which did not line up with y_test.)
# Defined outside the `if` because later cells use testIndices as well.
_trainIndices, testIndices = train_test_split(
    np.arange(len(labels)),
    test_size=0.2,
    stratify=labels,
    random_state=RANDOM_STATE,
)
assert np.array_equal(labels[testIndices], y_test), \
    "Recovered test indices do not line up with y_test"

if scoreColumns:
    testScores = preprocessedData.iloc[testIndices][scoreColumns].values

    # Size the subplot grid from the number of emotions instead of fixing 2x3.
    nCols = 3
    nRows = -(-len(EMOTIONS) // nCols)  # ceiling division

    plt.figure(figsize=(12, 8))

    for i, emotion in enumerate(EMOTIONS):
        # Test rows whose true label is this emotion.
        emotionIndices = np.where(y_test == i)[0]
        if len(emotionIndices) > 0:
            avgScores = np.mean(testScores[emotionIndices], axis=0)

            plt.subplot(nRows, nCols, i+1)
            plt.bar(range(len(scoreColumns)), avgScores)
            plt.title(f'Avg Lexicon Scores for {emotion}')
            plt.xticks(range(len(scoreColumns)), [s.replace('Score', '') for s in scoreColumns], rotation=45)

    plt.tight_layout()
    plt.show()

## Model for Inference

In [None]:
def predictEmotion(text, models, w2vModel, scaler):
    """Predict the emotion of a single raw text.

    Parameters
    ----------
    text : str
        Raw input text; it is tokenized with ``cleanText``.
    models : dict
        Emotion name -> fitted model exposing ``score(x)``.
    w2vModel : object
        Either a trained Word2Vec model (its ``.wv`` is used) or the word
        vectors themselves. The vectors must expose ``vector_size`` and
        support ``token in vectors`` and ``vectors[token]``.
    scaler : object
        Fitted scaler exposing ``transform``; it receives a ``(1, vector_size)``
        array.

    Returns
    -------
    str
        The best-scoring emotion, or ``EMOTIONS[0]`` when the text has no
        tokens or no model could score it.
    """
    # The notebook calls this with `w2vModel.wv`, which has no `.wv` of its
    # own on gensim 4, so accept both the full model and the bare vectors.
    wordVectors = getattr(w2vModel, 'wv', w2vModel)

    tokens = cleanText(text)
    if len(tokens) == 0:
        return EMOTIONS[0]

    # Build the feature exactly as the training data was built: keep at most
    # MAX_SEQUENCE_LENGTH tokens in a zero-padded array and average over ALL
    # positions. Averaging over the real token count instead would put the
    # vector on a different scale from what the scaler and the HMMs were fit on.
    padded = np.zeros((MAX_SEQUENCE_LENGTH, wordVectors.vector_size))
    for position, token in zip(range(MAX_SEQUENCE_LENGTH), tokens):
        if token in wordVectors:
            padded[position] = wordVectors[token]

    meanVector = padded.mean(axis=0)
    scaledVector = scaler.transform(meanVector.reshape(1, -1))

    bestScore = -np.inf
    bestEmotion = None

    for emotion, model in models.items():
        try:
            score = model.score(scaledVector)
        except Exception as exc:
            # Best effort: skip a model that cannot score this input, but say
            # so instead of hiding the problem.
            print(f"Warning: model for '{emotion}' failed to score the input: {exc!r}")
            continue

        if score > bestScore:
            bestScore = score
            bestEmotion = emotion

    return bestEmotion if bestEmotion is not None else EMOTIONS[0]

# One hand-written sentence per target emotion, used as a smoke test.
testExamples = [
    "I am so happy today, everything is wonderful!",
    "I feel so sad and depressed after what happened.",
    "I'm absolutely furious about the way they treated me.",
    "I'm really scared about what might happen next.",
    "It's just another normal day, nothing special.",
]

for text in testExamples:
    emotion = predictEmotion(
        text=text,
        models=hmmModels,
        w2vModel=w2vModel.wv,
        scaler=scaler,
    )
    print(f"\nText: '{text}'")
    print(f"Predicted emotion: {emotion}")

## Model Comparison

In [None]:
# Headline test-set result, stored under the name of this model.
hmmMetrics = dict(
    accuracy=test_accuracy,
    model_name='HMM with Word2Vec',
)

print(f"HMM with Word2Vec Accuracy: {test_accuracy:.4f}")

## Text Length Analysis

In [None]:
# Row positions of the held-out test set, recovered by repeating the first
# split from "Data Preparation" on the row positions (same test_size, stratify
# and random_state). Computing them here means this cell does not depend on a
# variable that an earlier cell only defines conditionally, and the lengths
# below belong to the same rows as test_preds / y_test.
_trainIndices, testIndices = train_test_split(
    np.arange(len(labels)),
    test_size=0.2,
    stratify=labels,
    random_state=RANDOM_STATE,
)
assert np.array_equal(labels[testIndices], y_test), \
    "Recovered test indices do not line up with y_test"

# Whitespace-token count per test text; non-string cells (e.g. NaN) count as 0.
textLengths = [
    len(text.split()) if isinstance(text, str) else 0
    for text in preprocessedData['Text'].iloc[testIndices]
]

bins = [0, 10, 20, 30, 40, 50, 70, 100, 1000]
# include_lowest keeps zero-length texts in the first bin instead of dropping them.
lengthBins = pd.cut(textLengths, bins=bins, include_lowest=True)

correctPredictions = (test_preds == y_test)
dfResults = pd.DataFrame({
    'textLength': textLengths,
    'lengthBin': lengthBins,
    'correct': correctPredictions
})

# observed=False keeps empty bins on the charts and makes the choice explicit
# for the categorical grouper.
resultsByBin = dfResults.groupby('lengthBin', observed=False)
accuracyByLength = resultsByBin['correct'].mean()
samplesByLength = resultsByBin.size()

plt.figure(figsize=(12, 6))
accuracyByLength.plot(kind='bar')
plt.title('HMM Model Accuracy by Text Length')
plt.xlabel('Text Length')
plt.ylabel('Accuracy')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

plt.figure(figsize=(12, 6))
samplesByLength.plot(kind='bar')
plt.title('Number of Samples by Text Length')
plt.xlabel('Text Length')
plt.ylabel('Count')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## Save Model

In [None]:
import pickle

# Persist the three artifacts that inference needs: the per-emotion HMMs,
# the Word2Vec model and the fitted scaler.
with open('hmm_emotion_models.pkl', 'wb') as modelsFile:
    pickle.dump(hmmModels, modelsFile)

w2vModel.save("hmm_word2vec.model")

with open('hmm_scaler.pkl', 'wb') as scalerFile:
    pickle.dump(scaler, scalerFile)

print("Saved HMM models, Word2Vec model, and scaler")

# Closing summary of the run.
print("\n=== HMM with Word2Vec Model Summary ===")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Number of HMM states: {NUM_COMPONENTS}")
print(f"Embedding dimension: {EMBEDDING_DIM}")
