# Goal Is to reduce the parameters needed to detect depression so that it could be used on edge devices (Apples autocorrect can detect and help users find mental health sources) rather than needing expensive API requests to a remote LLM.

# Prep work

## Loading and Processing the Data

In [1]:
DATA_PATH = './depression-detection-lt-edi-2022/data/original_dataset/'
MODEL_PATH = './mha_model'

In [2]:
import os

if not os.path.exists(DATA_PATH):
    !git clone https://github.com/rafalposwiata/depression-detection-lt-edi-2022.git
    print("Dataset has been cloned.")
else:
    print("Dataset already exists.")

Dataset already exists.


In [3]:
import pandas as pd

# Storing the data in a panda table
df_train = pd.read_table(DATA_PATH + "train.tsv")
df_dev = pd.read_table(DATA_PATH + "dev.tsv")
df_test = pd.read_table(DATA_PATH + "test.tsv")

# Making the column names standardized
df_train.rename(columns={"PID":"pid", "Text_data":"text_data", "Label":"label"}, inplace=True)
df_dev.rename(columns={"PID":"pid", "Text data":"text_data", "Label":"label"}, inplace=True)
df_test.rename(columns={"Pid":"pid", "text data":"text_data", "Class labels":"label"}, inplace=True)

In [4]:
# Converting from strings to ints for the labels
label_mapping = {
    'not depression': 0,
    'moderate': 1,
    'severe': 2
}
df_train['label'] = df_train['label'].map(label_mapping)
df_dev['label'] = df_dev['label'].map(label_mapping)
df_test['label'] = df_test['label'].map(label_mapping)

## Defining Code for Getting Dataloaders

In [5]:
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer
import torch

class DepressionDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=1024):
        self.texts = dataframe['text_data'].tolist()
        self.labels = dataframe['label'].tolist()
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'label': torch.tensor(label, dtype=torch.long)
        }

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
def out_of_vocabulary_check(tokenizer, data_loader):

    vocab_size = tokenizer.vocab_size
    
    for batch in tqdm(data_loader):
        input_ids = batch['input_ids'].type(torch.long).to(device)
    
        # Check for out-of-vocabulary tokens before moving to GPU
        out_of_vocab_mask = input_ids >= tokenizer.vocab_size
        if out_of_vocab_mask.any():
            print("Found input IDs out of vocabulary bounds!")
            # Print the offending tokens (if needed)
            print("Offending tokens:", tokenizer.convert_ids_to_tokens(input_ids[out_of_vocab_mask].tolist()))
            # Replace out-of-vocabulary tokens with a special token (e.g., [UNK])
            input_ids[out_of_vocab_mask] = tokenizer.unk_token_id  # Replace with tokenizer.unk_token_id
    
        input_ids = batch['input_ids'].type(torch.long).to(device)
        if (input_ids >= vocab_size).any():
            print("Found input IDs out of vocabulary bounds!")
            print(input_ids[input_ids >= vocab_size])
            break

In [7]:
def get_dataloader(df_data, tokenizer, dataset_type="Train"):
    print(f"{dataset_type} Dataset:")
    dataset = DepressionDataset(df_data, tokenizer)
    data_loader = DataLoader(dataset, batch_size=4, shuffle=True)
    out_of_vocabulary_check(tokenizer, data_loader)
    
    return dataset, data_loader

## Defining the Model

In [8]:
import torch
import torch.nn as nn

class MHA(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads=8, dropout=0.4):
        super(MHA, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.fc_q = nn.Linear(embed_dim, embed_dim)
        self.fc_k = nn.Linear(embed_dim, embed_dim)
        self.fc_v = nn.Linear(embed_dim, embed_dim)
        self.fc_o = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(embed_dim)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=-1)
        self.classifier = nn.Linear(embed_dim, 3)

    def forward(self, input_ids):
        x = self.embedding(input_ids)
        batch_size = x.size(0)
        q = self.fc_q(x)
        k = self.fc_k(x)
        v = self.fc_v(x)
        q = q.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        k = k.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)

        v = v.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        attn_weights = torch.matmul(q, k.transpose(-2, -1)) / (self.head_dim ** 0.5)
        attn_weights = torch.softmax(attn_weights, dim=-1)
        attn_output = torch.matmul(attn_weights, v)
        attn_output = attn_output.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.embed_dim)
        output = self.fc_o(attn_output)
        output = self.relu(output)
        output = self.dropout(output)
        output = self.layer_norm(output + x)
        output = self.classifier(output)
        output = output[:, 0, :]
        return output

# Running the Code

## Getting Dataloaders

In [9]:
from tqdm import tqdm

