# 🎯 AEMER - Text Emotion Model Training

**Architecture:** DistilBERT-based classifier
**Dataset:** GoEmotions (Google's Reddit emotion dataset)
**Classes:** angry, happy, sad, neutral (mapped from GoEmotions' 27 original emotions plus its own neutral label)
**Output:** `text_model.pth` for backend integration

---
**Instructions:**
1. Set Runtime → Change runtime type → **T4 GPU**
2. Run all cells in order
3. Download `text_model.pth` when complete

## 1️⃣ Setup & Install Dependencies

In [None]:
!pip install -q transformers datasets torch scikit-learn matplotlib seaborn

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import DistilBertTokenizer, DistilBertModel
from datasets import load_dataset
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Select the compute device: CUDA when PyTorch can see a GPU, otherwise CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')
if device.type == 'cuda':
    # Report the name of GPU index 0.
    print(f'GPU: {torch.cuda.get_device_name(0)}')

## 2️⃣ Load GoEmotions Dataset

In [None]:
# GoEmotions ships 27 emotion labels plus "neutral"; each of them is folded
# into one of our 4 target classes. The class index is the position of the
# class name in LABEL_NAMES.
LABEL_NAMES = ['angry', 'happy', 'sad', 'neutral']
NUM_CLASSES = 4

# (class index, GoEmotions labels that collapse into that class)
_EMOTIONS_BY_CLASS = (
    # angry
    (0, ('anger', 'annoyance', 'disapproval', 'disgust')),
    # happy
    (1, ('joy', 'amusement', 'approval', 'excitement', 'gratitude',
         'love', 'optimism', 'relief', 'pride', 'admiration')),
    # sad
    (2, ('sadness', 'disappointment', 'embarrassment', 'grief',
         'remorse', 'fear', 'nervousness')),
    # neutral
    (3, ('neutral', 'realization', 'surprise', 'confusion',
         'curiosity', 'caring', 'desire')),
)

# Flat lookup: GoEmotions label name -> 4-class index.
EMOTION_MAP = {}
for _class_idx, _emotions in _EMOTIONS_BY_CLASS:
    for _emotion in _emotions:
        EMOTION_MAP[_emotion] = _class_idx

# Load the "simplified" GoEmotions configuration and report the size of
# each of its three splits.
print("Loading GoEmotions dataset...")
dataset = load_dataset('google-research-datasets/go_emotions', 'simplified')
n_train = len(dataset['train'])
n_val = len(dataset['validation'])
n_test = len(dataset['test'])
print(f"Train: {n_train} samples")
print(f"Val: {n_val} samples")
print(f"Test: {n_test} samples")

In [None]:
# Label names as declared by the dataset's 'labels' feature; the label ids
# stored in each sample index into this list.
label_feature = dataset['train'].features['labels'].feature
emotion_names = label_feature.names
print(f"Original emotions: {emotion_names}")

def map_to_4_class(labels):
    """Collapse one sample's GoEmotions label ids into a single class index.

    Every id in ``labels`` is resolved to its name through ``emotion_names``
    and translated through ``EMOTION_MAP``; names missing from the map are
    ignored. When several classes are present, the winner is picked by a
    fixed priority: angry (0) > sad (2) > happy (1) > neutral (3). A sample
    with no mappable label is treated as neutral (3).
    """
    present = set()
    for label_idx in labels:
        class_idx = EMOTION_MAP.get(emotion_names[label_idx])
        if class_idx is not None:
            present.add(class_idx)

    # Walk the non-neutral classes in priority order; neutral is the fallback.
    for class_idx in (0, 2, 1):
        if class_idx in present:
            return class_idx
    return 3

# Process datasets
def process_split(split):
    """Return parallel ``(texts, labels)`` lists for one dataset split.

    Each item contributes its ``'text'`` field and the 4-class label that
    ``map_to_4_class`` derives from its ``'labels'`` field.
    """
    pairs = [(item['text'], map_to_4_class(item['labels'])) for item in split]
    texts = [text for text, _ in pairs]
    labels = [label for _, label in pairs]
    return texts, labels

# Convert every split into plain Python lists of texts and 4-class labels.
train_texts, train_labels = process_split(dataset['train'])
val_texts, val_labels = process_split(dataset['validation'])
test_texts, test_labels = process_split(dataset['test'])

# Show how the training samples are spread over the four classes.
print(f"\n\U0001f4ca Class distribution (train):")
n_train_labels = len(train_labels)
for class_idx, class_name in enumerate(LABEL_NAMES):
    n_in_class = train_labels.count(class_idx)
    print(f"  {class_name}: {n_in_class} ({n_in_class/n_train_labels*100:.1f}%)")
print(f"  Total: {n_train_labels}")

## 3️⃣ Tokenizer & Dataset Class

In [None]:
# Maximum number of tokens per sample; texts are padded/truncated to this.
MAX_LEN = 128
# Tokenizer from the same 'distilbert-base-uncased' checkpoint that the
# classifier's encoder is loaded from.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

class TextEmotionDataset(Dataset):
    """Map-style dataset pairing raw texts with integer class labels.

    Tokenization happens on access: each ``__getitem__`` call runs the
    tokenizer on one text, padded/truncated to ``max_len`` tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_len=128):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``label`` for one sample."""
        raw_text = str(self.texts[idx])
        target = self.labels[idx]

        # Calling the tokenizer object directly (rather than a specific
        # encode method) works across transformers versions.
        encoded = self.tokenizer(
            raw_text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # The tokenizer returns batched tensors; flatten each to 1-D.
        sample = {
            field: encoded[field].flatten()
            for field in ('input_ids', 'attention_mask')
        }
        sample['label'] = torch.tensor(target, dtype=torch.long)
        return sample

# Create datasets: one TextEmotionDataset per split, all sharing the same
# tokenizer and maximum sequence length.
train_dataset = TextEmotionDataset(train_texts, train_labels, tokenizer, MAX_LEN)
val_dataset = TextEmotionDataset(val_texts, val_labels, tokenizer, MAX_LEN)
test_dataset = TextEmotionDataset(test_texts, test_labels, tokenizer, MAX_LEN)

# The check mark is written as a Unicode escape so the message prints
# correctly regardless of how this file's bytes are decoded.
print("\u2705 Datasets created")
print(f"  Train: {len(train_dataset)}")
print(f"  Val: {len(val_dataset)}")
print(f"  Test: {len(test_dataset)}")

## 4️⃣ Model Architecture

In [None]:
class FocalLoss(nn.Module):
    """
    Focal loss: cross-entropy scaled by ``(1 - p_t) ** gamma``.

    ``p_t`` is the softmax probability the model assigns to the true class,
    so well-classified examples (``p_t`` close to 1) contribute little and
    training concentrates on hard, misclassified examples.

    Args:
        weight: optional 1-D tensor of per-class weights, applied as a
            multiplicative factor on each sample's loss (same meaning as the
            ``weight`` argument of ``cross_entropy``).
        gamma: focusing exponent; ``gamma=0`` reduces to (weighted)
            cross-entropy.
        reduction: ``'mean'`` or ``'sum'`` to reduce over the batch; any
            other value returns the per-sample losses unreduced.
    """
    def __init__(self, weight=None, gamma=2.0, reduction='mean'):
        super().__init__()
        self.weight = weight
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Return the focal loss of logits ``inputs`` against ``targets``."""
        # p_t must come from the *unweighted* cross-entropy (-log p_t).
        # Deriving it from the class-weighted loss would give p_t ** w and
        # distort the focusing term for every class whose weight is not 1.
        plain_ce = nn.functional.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-plain_ce)

        if self.weight is None:
            ce_loss = plain_ce
        else:
            # Class weights only scale the loss; they do not enter p_t.
            ce_loss = nn.functional.cross_entropy(
                inputs, targets, reduction='none', weight=self.weight
            )
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss


class TextEmotionClassifier(nn.Module):
    """
    Emotion classifier built on a pretrained DistilBERT encoder.

    The embeddings and the first four transformer layers are frozen; the
    remaining layers and a three-layer MLP head are trained. The head reads
    the hidden state at sequence position 0.
    """
    def __init__(self, num_classes=4, dropout=0.3):
        super().__init__()
        self.bert = DistilBertModel.from_pretrained('distilbert-base-uncased')

        # Freeze the embeddings plus transformer layers 0-3; the layers
        # after those stay trainable.
        frozen_modules = [self.bert.embeddings]
        frozen_modules.extend(list(self.bert.transformer.layer)[:4])
        for module in frozen_modules:
            for param in module.parameters():
                param.requires_grad = False

        self.dropout = nn.Dropout(dropout)

        # Head: two Linear -> BatchNorm -> ReLU -> Dropout stages, then the
        # output projection. Module order is kept so state-dict keys match.
        head_layers = []
        for in_features, out_features in ((768, 384), (384, 128)):
            head_layers.append(nn.Linear(in_features, out_features))
            head_layers.append(nn.BatchNorm1d(out_features))
            head_layers.append(nn.ReLU())
            head_layers.append(nn.Dropout(dropout))
        head_layers.append(nn.Linear(128, num_classes))
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask):
        """Return class logits for a batch of tokenized inputs."""
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Use the hidden state at the first sequence position as the summary.
        first_token_state = encoded.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(first_token_state))

# Build the classifier and move it to the selected device.
model = TextEmotionClassifier(num_classes=NUM_CLASSES).to(device)

# Count parameters, separating those that will receive gradient updates.
total_params = 0
trainable_params = 0
for p in model.parameters():
    total_params += p.numel()
    if p.requires_grad:
        trainable_params += p.numel()
frozen_params = total_params - trainable_params

print(f"\u2705 Model created")
print(f"  Total parameters: {total_params:,}")
print(f"  Trainable: {trainable_params:,} ({trainable_params/total_params*100:.1f}%)")
print(f"  Frozen: {frozen_params:,} ({frozen_params/total_params*100:.1f}%)")

## 5️⃣ Training Setup

In [None]:
from torch.optim.lr_scheduler import LambdaLR
import numpy as np

# Hyperparameters
BATCH_SIZE = 32
EPOCHS = 10
WEIGHT_DECAY = 0.05
PATIENCE = 3        # epochs without val-loss improvement before stopping
WARMUP_RATIO = 0.1  # fraction of all optimizer steps spent warming up

# Data loaders: only the training loader shuffles; validation and test
# batches are served in dataset order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader, test_loader = (
    DataLoader(eval_split, batch_size=BATCH_SIZE)
    for eval_split in (val_dataset, test_dataset)
)

# Class weights for the imbalanced dataset: inverse of log(1 + count), so
# rarer classes weigh more while the log keeps the spread moderate.
class_counts = np.array([train_labels.count(class_idx) for class_idx in range(NUM_CLASSES)])
inverse_log_counts = 1.0 / np.log1p(class_counts)
class_weights = torch.FloatTensor(inverse_log_counts).to(device)
# Rescale so the weights sum to NUM_CLASSES (i.e. average to 1).
weight_total = class_weights.sum()
class_weights = class_weights / weight_total * NUM_CLASSES

# Focal loss on top of the class weights: the (1 - p_t) ** gamma factor
# shrinks the contribution of examples the model already classifies well.
criterion = FocalLoss(weight=class_weights, gamma=2.0)

# Differential learning rates: trainable parameters are split by name into
# the encoder group ('bert' in the name) and the classifier-head group.
bert_params = []
classifier_params = []
for param_name, param in model.named_parameters():
    if not param.requires_grad:
        continue  # frozen parameters are left out of the optimizer
    if 'bert' in param_name:
        bert_params.append(param)
    else:
        classifier_params.append(param)

# The pretrained encoder gets the smaller step size, the fresh head the larger.
param_groups = [
    {'params': bert_params, 'lr': 2e-5},
    {'params': classifier_params, 'lr': 1e-4},
]
optimizer = torch.optim.AdamW(param_groups, weight_decay=WEIGHT_DECAY)

# Warmup + linear decay, counted in optimizer steps (one per batch).
total_steps = len(train_loader) * EPOCHS
warmup_steps = int(total_steps * WARMUP_RATIO)

def lr_lambda(current_step):
    """Return the LR multiplier for ``current_step``.

    Rises linearly from 0 to 1 over ``warmup_steps``, then falls linearly
    to 0 at ``total_steps`` (never below 0).
    """
    if current_step >= warmup_steps:
        steps_left = float(total_steps - current_step)
        decay_span = float(max(1, total_steps - warmup_steps))
        return max(0.0, steps_left / decay_span)
    return float(current_step) / float(max(1, warmup_steps))

scheduler = LambdaLR(optimizer, lr_lambda)

# Summarise the training configuration, one line per setting.
config_report = (
    f"\U0001f4ca Training config:",
    f"  Batch size: {BATCH_SIZE}",
    f"  Epochs: {EPOCHS}",
    f"  BERT LR: 2e-5 | Classifier LR: 1e-4",
    f"  Loss Function: Focal Loss (gamma=2.0)",
    f"  Weight decay: {WEIGHT_DECAY}",
    f"  Warmup steps: {warmup_steps}",
    f"  Class weights: {class_weights.cpu().numpy().round(2)}",
)
for report_line in config_report:
    print(report_line)

## 6️⃣ Training Loop

In [None]:
def train_epoch(model, loader, criterion, optimizer, scheduler, device):
    """Train ``model`` for one pass over ``loader``.

    Each batch is a dict with ``'input_ids'``, ``'attention_mask'`` and
    ``'label'`` tensors. Gradients are clipped to a global norm of 1.0 and
    the scheduler is stepped after every optimizer step.

    Returns:
        ``(mean loss per batch, accuracy over all samples seen)``.
    """
    model.train()
    running_loss = 0
    n_correct = 0
    n_seen = 0

    for batch in tqdm(loader, desc='Training'):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)
        loss.backward()

        # Clip the global gradient norm before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        scheduler.step()  # per-batch step drives the warmup/decay schedule

        running_loss += loss.item()
        predicted = logits.max(1)[1]
        n_correct += (predicted == labels).sum().item()
        n_seen += labels.size(0)

    mean_loss = running_loss / len(loader)
    accuracy = n_correct / n_seen
    return mean_loss, accuracy

def evaluate(model, loader, criterion, device):
    """Score ``model`` on ``loader`` without updating any weights.

    Returns:
        ``(mean loss per batch, accuracy, predictions, true labels)``; the
        last two are flat lists collected across all batches.
    """
    model.eval()
    running_loss = 0
    n_correct = 0
    n_seen = 0
    all_preds, all_labels = [], []

    with torch.no_grad():
        for batch in tqdm(loader, desc='Evaluating'):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            logits = model(input_ids, attention_mask)
            running_loss += criterion(logits, labels).item()

            predicted = logits.max(1)[1]
            n_correct += (predicted == labels).sum().item()
            n_seen += labels.size(0)

            # Keep per-sample results on the CPU for the metric functions.
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    mean_loss = running_loss / len(loader)
    accuracy = n_correct / n_seen
    return mean_loss, accuracy, all_preds, all_labels

In [None]:
# Training with warmup schedule and early stopping
print("\U0001f680 Starting training...")
print("=" * 60)

# Per-epoch metrics, used later for the training-curve plots.
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
best_val_loss = float('inf')  # Track val_loss instead of val_acc
# Initialised up front: it is otherwise only assigned when an epoch improves
# on best_val_loss, so a run where that never happens (e.g. a NaN validation
# loss) would hit a NameError in the summary below.
best_val_acc = 0.0
patience_counter = 0

for epoch in range(EPOCHS):
    # LR of the first parameter group (the BERT group) at the epoch start.
    current_lr = optimizer.param_groups[0]['lr']
    print(f"\nEpoch {epoch+1}/{EPOCHS} | LR: {current_lr:.2e}")

    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, scheduler, device)
    val_loss, val_acc, _, _ = evaluate(model, val_loader, criterion, device)

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    print(f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"  Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.4f}")

    # Early stopping based on val_loss (lower is better)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_val_acc = val_acc  # accuracy at the best-loss epoch
        # Checkpoint the weights of the best epoch so far.
        torch.save(model.state_dict(), 'text_model.pth')
        print(f"  \u2705 Best model saved! (val_loss: {val_loss:.4f}, acc: {val_acc:.4f})")
        patience_counter = 0
    else:
        patience_counter += 1
        print(f"  \u26a0\ufe0f No improvement ({patience_counter}/{PATIENCE})")
        if patience_counter >= PATIENCE:
            print(f"\n\U0001f6d1 Early stopping triggered at epoch {epoch+1}!")
            break

print("\n" + "=" * 60)
print(f"\u2705 Training complete!")
print(f"  Best val loss: {best_val_loss:.4f}")
print(f"  Best val acc: {best_val_acc:.4f}")
print(f"  Total epochs run: {len(history['train_loss'])}")

## 7️⃣ Evaluation

In [None]:
# Load best model. map_location places the checkpoint tensors on the active
# device, so a checkpoint written on a GPU still loads in a CPU-only session.
model.load_state_dict(torch.load('text_model.pth', map_location=device))

# Evaluate on test set
test_loss, test_acc, all_preds, all_labels = evaluate(model, test_loader, criterion, device)

print(f"\n\U0001f4ca Test Results:")
print(f"  Loss: {test_loss:.4f}")
print(f"  Accuracy: {test_acc:.4f}")
# Passing the full label set keeps the report aligned with LABEL_NAMES even
# when some class appears in neither the labels nor the predictions
# (without it, a missing class makes target_names mismatch and raise).
print("\n" + classification_report(
    all_labels,
    all_preds,
    labels=list(range(len(LABEL_NAMES))),
    target_names=LABEL_NAMES,
))

In [None]:
# Plot training curves: loss on the left axis, accuracy on the right, each
# with the train and validation series from `history`.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

panels = (
    (axes[0], 'train_loss', 'val_loss', 'Loss'),
    (axes[1], 'train_acc', 'val_acc', 'Accuracy'),
)
for ax, train_key, val_key, title in panels:
    ax.plot(history[train_key], label='Train')
    ax.plot(history[val_key], label='Val')
    ax.set_title(title)
    ax.legend()

plt.tight_layout()
plt.savefig('text_training_curves.png', dpi=150)
plt.show()

In [None]:
# Normalized Confusion Matrix (shows recall per class)
import numpy as np

# Pin the label set so the matrix is always len(LABEL_NAMES) x len(LABEL_NAMES).
# Otherwise a class absent from both labels and predictions shrinks the
# matrix, misaligning the tick labels and breaking the per-class loop below.
class_ids = list(range(len(LABEL_NAMES)))
cm = confusion_matrix(all_labels, all_preds, labels=class_ids)

# Normalize by row (true class). Rows with no samples stay at 0 instead of
# turning into NaN from a division by zero.
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = np.divide(
    cm,
    row_totals,
    out=np.zeros(cm.shape, dtype=float),
    where=row_totals != 0,
)

fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Left: Normalized (percentages)
sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='Blues',
            xticklabels=LABEL_NAMES, yticklabels=LABEL_NAMES, ax=axes[0],
            vmin=0, vmax=1)
axes[0].set_xlabel('Predicted')
axes[0].set_ylabel('Actual')
axes[0].set_title('Normalized Confusion Matrix (Recall per Class)')

# Right: Raw counts
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=LABEL_NAMES, yticklabels=LABEL_NAMES, ax=axes[1])
axes[1].set_xlabel('Predicted')
axes[1].set_ylabel('Actual')
axes[1].set_title('Confusion Matrix (Raw Counts)')

plt.tight_layout()
plt.savefig('text_confusion_matrix.png', dpi=150)
plt.show()

# Print per-class accuracy: the diagonal of the row-normalized matrix is
# each class's recall.
print("\nPer-class accuracy (recall):")
for i, name in enumerate(LABEL_NAMES):
    recall = cm_normalized[i, i]
    print(f"  {name}: {recall:.1%}")
print(f"  Overall: {np.trace(cm)/cm.sum():.1%}")

## 8️⃣ Test with Custom Text

In [None]:
def predict_emotion(text, model, tokenizer, device, max_len=128):
    """Predict the emotion class of a single text.

    Args:
        text: the input string.
        model: classifier called as ``model(input_ids, attention_mask)`` and
            returning logits with one row per input.
        tokenizer: callable that tokenizes ``text`` into ``'input_ids'`` and
            ``'attention_mask'`` tensors.
        device: device the input tensors are moved to.
        max_len: number of tokens the text is padded/truncated to. Defaults
            to 128; pass the length used in training if it differs.

    Returns:
        ``(label, confidence, probabilities)``: the predicted class name,
        its softmax probability, and a dict mapping every name in
        ``LABEL_NAMES`` to its probability.
    """
    model.eval()
    encoding = tokenizer(
        text,
        add_special_tokens=True,
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    )

    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    with torch.no_grad():
        outputs = model(input_ids, attention_mask)
        probs = torch.softmax(outputs, dim=1)
        pred = torch.argmax(probs, dim=1).item()
        conf = probs[0][pred].item()

    # Pair names with probabilities from LABEL_NAMES itself rather than a
    # hard-coded class count, so the dict follows the label list.
    per_class = dict(zip(LABEL_NAMES, probs[0].tolist()))
    return LABEL_NAMES[pred], conf, per_class

# Test examples. Kept under their own name: rebinding `test_texts` here would
# shadow the test-split texts produced earlier by process_split.
sample_texts = [
    "I'm so happy today! Everything is going great!",
    "This is absolutely terrible, I'm furious!",
    "I feel so lonely and depressed...",
    "The meeting is scheduled for 3 PM."
]

# Emoji and the arrow are written as Unicode escapes so they print correctly
# regardless of how this file's bytes are decoded.
print("\n\U0001f9ea Testing with sample texts:")
print("=" * 60)
for text in sample_texts:
    emotion, conf, _ = predict_emotion(text, model, tokenizer, device)
    # Show at most 50 characters of the text, with an ellipsis when cut.
    shown = f"{text[:50]}..." if len(text) > 50 else text
    print(f"\nText: \"{shown}\"")
    print(f"  \u2192 {emotion.upper()} ({conf:.2%})")

## 9️⃣ Save & Download

In [None]:
# Save complete model with metadata: the weights plus what is needed to
# rebuild the classifier and interpret its outputs.
torch.save({
    'model_state_dict': model.state_dict(),
    'num_classes': NUM_CLASSES,
    'label_names': LABEL_NAMES,
    'max_len': MAX_LEN,
    'best_val_acc': best_val_acc,
    'test_acc': test_acc
}, 'text_model_full.pth')

# Emoji are written as Unicode escapes so they print correctly regardless
# of how this file's bytes are decoded.
print("\u2705 Files saved:")
print("  \U0001f4c1 text_model.pth - State dict only (for inference)")
print("  \U0001f4c1 text_model_full.pth - With metadata")
print("  \U0001f4c1 text_training_curves.png")
print("  \U0001f4c1 text_confusion_matrix.png")

In [None]:
# Download files (Colab). Best effort: outside Colab the import fails and we
# fall back to a hint. `except Exception` (not a bare `except:`) keeps that
# fallback while letting KeyboardInterrupt / SystemExit propagate.
try:
    from google.colab import files
    print("\U0001f4e5 Click to download:")
    files.download('text_model.pth')
    files.download('text_model_full.pth')
except Exception:
    print("Files saved - manually download from file browser")

## 🎉 Done!

**Next steps:**
1. Download `text_model.pth`
2. Create `TextModel/` folder in your project
3. Put the model file there
4. The backend will integrate it with emotion recognition