<a href="https://colab.research.google.com/github/Ayushverma41/Mental-State-Prediction-using-NLP/blob/main/Code/RoBERTa%20%2B%20BiLSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==========================================================
# STEP 1: Import Libraries
# ==========================================================
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, RobertaModel, AdamW, get_linear_schedule_with_warmup
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, f1_score
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
import joblib

In [None]:
# ==========================================================
# STEP 2: Load Dataset
# ==========================================================
# Location of the labelled training CSV (a path under /content/drive, so presumably
# Google Drive has already been mounted in this Colab session — verify before running).
data_path = "/content/drive/MyDrive/Mental State model/Data/Train_Data.csv"
# Later cells read the 'statement' (text) and 'status' (class name) columns of this frame.
df = pd.read_csv(data_path)
print("‚úÖ Dataset Loaded. Shape:", df.shape)
print(df.head())

In [None]:
# ==========================================================
# STEP 3: Encode Labels
# ==========================================================
# Fit an encoder on the class names in 'status' and store the resulting integer ids
# in a new 'label' column.
label_encoder = LabelEncoder()
df['label'] = label_encoder.fit_transform(df['status'])
# Number of distinct classes; passed to the model below as its output size.
num_labels = len(label_encoder.classes_)
print("Classes:", label_encoder.classes_)

In [None]:
# ==========================================================
# STEP 4: Train-Test Split
# ==========================================================
# Hold out 20% of the rows for validation. The split is stratified on the encoded
# label and uses a fixed seed so it is repeatable. Texts and labels are passed as
# plain Python lists, which is what the dataset class below indexes into.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['statement'].tolist(),
    df['label'].tolist(),
    test_size=0.2,
    random_state=42,
    stratify=df['label']
)

In [None]:
# ==========================================================
# STEP 5: Tokenization
# ==========================================================
# NOTE(review): `truncation` and `do_lower_case` are passed at load time here; the
# per-call truncation that actually matters is set in the dataset's encode_plus call.
# RoBERTa's vocabulary is cased, so confirm `do_lower_case` has the intended effect.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base', truncation=True, do_lower_case=True)
# Every example is padded/truncated to this many tokens.
MAX_LEN = 128

