### Import and Device setup

In [1]:
import torch
from torch import nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.backends.cudnn as cudnn

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, classification_report, precision_recall_fscore_support

import re

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
from gensim.models import Word2Vec

# Select the compute device once: CUDA when torch reports it as available, otherwise the CPU.
# The training / evaluation helpers below read this module-level `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device





### Data Loading

In [None]:
# Load the raw emotion dataset. Later cells read its 'sentence' (text) and
# 'emotion' (label) columns.
df = pd.read_csv('dataset/combined_emotion.csv')
df.tail(5)



### Data Cleaning

In [36]:
print(df.shape)



In [4]:
df.info()



In [3]:
# Discard incomplete rows first, then keep a single copy of every
# (sentence, emotion) pair.
df = df.dropna().drop_duplicates(subset=['sentence', 'emotion'])

In [39]:
print(df.isnull().sum())
print(df.shape)



#### Distribution of the unbalanced dataset

In [4]:
# Count the rows per emotion and draw one bar per class. A histogram over string
# labels spreads its bins across the categorical axis, so the bars end up shifted
# away from the tick label they belong to; a bar chart keeps them aligned.
emotion_counts = df['emotion'].value_counts().sort_index()

plt.figure(figsize=(10, 6))
plt.bar(emotion_counts.index.astype(str), emotion_counts.values, color='#539caf', edgecolor='black')
plt.title('Distribution of Emotions', fontsize=16)
plt.xlabel('Emotion', fontsize=14)
plt.ylabel('Frequency', fontsize=14)
plt.xticks(rotation=45)
plt.grid(axis='y', alpha=0.75)
plt.show()



#### Decontraction, Stopwords removal, regex formatting

In [5]:
# Lookup table used by decontract_words(): lower-case contraction -> expanded form.
# The last entries (from "ive" onwards) cover common spellings typed without an apostrophe.
contractions = {
"ain't": "am not",
"aren't": "are not",
"can't": "cannot",
"can't've": "cannot have",
"'cause": "because",
"could've": "could have",
"couldn't": "could not",
"couldn't've": "could not have",
"didn't": "did not",
"doesn't": "does not",
"don't": "do not",
"hadn't": "had not",
"hadn't've": "had not have",
"hasn't": "has not",
"haven't": "have not",
"he'd": "he would",
"he'd've": "he would have",
"he'll": "he will",
"he's": "he is",
"how'd": "how did",
"how'll": "how will",
"how's": "how is",
"i'd": "i would",
"i'll": "i will",
"i'm": "i am",
"i've": "i have",
"isn't": "is not",
"it'd": "it would",
"it'll": "it will",
"it's": "it is",
"let's": "let us",
"ma'am": "madam",
"mayn't": "may not",
"might've": "might have",
"mightn't": "might not",
"must've": "must have",
"mustn't": "must not",
"needn't": "need not",
"oughtn't": "ought not",
"shan't": "shall not",
"sha'n't": "shall not",
"she'd": "she would",
"she'll": "she will",
"she's": "she is",
"should've": "should have",
"shouldn't": "should not",
"that'd": "that would",
"that's": "that is",
"there'd": "there had",
"there's": "there is",
"they'd": "they would",
"they'll": "they will",
"they're": "they are",
"they've": "they have",
"wasn't": "was not",
"we'd": "we would",
"we'll": "we will",
"we're": "we are",
"we've": "we have",
"weren't": "were not",
"what'll": "what will",
"what're": "what are",
"what's": "what is",
"what've": "what have",
"where'd": "where did",
"where's": "where is",
"who'll": "who will",
"who's": "who is",
"won't": "will not",
"wouldn't": "would not",
"you'd": "you would",
"you'll": "you will",
"you're": "you are",
"ive": "i have",
"dont": "do not",
"doesnt": "does not",
"cant": "cannot",
"whats": "what is",
"shes": "she is",
"hes": "he is",
"theyre": "they are"
}

def decontract_words(text):
    """Expand known contractions in ``text``.

    The text is split on whitespace. Every token that is a key of the
    module-level ``contractions`` table is replaced by its expansion; all
    other tokens are kept as they are. The tokens are re-joined with single
    spaces.

    Args:
        text (str): Input text (lookups are exact, so callers lower-case first).

    Returns:
        str: The text with contractions expanded.
    """
    return " ".join(contractions.get(token, token) for token in text.split())

def format_text_regex(text):
    """Strip URLs, HTML remnants and punctuation from ``text``.

    Steps, in order:
      1. remove ``http://`` / ``https://`` URLs (only the URL itself, up to the
         next whitespace, so the words that follow a link are kept);
      2. replace ``<a href`` and ``<br />`` fragments with a space;
      3. remove ``&amp;`` entities;
      4. replace the listed special characters and apostrophes with a space.

    Args:
        text (str): Raw or partially cleaned text.

    Returns:
        str: The cleaned text. Runs of spaces are not collapsed.
    """
    # Match the URL only; a greedy `.*` would also delete the rest of the line.
    text = re.sub(r'https?://\S+', '', text)
    text = re.sub(r'<a href', ' ', text)  # clean html style URL
    # Must run before the special-character pass, which strips the '/' this pattern needs.
    text = re.sub(r'<br />', ' ', text)  # remove html style <br>
    text = re.sub(r'&amp;', '', text)  # remove &amp; chars
    text = re.sub(r'[_"\-;%()|+&=*%.,!?:#$@\[\]/]', ' ', text)  # remove special characters
    text = re.sub(r'\'', ' ', text)  # apostrophes become word separators
    return text

# Lazily-built cache of the English stop-word set, shared by every remove_stopwords() call.
_ENGLISH_STOPWORDS = None


def remove_stopwords(text):
    """Remove English stop words from ``text``.

    The stop-word list is read from the NLTK corpus on the first call only and
    reused afterwards; this function is applied to every row of the dataset,
    so rebuilding the set per call is pure overhead.

    Args:
        text (str): Whitespace-separated text.

    Returns:
        str: The remaining tokens joined by single spaces.
    """
    global _ENGLISH_STOPWORDS
    if _ENGLISH_STOPWORDS is None:
        _ENGLISH_STOPWORDS = frozenset(stopwords.words("english"))
    return " ".join(word for word in text.split() if word not in _ENGLISH_STOPWORDS)

# Full cleaning pipeline: chains the preprocessing helpers and tokenizes the result
def clean_text(text):
    """Turn a raw sentence into a list of cleaned tokens.

    Pipeline: lower-case -> expand contractions -> regex clean-up ->
    stop-word removal -> tokenization with NLTK's ``WordPunctTokenizer``.

    Args:
        text (str): Raw sentence.

    Returns:
        list[str]: Tokens of the cleaned sentence.
    """
    lowered = text.lower()
    expanded = decontract_words(lowered)
    stripped = format_text_regex(expanded)
    without_stops = remove_stopwords(stripped)

    tokenizer = nltk.WordPunctTokenizer()
    return tokenizer.tokenize(without_stops)

In [7]:
df['Text_Cleaned'] = list(map(clean_text, df.sentence))
df.head(3)



#### Lemmatization

In [8]:
def lemmatized_words(text):
    """Lemmatize every token of every tokenized document in ``text``.

    Args:
        text: Iterable of token lists (e.g. the ``Text_Cleaned`` column).

    Returns:
        list[list[str]]: One lemmatized token list per input document, in the
        same order as ``text``.
    """
    # WordNetLemmatizer needs the WordNet corpus, which the import cell does not
    # fetch (it only downloads 'stopwords'). Some NLTK releases also require
    # 'omw-1.4'. Both downloads are skipped by NLTK when already up to date.
    for resource in ('wordnet', 'omw-1.4'):
        nltk.download(resource, quiet=True)

    lemm = nltk.stem.WordNetLemmatizer()
    return [[lemm.lemmatize(token) for token in tokens] for tokens in text]


# Store the lemmatized tokens next to the cleaned tokens.
df['lemmatized_text'] = lemmatized_words(df.Text_Cleaned)
df.head(3)



### Embedding

In [9]:
# Train Word2Vec on the lemmatized token lists: 300-dimensional vectors, a context
# window of 5, and words occurring fewer than 3 times are left out of the vocabulary.
# NOTE(review): the model is fitted on the whole dataframe, i.e. before the
# train/test split further down — confirm that letting test sentences shape the
# embedding is intended.
w2v_model = Word2Vec(df['lemmatized_text'], vector_size=300, window=5, min_count=3)
print(w2v_model)



In [10]:
def text_to_vec(tokens, model):
    """Embed a tokenized sentence as the mean of its known word vectors.

    Args:
        tokens: Iterable of word strings.
        model: Object exposing ``model.wv``, which supports ``word in wv`` and
            ``wv[word]`` (e.g. a trained gensim ``Word2Vec`` model).

    Returns:
        numpy.ndarray: 1-D sentence vector. When none of the tokens is in the
        vocabulary, a zero vector of the model's dimensionality is returned.
    """
    keyed_vectors = model.wv
    vectors = [keyed_vectors[word] for word in tokens if word in keyed_vectors]
    if not vectors:
        # Size the fallback from the model rather than assuming 300, so the
        # function keeps working if the embedding size changes.
        dim = getattr(keyed_vectors, "vector_size", 300)
        # float32 presumably matches the dtype of the trained vectors — TODO confirm.
        return np.zeros(dim, dtype=np.float32)
    return np.mean(vectors, axis=0)  # element-wise mean over the word vectors

In [11]:
df['vector'] = df['lemmatized_text'].apply(lambda x: text_to_vec(x, w2v_model))
df.head(5)



In [12]:
# Use positional access: dropna()/drop_duplicates() keep the original index
# labels, so the label 0 is not guaranteed to exist any more.
first_vector = df['vector'].iloc[0]
print(first_vector.shape)
print(first_vector)



### Label Encoding

In [13]:
# Encode each emotion string as an integer code through a pandas categorical.
# NOTE(review): presumably the categories are ordered alphabetically, so code i
# corresponds to sorted(df['emotion'].unique())[i], which is how class_names is
# built later — verify.
df['emotion_label'] = df['emotion'].astype('category').cat.codes
y = df['emotion_label'].values  # we use integer labels
y



In [14]:
df.head()



### Train_Test Split

In [15]:
# Stack the per-row sentence vectors into one (n_samples, vector_size) matrix.
features = np.stack(df['vector'].values)
labels = df['emotion_label'].values

# Hold out 20% of the rows for testing. The classes are unbalanced, so the split
# is stratified to give train and test the same class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    features,
    labels,
    test_size=0.2,
    random_state=42,
    stratify=labels,
)
print(X_train.shape)
print(y_train.shape)



### Model Building using PyTorch

In [16]:
class TextDataset(Dataset):
    """In-memory dataset pairing feature vectors with integer class labels.

    Attributes are kept as tensors: ``X`` in float32 and ``y`` in int64.
    """

    def __init__(self, X, y):
        """Convert array-like features ``X`` and labels ``y`` to tensors."""
        features = torch.tensor(X, dtype=torch.float32)
        targets = torch.tensor(y, dtype=torch.long)
        self.X, self.y = features, targets

    def __len__(self):
        """Number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair stored at ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target


train_dataset = TextDataset(X_train, y_train)
test_dataset = TextDataset(X_test, y_test)

# Pinned host memory only speeds up host-to-GPU copies; asking for it without
# CUDA merely triggers a warning, so enable it only when running on the GPU.
use_pinned_memory = device.type == "cuda"

# Shuffle the training data every epoch; batching limits how much data is moved
# to the device at once. The test loader keeps its order for evaluation.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, pin_memory=use_pinned_memory)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, pin_memory=use_pinned_memory)


In [17]:
class SequenceModel(nn.Module):
    """Recurrent classifier (RNN, LSTM or GRU) followed by dropout and a linear head.

    Each input sample is a single feature vector; ``forward`` wraps it into a
    sequence of length 1 before feeding the recurrent layer.

    Args:
        input_size (int): Size of each input feature vector.
        hidden_size (int): Hidden state size of the recurrent layer.
        output_size (int): Number of output logits (classes).
        num_layers (int): Number of stacked recurrent layers.
        model_type (str): ``'rnn'``, ``'lstm'`` or ``'gru'`` (case-insensitive).
        dropout_rate (float): Dropout probability, used between stacked
            recurrent layers and before the final linear layer.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers, model_type, dropout_rate):
        super(SequenceModel, self).__init__()
        self.model_type = model_type.lower()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dropout_rate = dropout_rate  # Dropout probability

        # PyTorch applies recurrent dropout only between stacked layers. With a
        # single layer a non-zero value has no effect and only raises a
        # UserWarning, so pass 0 in that case.
        recurrent_dropout = dropout_rate if num_layers > 1 else 0.0

        # The layer is stored under an attribute named after the model type
        # (self.rnn / self.lstm / self.gru); saved checkpoints rely on these names.
        if self.model_type == "rnn":
            self.rnn = nn.RNN(input_size, hidden_size, num_layers=num_layers,
                              batch_first=True, dropout=recurrent_dropout)
        elif self.model_type == "lstm":
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers,
                                batch_first=True, dropout=recurrent_dropout)
        elif self.model_type == "gru":
            self.gru = nn.GRU(input_size, hidden_size, num_layers=num_layers,
                              batch_first=True, dropout=recurrent_dropout)
        else:
            raise ValueError("Invalid model_type. Choose from ['RNN', 'LSTM', 'GRU'].")

        # Dropout before the final fully connected layer
        self.dropout = nn.Dropout(p=dropout_rate)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a ``(batch_size, input_size)`` batch to ``(batch_size, output_size)`` logits."""
        x = x.unsqueeze(1)  # add a sequence dimension: (batch_size, seq_len=1, input_size)

        # The recurrent layer lives under the attribute named after model_type.
        recurrent_layer = getattr(self, self.model_type)
        out, _ = recurrent_layer(x)

        out = self.dropout(out[:, -1, :])  # last time step, then dropout
        return self.fc(out)


def train_sequence_model(model, train_loader, test_loader, criterion, optimizer, scheduler, epochs):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Args:
        model (torch.nn.Module): Model to train; moved to the module-level ``device``.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` batches for the per-epoch evaluation.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer (torch.optim.Optimizer): Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        epochs (int): Number of passes over ``train_loader``.

    Returns:
        tuple[list, list, list, list]: Per-epoch ``train_losses``,
        ``train_accuracies``, ``test_losses`` and ``test_accuracies``.
    """
    model.to(device)
    model.train()
    cudnn.benchmark = True

    train_losses = []
    train_accuracies = []
    test_losses = []
    test_accuracies = []

    for epoch in range(epochs):
        total_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight each batch loss by its size, exactly as evaluate_sequence_model
            # does, so the train and test losses are comparable even when the last
            # batch is smaller than the others.
            batch_size = labels.size(0)
            total_loss += loss.item() * batch_size

            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()
            total += batch_size

        avg_loss = total_loss / total
        accuracy = correct / total
        train_losses.append(avg_loss)
        train_accuracies.append(accuracy)

        # Evaluate on the test set at the end of the epoch
        test_loss, test_accuracy, _, _ = evaluate_sequence_model(model, test_loader, criterion)
        test_losses.append(test_loss)
        test_accuracies.append(test_accuracy)

        model.train()  # evaluation switched the model to eval mode; switch back

        scheduler.step()
        current_lr = scheduler.get_last_lr()[0]

        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {avg_loss:.4f}, Train Acc: {accuracy:.4f}, "
                f"Test Loss: {test_loss:.4f}, Test Acc: {test_accuracy:.4f}, LR: {current_lr:.6f}")

    return train_losses, train_accuracies, test_losses, test_accuracies

def evaluate_sequence_model(model, test_loader, criterion):
    """Evaluate ``model`` on ``test_loader`` without tracking gradients.

    Args:
        model (torch.nn.Module): Model to evaluate; it is put into eval mode.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.

    Returns:
        tuple: ``(avg_loss, accuracy, all_preds, all_labels)`` where the loss is
        averaged over samples and the last two are flat lists of predicted and
        true labels (used for the confusion matrix).
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            logits = model(batch_x)
            batch_loss = criterion(logits, batch_y)
            # Scale the batch loss by the batch size to get a per-sample average at the end.
            loss_sum += batch_loss.item() * batch_x.size(0)

            predicted = torch.argmax(logits, dim=1)
            n_correct += (predicted == batch_y).sum().item()
            n_seen += batch_y.size(0)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())

    return loss_sum / n_seen, n_correct / n_seen, all_preds, all_labels

def plot_training_results_combined(train_losses, train_accuracies, test_losses, test_accuracies, model_name):
    """Plot train/test loss and train/test accuracy per epoch in two side-by-side panels.

    Args:
        train_losses, train_accuracies: Per-epoch training metrics.
        test_losses, test_accuracies: Per-epoch test metrics (same length).
        model_name (str): Prefix used in both panel titles.
    """
    x_axis = range(1, len(train_losses) + 1)
    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    # (metric name, train series, train colour, test series, test colour) per panel
    panels = (
        ("Loss", train_losses, "blue", test_losses, "red"),
        ("Accuracy", train_accuracies, "green", test_accuracies, "orange"),
    )
    for ax, (metric, train_series, train_color, test_series, test_color) in zip(axes, panels):
        ax.plot(x_axis, train_series, label=f"Train {metric}", color=train_color, linestyle="-")
        ax.plot(x_axis, test_series, label=f"Test {metric}", color=test_color, linestyle="--")
        ax.set_title(f"{model_name} - {metric} Over Epochs")
        ax.set_xlabel("Epochs")
        ax.set_ylabel(metric)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(all_preds, all_labels, class_names):
    """Draw the confusion matrix as a heatmap and print the classification report.

    Args:
        all_preds: Predicted labels.
        all_labels: True labels.
        class_names: Display names used for the axis ticks and the report.
    """
    matrix = confusion_matrix(all_labels, all_preds)

    plt.figure(figsize=(8, 6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.title("Confusion Matrix")
    plt.show()

    # Per-class precision / recall / F1 as text
    report = classification_report(all_labels, all_preds, target_names=class_names)
    print(report)

def get_classification_metrics(all_preds, all_labels, class_names):
    """Print weighted-average precision, recall and F1 score.

    Args:
        all_preds: Predicted labels.
        all_labels: True labels.
        class_names: Accepted for signature parity with the other helpers; not used here.
    """
    scores = precision_recall_fscore_support(all_labels, all_preds, average="weighted")
    # The fourth element of `scores` (support) is not printed; zip stops after three.
    for title, value in zip(("Precision", "Recall", "F1 Score"), scores):
        print(f"{title}: {value:.4f}")
    
def evaluate_and_plot_results(model, test_loader, class_names, criterion, epochs):
    """Evaluate once, show the confusion matrix and metrics, and return flat curves.

    Args:
        model (torch.nn.Module): Model to evaluate.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        class_names: Display names for the classes.
        criterion: Loss function.
        epochs (int): Length of the returned lists.

    Returns:
        tuple[list, list]: The single test loss and the single test accuracy,
        each repeated ``epochs`` times so they can share an x-axis with
        per-epoch training curves.
    """
    final_loss, final_accuracy, preds, labels = evaluate_sequence_model(model, test_loader, criterion)

    plot_confusion_matrix(preds, labels, class_names)
    get_classification_metrics(preds, labels, class_names)

    repeated_losses = [final_loss for _ in range(epochs)]
    repeated_accuracies = [final_accuracy for _ in range(epochs)]
    return repeated_losses, repeated_accuracies


In [18]:
def save_model(model, optimizer, epoch, file_path):
    """Write a training checkpoint to ``file_path``.

    The checkpoint is a dict with the keys ``epoch``, ``model_state_dict`` and
    ``optimizer_state_dict``.

    Args:
        model (torch.nn.Module): The trained model.
        optimizer (torch.optim.Optimizer): The optimizer used for training.
        epoch (int): The last epoch the model was trained on.
        file_path (str): Destination of the checkpoint file.
    """
    torch.save(
        dict(
            epoch=epoch,
            model_state_dict=model.state_dict(),
            optimizer_state_dict=optimizer.state_dict(),
        ),
        file_path,
    )
    print(f"✅ Model saved successfully at: {file_path}")

def load_model(model, optimizer, file_path):
    """Restore model and optimizer state from a checkpoint written by ``save_model``.

    Tensors are mapped onto the module-level ``device`` while loading.

    Args:
        model (torch.nn.Module): The model to load weights into.
        optimizer (torch.optim.Optimizer): The optimizer to load state into.
        file_path (str): Path of the saved checkpoint file.

    Returns:
        tuple: ``(model, optimizer, epoch)`` — the same objects with their state
        restored, and the epoch stored in the checkpoint.
    """
    state = torch.load(file_path, map_location=device)

    model.load_state_dict(state["model_state_dict"])
    optimizer.load_state_dict(state["optimizer_state_dict"])
    saved_epoch = state["epoch"]

    print(f"✅ Model loaded successfully from: {file_path} (Epoch {saved_epoch})")
    return model, optimizer, saved_epoch


In [19]:
class_names = sorted(df['emotion'].unique())

In [32]:
# Choose the model type: 'rnn', 'lstm', or 'gru'
model_type = "lstm"  # "rnn" / "gru" / "lstm"

# Hyperparameters (adjust as needed)
input_size = 300  # Matches Word2Vec vector size
hidden_size = 128  # Number of neurons in hidden layers
num_layers = 2  # Number of RNN/LSTM/GRU layers
output_size = len(class_names)  # One logit per emotion class found in the data
learning_rate = 0.001
epochs = 20
dropout_rate = 0.2

# Seed PyTorch's random number generator so weight initialisation and dropout
# repeat from run to run.
torch.manual_seed(42)

# Initialize model
model = SequenceModel(input_size, hidden_size, output_size, num_layers, model_type, dropout_rate=dropout_rate).to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [33]:

# Alternative optimizer with L2 weight decay, kept for experimentation:
# optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
# Halve the learning rate every 5 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

# Train for `epochs` epochs; the returned per-epoch histories feed the plots below.
train_losses, train_accuracies, test_losses, test_accuracies = train_sequence_model(
    model, train_loader, test_loader, criterion, optimizer, scheduler, epochs
    )




In [34]:
save_model(model, optimizer, epoch=epochs, file_path="lstm_newest.pth")



In [35]:
plot_training_results_combined(train_losses, train_accuracies, test_losses, test_accuracies, model_name=model_type.upper())
evaluate_and_plot_results(model, test_loader, class_names, criterion, epochs)










#### Load pre-trained model

In [None]:
model, optimizer, last_epoch = load_model(model, optimizer, file_path="lstm_newest.pth")
# plot_training_results_combined(train_losses, train_accuracies, test_losses, test_accuracies, model_name=model_type.upper())
evaluate_and_plot_results(model, test_loader, class_names, criterion, epochs)












In [79]:
# Hand-written sentences for a qualitative check of the trained model, three per
# emotion. The emotion named in each comment is the intended label; only the
# 'sentence' column is stored, so the expected labels are not part of the data.
validation_data = {
    "sentence": [
        # Joy
        "I am so happy to see you!",
        "This is the best day of my life!",
        "I can't stop smiling right now!",

        # Sadness
        "I feel so alone today...",
        "Nothing ever goes right for me.",
        "My heart is heavy with sorrow.",

        # Fear
        "I am scared to be alone at night.",
        "Something doesn't feel right about this.",
        "I have a bad feeling about this situation.",

        # Love
        "I love spending time with you!",
        "You mean the world to me.",
        "Every moment with you is special.",

        # Anger
        "This makes me so mad!",
        "I can't believe they did this to me!",
        "I'm absolutely furious right now!",

        # Surprise
        "Wow, I didn't expect this at all!",
        "I can't believe what just happened!",
        "That was completely unexpected!"
    ]
}

# Convert to DataFrame
new_validation_df = pd.DataFrame(validation_data)

# Save to CSV (optional); the next cell reads this file back
new_validation_df.to_csv("new_validation_data.csv", index=False)

print(new_validation_df)




In [80]:
# Load the new validation data (the file must provide a 'sentence' column)
new_df = pd.read_csv("new_validation_data.csv")
print(new_df.head())

# Step 1: Preprocessing - apply the same text cleaning and vectorization as for training
new_df['Text_Cleaned'] = new_df['sentence'].apply(clean_text)

# Build the lemmatizer once and reuse it, instead of constructing one per word.
validation_lemmatizer = nltk.stem.WordNetLemmatizer()
new_df['lemmatized_text'] = new_df['Text_Cleaned'].apply(
    lambda tokens: [validation_lemmatizer.lemmatize(token) for token in tokens]
)

# Convert text to vectors with the Word2Vec model trained earlier in the notebook
new_df['vector'] = new_df['lemmatized_text'].apply(lambda x: text_to_vec(x, w2v_model))

# Step 2: Wrap the vectors in a PyTorch Dataset
class ValidationDataset(Dataset):
    """Label-free dataset: each item is one float32 feature vector."""

    def __init__(self, X):
        """Stack the sequence of equally-shaped vectors ``X`` into one tensor."""
        stacked = np.stack(X)
        self.X = torch.tensor(stacked, dtype=torch.float32)

    def __len__(self):
        """Number of stored vectors."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the vector at position ``idx``."""
        return self.X[idx]

# Prepare validation DataLoader; shuffle=False keeps the predictions in the same
# order as the rows of new_df.
validation_dataset = ValidationDataset(new_df['vector'].values)
validation_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False, pin_memory=True)

# Step 3: Predict on the validation set
def evaluate_validation_data(model, validation_loader):
    """Return the predicted class index for every sample in ``validation_loader``.

    Args:
        model (torch.nn.Module): Trained model; it is put into eval mode.
        validation_loader: Iterable of input batches (no labels).

    Returns:
        list: Predicted class indices, in loader order.
    """
    model.eval()
    collected = []

    with torch.no_grad():
        for batch in validation_loader:
            batch = batch.to(device)
            logits = model(batch)
            # The highest logit along the class dimension is the predicted class.
            batch_preds = torch.argmax(logits, dim=1)
            collected.extend(batch_preds.cpu().numpy())

    return collected

# Run the model on the validation sentences
predictions = evaluate_validation_data(model, validation_loader)

# Map predictions back to labels: class_names[i] is the emotion name for class index i
new_df['predicted_label'] = predictions
new_df['predicted_emotion'] = new_df['predicted_label'].map(lambda x: class_names[x])  # Map numerical labels to class names

# Show each sentence next to its predicted emotion
print(new_df[['sentence', 'predicted_emotion']])




---
---
---
----

To **clear the GPU memory** in your **CMD prompt (Windows)**, use one of the following methods:

---

### **1️⃣ Using `torch.cuda.empty_cache()` (Recommended)**
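This **releases cached GPU memory that PyTorch holds but is no longer using**, from inside your Python session (it does not restart the CUDA driver). Memory still referenced by live tensors or models is not freed — delete those objects first (e.g. `del model`).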

✅ **Run inside your Python script or Jupyter Notebook:**
```python
import torch
torch.cuda.empty_cache()
```

---

### **2️⃣ Restart Python Kernel (Jupyter Notebook)**
If using **Jupyter Notebook**, restart the kernel to free all memory:
```python
import os
os._exit(00)
```
or simply:
- Click **Kernel** > **Restart Kernel** in Jupyter Notebook.

---

### **3️⃣ Use `nvidia-smi` in CMD (Windows)**
Running `nvidia-smi` on its own does **not** free anything: it only **lists GPU memory usage and the processes holding it**.

✅ **Run in CMD Prompt:**
```cmd
nvidia-smi
```
This shows GPU memory usage.

To **reset the GPU state** once no process is using the GPU any more (this command does not kill processes itself):
```cmd
nvidia-smi --gpu-reset
```
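🔴 **⚠ WARNING:** This **resets the GPU's hardware and software state**. It needs administrator rights, only works when no application is using the GPU, and is not supported on every GPU or driver mode.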

For a **safer approach**, **kill only a specific process (PID)**:
1. Run:
   ```cmd
   nvidia-smi
   ```
   - Find the **Process ID (PID)** of the process consuming memory.
2. Kill the process:
   ```cmd
   taskkill /PID <process_id> /F
   ```
   Example:
   ```cmd
   taskkill /PID 12345 /F
   ```

---

### **4️⃣ Restart the CUDA Driver (Last Resort)**
If all else fails, **restart the NVIDIA driver**.

✅ **Run in CMD Prompt (Admin Mode)**:
```cmd
net stop nvlddmkm
net start nvlddmkm
```
🔴 **⚠ WARNING:** This will **temporarily disable your display** while the driver restarts.

---

### **✅ Summary**
| **Method** | **Effect** | **When to Use?** |
|------------|-----------|------------------|
| `torch.cuda.empty_cache()` | Frees **unused memory** | Use inside Python when running models. |
| Restart Python Kernel | Clears **all memory** | If `empty_cache()` isn't enough. |
| `nvidia-smi --gpu-reset` | Resets the **GPU state** (no GPU process may be running) | If memory is still occupied after all processes have exited. |
| Kill Specific PID (`taskkill`) | Frees memory from **one process** | If you want to free memory selectively. |
| Restart CUDA Driver | **Resets entire GPU** | **Last resort**, may cause flickering. |

🚀 **Try `torch.cuda.empty_cache()` first, and escalate if needed!**