In [10]:
my_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_dataset, train_data_loader = get_dataloader(df_train, my_tokenizer, "Train")
dev_dataset, dev_data_loader = get_dataloader(df_dev, my_tokenizer, "Dev")
test_dataset, test_data_loader = get_dataloader(df_test, my_tokenizer, "Test")

Train Dataset:


100%|█████████████████████████████████████████████████████████████████████████████| 2223/2223 [00:20<00:00, 108.12it/s]


Dev Dataset:


100%|██████████████████████████████████████████████████████████████████████████████| 1124/1124 [00:13<00:00, 84.65it/s]


Test Dataset:


100%|████████████████████████████████████████████████████████████████████████████████| 812/812 [00:09<00:00, 84.80it/s]


## Loading the Model

In [11]:
# Loading the model
mha_model = MHA(vocab_size=my_tokenizer.vocab_size, embed_dim=768).to(device)
mha_model.load_state_dict(torch.load(MODEL_PATH, map_location=device))

# Setting model to evaluation mode
mha_model.eval();

## Evaluating the Model

In [12]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

total_params = count_parameters(mha_model)
print(f"Total number of trainable parameters: {total_params}")

Total number of trainable parameters: 25807107


In [13]:
def get_accuracy(data_loader, model, dataset_type="Train"):
    # prompt: get the accuracy of the model
    correct_predictions = 0
    total_predictions = 0
    
    with torch.no_grad():
        for batch in tqdm(data_loader):
            input_ids = batch['input_ids'].type(torch.long).to(device)
            labels = batch['label'].to(device)
    
            outputs = model(input_ids)
            _, predicted = torch.max(outputs, 1)
    
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()
    
    accuracy = correct_predictions / total_predictions
    print(f"{dataset_type} Accuracy: {accuracy:.4f}")

In [14]:
# get_accuracy(train_data_loader, mha_model, dataset_type="Train")

In [15]:
# get_accuracy(dev_data_loader, mha_model, dataset_type="Dev")

## LIME - Making the model explainable.

In [16]:
!pip install lime



In [17]:
def predict_class(dataset, position, model):  
    input_ids = dataset[position]['input_ids'].type(torch.long).to(device)
    input_ids = torch.stack([input_ids])
    
    output = model(input_ids)
    _, predicted = torch.max(output, 1)

    return predicted.item()

In [18]:
# TODO: Determine if the tokenizer used to produce outputs is appropriate or if it should be changed
# to be more in line with the tokenizer used in the dataset.

def calculate_probabilities(text):   
    outputs = mha_model(my_tokenizer(text, return_tensors="pt", padding=True)['input_ids'])
    probabilities = nn.functional.softmax(outputs, 1).detach().numpy()
    return probabilities

In [19]:
from lime.lime_text import LimeTextExplainer
class_names = ['Not Depressed', 'Moderately', 'Severely']
explainer = LimeTextExplainer(class_names=class_names)

In [20]:
def explain_results(df_set, position, num_features): 
    datapoint = df_set.text_data[position]

    model_prediction = predict_class(dev_dataset, position, mha_model)
    
    exp = explainer.explain_instance(datapoint, calculate_probabilities, 
                                     num_features=num_features, labels=[0, 1, 2], num_samples=200)
    
    print("Position of element in dataset:", position)
    print("Predicted class:", model_prediction)
    print("Actual class:", df_set.label[position])

    print("Explanation for class not depressed:")
    print("\n".join(map(str, exp.as_list(label=0))))
    
    print("Explanation for class moderately depressed:")
    print("\n".join(map(str, exp.as_list(label=1))))
    
    print("Explanation for class severely depressed:")
    print("\n".join(map(str, exp.as_list(label=2))))

    ## Uncomment for a visual of what the probabilities are.
    # exp.show_in_notebook(text=False) 

In [21]:
my_position = 0 # Position of the datapoint that you want explained.
num_features = 4 # Number of features you want the probabilities to be provided for. 

In [22]:
explain_results(df_train, my_position, num_features)

Position of element in dataset: 0
Predicted class: 1
Actual class: 1
Explanation for class not depressed:
('I', -0.09944258355733537)
('my', -0.049294866425432)
('else', -0.044150083378639106)
('worried', 0.040318343638816416)
Explanation for class moderately depressed:
('else', 0.07386112479742181)
('a', -0.051526420286438016)
('worried', -0.04559787298312412)
('there', 0.03776367067300335)
Explanation for class severely depressed:
('I', 0.09957689832325808)
('my', 0.05305879089435377)
('else', -0.025725932151712354)
('anyone', -0.018327971694429453)