class MentalHealthDataset(Dataset):
    """Map-style dataset pairing raw statements with integer class labels.

    Items are tokenized lazily, on access, with ``tokenizer.encode_plus`` and are
    padded or truncated to ``max_len`` tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with 1-D ``input_ids``, ``attention_mask`` and a scalar ``labels``."""
        # str() guards against non-string cells (e.g. numbers) in the text column.
        encoded = self.tokenizer.encode_plus(
            str(self.texts[idx]),
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        # The tokenizer returns tensors with a leading batch axis; flatten drops it
        # so the DataLoader can stack items itself.
        item = {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

# Wrap both splits; only the training loader shuffles, so validation order (and thus
# the order of collected predictions) is stable between runs.
train_dataset = MentalHealthDataset(train_texts, train_labels, tokenizer, MAX_LEN)
val_dataset = MentalHealthDataset(val_texts, val_labels, tokenizer, MAX_LEN)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)


In [None]:
# ==========================================================
# STEP 6: RoBERTa + BiLSTM Model (Custom Dimensions)
# ==========================================================
class RoBERTa_BiLSTM_Custom(nn.Module):
    """RoBERTa encoder -> linear projection -> BiLSTM -> pooled linear classifier.

    Args:
        output_dim: Number of classes (size of the logits vector).
        embedding_dim: Width of the projection applied to RoBERTa's 768-d token states.
        hidden_dim: Hidden size of each LSTM direction.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout used between LSTM layers and before the classifier.
        roberta: Optional pre-built encoder module. When ``None`` (the default) the
            pretrained ``'roberta-base'`` weights are loaded, as before. The module
            must accept ``input_ids``/``attention_mask`` keyword arguments and return
            an object exposing ``last_hidden_state``.
    """

    def __init__(self, output_dim=5, embedding_dim=128, hidden_dim=128, n_layers=2, dropout=0.3,
                 roberta=None):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained('roberta-base') if roberta is None else roberta
        self.embedding_layer = nn.Linear(768, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            # Inter-layer dropout is meaningless (and warns) for a single-layer LSTM.
            dropout=dropout if n_layers > 1 else 0.0,
            bidirectional=True,
            batch_first=True,
        )
        self.dropout = nn.Dropout(dropout)
        # The BiLSTM emits 2 * hidden_dim features per token, and forward() concatenates
        # a mean pool and a max pool of them, so the classifier sees 4 * hidden_dim
        # features. (The previous `hidden_dim * 2` made forward() fail on a shape mismatch.)
        self.fc = nn.Linear(hidden_dim * 4, output_dim)

    def forward(self, input_ids, attention_mask):
        """Return unnormalised class logits of shape ``(batch, output_dim)``.

        ``attention_mask`` marks real tokens with non-zero entries; padded positions
        are excluded from both pooling operations.
        """
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        projected = self.embedding_layer(outputs.last_hidden_state)
        lstm_out, _ = self.lstm(projected)

        # (batch, seq, 1) mask so it broadcasts over the feature axis.
        keep = attention_mask.unsqueeze(-1).to(torch.bool)
        weights = keep.to(lstm_out.dtype)

        # Masked mean: divide by the number of real tokens, never by zero.
        token_counts = weights.sum(dim=1).clamp(min=1.0)
        avg_pool = (lstm_out * weights).sum(dim=1) / token_counts

        # Masked max: push padded positions to the lowest finite value so they never win.
        lowest = torch.finfo(lstm_out.dtype).min
        max_pool, _ = lstm_out.masked_fill(~keep, lowest).max(dim=1)
        # A row with no real tokens at all falls back to zeros instead of `lowest`.
        max_pool = max_pool.masked_fill(~keep.any(dim=1), 0.0)

        pooled = torch.cat((avg_pool, max_pool), dim=1)
        return self.fc(self.dropout(pooled))

# Use the GPU when one is visible, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# One output unit per encoded class; the remaining sizes repeat the class defaults.
model = RoBERTa_BiLSTM_Custom(
    output_dim=num_labels,
    embedding_dim=128,
    hidden_dim=128,
    n_layers=2,
    dropout=0.3
).to(device)

In [None]:
# ==========================================================
# STEP 7: Training Setup (5 Epochs, Overfitting Control)
# ==========================================================
EPOCHS = 5
# NOTE(review): this is the `AdamW` imported from `transformers` (hence `correct_bias`).
# Newer transformers releases reportedly drop it in favour of `torch.optim.AdamW`,
# which has no `correct_bias` argument — confirm against the installed version.
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
# The scheduler is stepped once per batch, so the schedule length is batches * epochs.
total_steps = len(train_loader) * EPOCHS

# Linear warm-up over the first 10% of steps, then linear decay for the rest.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=int(0.1 * total_steps),
    num_training_steps=total_steps
)
loss_fn = nn.CrossEntropyLoss()

In [None]:
# ==========================================================
# STEP 8: Training & Evaluation Functions with Progress Bars
# ==========================================================
def train_epoch(model, data_loader, loss_fn, optimizer, device, scheduler, epoch, total_epochs, max_grad_norm=1.0):
    """Run one optimisation pass over ``data_loader``.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` and returning logits.
        data_loader: Yields dict batches with ``input_ids``, ``attention_mask``, ``labels``.
        loss_fn: Callable ``loss_fn(logits, labels)`` returning a scalar tensor.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler stepped once per batch.
        epoch: Zero-based epoch index (only used for the progress-bar label).
        total_epochs: Total number of epochs (only used for the progress-bar label).
        max_grad_norm: Gradient-norm clipping threshold.

    Returns:
        ``(accuracy, mean_loss)`` where ``accuracy`` is a 0-dim double tensor
        (correct predictions / dataset size) and ``mean_loss`` is the mean of the
        per-batch losses (NaN if the loader produced no batches).
    """
    model.train()
    losses = []
    # Start from a tensor (not the int 0) so the return value supports .double()/.cpu()
    # even when the loader yields no batches.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)
    seen = 0  # samples processed so far, for an honest running accuracy
    progress_bar = tqdm(data_loader, desc=f"Epoch {epoch+1}/{total_epochs} [Training]", leave=False)
    for batch in progress_bar:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask)
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, labels)
        loss.backward()
        # Clip before stepping to keep fine-tuning updates bounded.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        scheduler.step()

        correct_predictions += torch.sum(preds == labels)
        seen += labels.size(0)
        losses.append(loss.item())
        # Running metrics: divide by samples seen so far, not by the full dataset size,
        # otherwise the displayed accuracy is understated until the last batch.
        progress_bar.set_postfix({
            'Loss': f'{np.mean(losses):.4f}',
            'Acc': f'{(correct_predictions.double() / seen):.4f}'
        })
    mean_loss = np.mean(losses) if losses else float('nan')
    return correct_predictions.double() / len(data_loader.dataset), mean_loss

def eval_model(model, data_loader, loss_fn, device, epoch, total_epochs):
    """Evaluate ``model`` on ``data_loader`` without updating any weights.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` and returning logits.
        data_loader: Yields dict batches with ``input_ids``, ``attention_mask``, ``labels``.
        loss_fn: Callable ``loss_fn(logits, labels)`` returning a scalar tensor.
        device: Device the batch tensors are moved to.
        epoch: Zero-based epoch index (only used for the progress-bar label).
        total_epochs: Total number of epochs (only used for the progress-bar label).

    Returns:
        ``(accuracy, mean_loss, all_preds, all_labels)``. ``accuracy`` is a 0-dim double
        tensor (correct predictions / dataset size), ``mean_loss`` is the mean of the
        per-batch losses (NaN if the loader produced no batches), and the two lists
        hold the predicted and true class ids in loader order.
    """
    model.eval()
    losses, all_preds, all_labels = [], [], []
    # Start from a tensor (not the int 0) so the return value supports .double()/.cpu()
    # even when the loader yields no batches.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)
    seen = 0  # samples processed so far, for an honest running accuracy
    progress_bar = tqdm(data_loader, desc=f"Epoch {epoch+1}/{total_epochs} [Validation]", leave=False)
    with torch.no_grad():
        for batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids, attention_mask)
            _, preds = torch.max(outputs, dim=1)
            loss = loss_fn(outputs, labels)

            correct_predictions += torch.sum(preds == labels)
            seen += labels.size(0)
            losses.append(loss.item())
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            # Running metrics: divide by samples seen so far, not by the full dataset size.
            progress_bar.set_postfix({
                'Loss': f'{np.mean(losses):.4f}',
                'Acc': f'{(correct_predictions.double() / seen):.4f}'
            })
    acc = correct_predictions.double() / len(data_loader.dataset)
    mean_loss = np.mean(losses) if losses else float('nan')
    return acc, mean_loss, all_preds, all_labels

In [None]:
# ==========================================================
# STEP 9: Training Loop with Early Stopping
# ==========================================================
# Early-stopping state: lowest validation loss so far and the number of consecutive
# epochs that failed to beat it.
best_val_loss = float('inf')
patience, patience_counter = 2, 0
# Per-epoch history consumed by the plotting cell.
train_losses, val_losses, train_accs, val_accs = [], [], [], []

for epoch in range(EPOCHS):
    print(f"\n===== Epoch {epoch+1}/{EPOCHS} =====")
    train_acc, train_loss = train_epoch(model, train_loader, loss_fn, optimizer, device, scheduler, epoch, EPOCHS)
    val_acc, val_loss, val_preds, val_true = eval_model(model, val_loader, loss_fn, device, epoch, EPOCHS)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    # Accuracies come back as 0-dim tensors; store plain floats for plotting.
    train_accs.append(train_acc.cpu().item())
    val_accs.append(val_acc.cpu().item())

    print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"Val Loss:   {val_loss:.4f} | Val Acc:   {val_acc:.4f}")

    # No improvement: count it against the patience budget and maybe stop.
    if not (val_loss < best_val_loss):
        patience_counter += 1
        print(f"‚ö†Ô∏è No improvement. Early stopping patience: {patience_counter}/{patience}")
        if patience_counter >= patience:
            print("üõë Early stopping triggered ‚Äî training stopped.")
            break
        continue

    # Improvement: reset the counter and checkpoint the current weights.
    best_val_loss = val_loss
    patience_counter = 0
    os.makedirs("/content/drive/MyDrive/Mental State model/Model/RoBERTa_BiLSTM/", exist_ok=True)
    torch.save(model.state_dict(), "/content/drive/MyDrive/Mental State model/Model/RoBERTa_BiLSTM/best_model.pt")
    print("‚úÖ Validation loss improved. Model checkpoint saved.")


In [None]:
# ==========================================================
# STEP 10: Final Evaluation, F1-score, Confusion Matrix & Plots
# ==========================================================
# Directory that receives every figure produced in this cell.
img_dir = "/content/drive/MyDrive/Mental State model/Images/RoBERTa_BiLSTM/"
os.makedirs(img_dir, exist_ok=True)

# Restore the checkpoint written by the training loop (lowest validation loss) so the
# metrics below describe that model rather than the last epoch's weights.
model.load_state_dict(torch.load("/content/drive/MyDrive/Mental State model/Model/RoBERTa_BiLSTM/best_model.pt"))
model.eval()

# epoch/total_epochs only feed the progress-bar label inside eval_model.
val_acc, val_loss, val_preds, val_true = eval_model(model, val_loader, loss_fn, device, epoch=0, total_epochs=1)

# Weighted F1 averages per-class F1 by class support.
f1 = f1_score(val_true, val_preds, average='weighted')
print(f"\nüìä Weighted F1-score: {f1:.4f}")
print("\nüìã Classification Report:")
print(classification_report(val_true, val_preds, target_names=label_encoder.classes_))

# --- Confusion matrix on the validation split (rows: actual, columns: predicted) ---
cm = confusion_matrix(val_true, val_preds)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=label_encoder.classes_,
            yticklabels=label_encoder.classes_)
plt.title("Confusion Matrix - RoBERTa + BiLSTM")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.tight_layout()
# Save before show(): the calls below all act on pyplot's current figure.
plt.savefig(os.path.join(img_dir, "confusion_matrix.png"))
plt.show()

# --- Loss curves; the x axis is the 1-based epoch number actually run ---
plt.figure(figsize=(8,6))
plt.plot(range(1, len(train_losses)+1), train_losses, label='Train Loss')
plt.plot(range(1, len(val_losses)+1), val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training vs Validation Loss')
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(img_dir, "loss_curve.png"))
plt.show()

# --- Accuracy curves from the same per-epoch history ---
plt.figure(figsize=(8,6))
plt.plot(range(1, len(train_accs)+1), train_accs, label='Train Accuracy')
plt.plot(range(1, len(val_accs)+1), val_accs, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training vs Validation Accuracy')
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(img_dir, "accuracy_curve.png"))
plt.show()

print(f"‚úÖ All plots saved in: {img_dir}")

In [None]:
# ==========================================================
# STEP 11: Save Trained Model and Label Encoder
# ==========================================================
# Persist the weights currently held by `model` together with the fitted label encoder,
# so the testing section can rebuild the classifier and decode its predictions.
save_dir = "/content/drive/MyDrive/Mental State model/Model/RoBERTa_BiLSTM/"
os.makedirs(save_dir, exist_ok=True)
torch.save(model.state_dict(), os.path.join(save_dir, "RoBERTa_BiLSTM_model.pt"))
joblib.dump(label_encoder, os.path.join(save_dir, "label_encoder.pkl"))
print(f"‚úÖ Model and Label Encoder saved in: {save_dir}")

**TESTING**

In [None]:
# ==========================================================
# STEP 1: Imports
# ==========================================================
import torch
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer
import joblib
import os
from tqdm.auto import tqdm

# ==========================================================
# STEP 2: Load Model, Tokenizer & Label Encoder
# ==========================================================
# Use the GPU when one is visible, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Artifacts written by the training section's save step.
model_path = "/content/drive/MyDrive/Mental State model/Model/RoBERTa_BiLSTM/RoBERTa_BiLSTM_model.pt"
encoder_path = "/content/drive/MyDrive/Mental State model/Model/RoBERTa_BiLSTM/label_encoder.pkl"

# The fitted encoder supplies both the class count and the id -> name mapping.
label_encoder = joblib.load(encoder_path)
num_labels = len(label_encoder.classes_)

# Same tokenizer configuration as in training.
tokenizer = RobertaTokenizer.from_pretrained('roberta-base', truncation=True, do_lower_case=True)

# same architecture as before
class RoBERTa_BiLSTM_Custom(torch.nn.Module):
    """RoBERTa encoder -> linear projection -> BiLSTM -> pooled linear classifier.

    Args:
        output_dim: Number of classes (size of the logits vector).
        embedding_dim: Width of the projection applied to RoBERTa's 768-d token states.
        hidden_dim: Hidden size of each LSTM direction.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout used between LSTM layers and before the classifier.
        roberta: Optional pre-built encoder module. When ``None`` (the default) the
            pretrained ``'roberta-base'`` weights are loaded, as before. The module
            must accept ``input_ids``/``attention_mask`` keyword arguments and return
            an object exposing ``last_hidden_state``.
    """

    def __init__(self, output_dim=5, embedding_dim=128, hidden_dim=128, n_layers=2, dropout=0.3,
                 roberta=None):
        super().__init__()
        if roberta is None:
            # Imported lazily so the class can be built around a supplied encoder
            # without requiring the pretrained weights.
            from transformers import RobertaModel
            roberta = RobertaModel.from_pretrained('roberta-base')
        self.roberta = roberta
        self.embedding_layer = torch.nn.Linear(768, embedding_dim)
        self.lstm = torch.nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            # Inter-layer dropout is meaningless (and warns) for a single-layer LSTM.
            dropout=dropout if n_layers > 1 else 0.0,
            bidirectional=True,
            batch_first=True,
        )
        self.dropout = torch.nn.Dropout(dropout)
        # The BiLSTM emits 2 * hidden_dim features per token, and forward() concatenates
        # a mean pool and a max pool of them, so the classifier sees 4 * hidden_dim
        # features. (The previous `hidden_dim * 2` made forward() fail on a shape mismatch.)
        self.fc = torch.nn.Linear(hidden_dim * 4, output_dim)

    def forward(self, input_ids, attention_mask):
        """Return unnormalised class logits of shape ``(batch, output_dim)``.

        ``attention_mask`` marks real tokens with non-zero entries; padded positions
        are excluded from both pooling operations.
        """
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        projected = self.embedding_layer(outputs.last_hidden_state)
        lstm_out, _ = self.lstm(projected)

        # (batch, seq, 1) mask so it broadcasts over the feature axis.
        keep = attention_mask.unsqueeze(-1).to(torch.bool)
        weights = keep.to(lstm_out.dtype)

        # Masked mean: divide by the number of real tokens, never by zero.
        token_counts = weights.sum(dim=1).clamp(min=1.0)
        avg_pool = (lstm_out * weights).sum(dim=1) / token_counts

        # Masked max: push padded positions to the lowest finite value so they never win.
        lowest = torch.finfo(lstm_out.dtype).min
        max_pool, _ = lstm_out.masked_fill(~keep, lowest).max(dim=1)
        # A row with no real tokens at all falls back to zeros instead of `lowest`.
        max_pool = max_pool.masked_fill(~keep.any(dim=1), 0.0)

        pooled = torch.cat((avg_pool, max_pool), dim=1)
        return self.fc(self.dropout(pooled))

# Load the trained weights. The remaining constructor arguments fall back to the class
# defaults, which must match the sizes used at training time for the state dict to fit.
model = RoBERTa_BiLSTM_Custom(output_dim=num_labels)
# map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()

print("‚úÖ Model and encoder loaded successfully.")

# ==========================================================
# STEP 3: Load Test Data
# ==========================================================
# The prediction steps below read the 'statement' column of this frame.
test_path = "/content/drive/MyDrive/Mental State model/Data/Test_Data.csv"
test_df = pd.read_csv(test_path)

print(f"‚úÖ Test data loaded. Shape: {test_df.shape}")
print(test_df.head())

# ==========================================================
# STEP 4: Prepare Test Dataset
# ==========================================================
# Same padded/truncated sequence length as in training.
MAX_LEN = 128

class MentalHealthDataset(Dataset):
    """Label-free map-style dataset used for inference.

    Items are tokenized lazily, on access, with ``tokenizer.encode_plus`` and are
    padded or truncated to ``max_len`` tokens.
    """

    def __init__(self, texts, tokenizer, max_len):
        self.texts, self.tokenizer, self.max_len = texts, tokenizer, max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with 1-D ``input_ids`` and ``attention_mask`` tensors."""
        # str() guards against non-string cells (e.g. numbers) in the text column.
        encoded = self.tokenizer.encode_plus(
            str(self.texts[idx]),
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        # The tokenizer returns tensors with a leading batch axis; flatten drops it
        # so the DataLoader can stack items itself.
        return {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}

# shuffle=False keeps predictions in the same order as the rows of test_df, which the
# column assignment further down relies on.
test_dataset = MentalHealthDataset(test_df['statement'].tolist(), tokenizer, MAX_LEN)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# ==========================================================
# STEP 5: Generate Predictions
# ==========================================================
# Collect the arg-max class id for every test row, batch by batch, without gradients.
all_preds = []
with torch.no_grad():
    for batch in tqdm(test_loader, desc="Predicting on Test Data"):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        outputs = model(input_ids, attention_mask)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())

# Decode integer class ids back to the original class names.
predicted_labels = label_encoder.inverse_transform(all_preds)

# ==========================================================
# STEP 6: Save Predictions
# ==========================================================
# Attach the decoded predictions as a new column and write the whole frame out.
test_df['Predicted_Status_RoBERTa_BiLSTM'] = predicted_labels

save_path = "/content/drive/MyDrive/Mental State model/Data/RoBERTa_BiLSTM_Predictions.csv"
test_df.to_csv(save_path, index=False)

print(f"‚úÖ Predictions saved to: {save_path}")
print(test_df.head())

# ==========================================================
# STEP 7: Single Sentence Prediction Function
# ==========================================================
def predict_single_sentence(text):
    """Classify one piece of text and return its decoded class name.

    Relies on the module-level ``model``, ``tokenizer``, ``device`` and
    ``label_encoder`` set up earlier in this section.
    """
    model.eval()
    encode_options = dict(
        add_special_tokens=True,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )
    # return_tensors='pt' yields tensors that already carry a batch axis of size one.
    encoded = tokenizer.encode_plus(text, **encode_options)
    ids = encoded['input_ids'].to(device)
    mask = encoded['attention_mask'].to(device)
    with torch.no_grad():
        logits = model(ids, mask)
        class_id = torch.argmax(logits, dim=1).cpu().item()
    decoded = label_encoder.inverse_transform([class_id])
    return decoded[0]

# Example:
# Smoke test of the single-sentence path on a hand-written example.
sample_text = "I feel so low and hopeless these days."
predicted_class = predict_single_sentence(sample_text)
print(f"\nüß† Input: {sample_text}\n‚û°Ô∏è Predicted Mental State: {predicted_class}")


**EVALUATION**

In [None]:
# ==========================================================
# STEP 1: Imports
# ==========================================================
import torch
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, f1_score
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
import os

# ==========================================================
# STEP 2: Define Constants
# ==========================================================
# Directory that receives the evaluation figures.
img_dir = "/content/drive/MyDrive/Mental State model/Images/RoBERTa_BiLSTM/"
os.makedirs(img_dir, exist_ok=True)

# ==========================================================
# STEP 3: Load Train & Test Data for Evaluation
# ==========================================================
train_path = "/content/drive/MyDrive/Mental State model/Data/Train_Data.csv"
test_path = "/content/drive/MyDrive/Mental State model/Data/Test_Data.csv"

train_df = pd.read_csv(train_path)
test_df = pd.read_csv(test_path)

# `label_encoder` is not created in this cell; it must already exist from an earlier
# section. transform() (not fit_transform) reuses its existing class -> id mapping,
# and both frames need a 'status' column.
train_labels = label_encoder.transform(train_df['status'])
test_labels = label_encoder.transform(test_df['status'])

# ==========================================================
# STEP 4: Prepare Datasets
# ==========================================================
class MentalHealthDataset(Dataset):
    """Map-style dataset pairing raw statements with integer class labels.

    ``texts`` and ``labels`` may be any iterables of equal length (lists, NumPy arrays,
    pandas Series). They are materialised into lists so that ``dataset[i]`` is always
    the i-th item by position. Indexing a pandas Series directly would look up the
    *index label* ``i`` instead, which returns the wrong row (or raises ``KeyError``)
    whenever the Series does not carry a default 0..n-1 index.

    Items are tokenized lazily, on access, with ``tokenizer.encode_plus`` and are
    padded or truncated to ``max_len`` tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = list(texts)
        self.labels = list(labels)
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with 1-D ``input_ids``, ``attention_mask`` and a scalar ``labels``."""
        # str() guards against non-string cells (e.g. numbers) in the text column.
        text = str(self.texts[idx])
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        # The tokenizer returns tensors with a leading batch axis; flatten drops it
        # so the DataLoader can stack items itself.
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }

# Same padded/truncated sequence length as in training.
MAX_LEN = 128
# NOTE(review): the text columns are passed as pandas Series here (no .tolist()),
# unlike the other sections — the dataset indexes them with plain integers.
train_dataset_eval = MentalHealthDataset(train_df['statement'], train_labels, tokenizer, MAX_LEN)
test_dataset_eval = MentalHealthDataset(test_df['statement'], test_labels, tokenizer, MAX_LEN)

# No shuffling: evaluation only needs one ordered pass over each split.
train_loader_eval = DataLoader(train_dataset_eval, batch_size=16, shuffle=False)
test_loader_eval = DataLoader(test_dataset_eval, batch_size=16, shuffle=False)

# Read as a global by evaluate_model below.
loss_fn = nn.CrossEntropyLoss()

# ==========================================================
# STEP 5: Evaluate Function (Acc & Loss)
# ==========================================================
def evaluate_model(model, data_loader, device):
    """Score ``model`` on ``data_loader`` without computing gradients.

    Uses the module-level ``loss_fn`` for the loss.

    Returns:
        ``(accuracy, mean_batch_loss, weighted_f1, true_labels, predictions)``, with
        the two lists in loader order.
    """
    model.eval()
    batch_losses, predictions, targets = [], [], []
    with torch.no_grad():
        for batch in tqdm(data_loader, desc="Evaluating"):
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            y = batch['labels'].to(device)

            logits = model(ids, mask)
            batch_loss = loss_fn(logits, y)
            y_hat = torch.argmax(logits, dim=1)

            batch_losses.append(batch_loss.item())
            predictions.extend(y_hat.cpu().numpy())
            targets.extend(y.cpu().numpy())

    accuracy = accuracy_score(targets, predictions)
    mean_loss = np.mean(batch_losses)
    weighted_f1 = f1_score(targets, predictions, average='weighted')
    return accuracy, mean_loss, weighted_f1, targets, predictions

# ==========================================================
# STEP 6: Get Metrics for Train & Test
# ==========================================================
# Score the loaded model on the full training file and on the test file.
train_acc, train_loss, train_f1, y_train_true, y_train_pred = evaluate_model(model, train_loader_eval, device)
test_acc, test_loss, test_f1, y_test_true, y_test_pred = evaluate_model(model, test_loader_eval, device)

print("\nüìä **Model Performance Summary**")
print(f"Train Accuracy: {train_acc:.4f}, Train Loss: {train_loss:.4f}, F1: {train_f1:.4f}")
print(f"Test  Accuracy: {test_acc:.4f}, Test  Loss: {test_loss:.4f}, F1: {test_f1:.4f}")

# ==========================================================
# STEP 7: Accuracy Comparison Bar Chart
# ==========================================================
plt.figure(figsize=(6, 6))
plt.bar(['Training', 'Testing'], [train_acc, test_acc], color=['skyblue', 'lightgreen'])
plt.ylabel('Accuracy')
plt.title('Training vs Testing Accuracy - RoBERTa + BiLSTM')
# Print each value just above its bar.
for i, acc in enumerate([train_acc, test_acc]):
    plt.text(i, acc + 0.01, f'{acc:.2f}', ha='center', fontsize=12)
plt.tight_layout()
# Save before show(): both calls act on pyplot's current figure.
plt.savefig(os.path.join(img_dir, "accuracy_comparison_bar.png"))
plt.show()

# ==========================================================
# STEP 8: Confusion Matrix & Heatmaps
# ==========================================================
def plot_confusion(y_true, y_pred, dataset_type):
    """Draw, save and show a confusion-matrix heatmap for one split.

    ``dataset_type`` is used in the title and, lower-cased, in the file name written
    under the module-level ``img_dir``. Tick labels come from the module-level
    ``label_encoder``.
    """
    matrix = confusion_matrix(y_true, y_pred)
    class_names = label_encoder.classes_

    plt.figure(figsize=(8,6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Purples',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title(f'{dataset_type} Confusion Matrix - RoBERTa + BiLSTM')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.tight_layout()

    # Save before show(): both calls act on pyplot's current figure.
    out_path = os.path.join(img_dir, f"{dataset_type.lower()}_confusion_matrix.png")
    plt.savefig(out_path)
    plt.show()

# One heatmap per split; the label also determines the output file name.
plot_confusion(y_train_true, y_train_pred, "Training")
plot_confusion(y_test_true, y_test_pred, "Testing")

print(f"‚úÖ All evaluation plots saved in: {img_dir}")
