# Classification model

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip /content/drive/MyDrive/daic_data/daic_data.zip

## Libraries

In [3]:
import pandas as pd
import os
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from transformers import Trainer, TrainingArguments, AutoTokenizer, AutoModel
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from peft import LoraConfig, get_peft_model

In [4]:
MODEL_NAME = "allenai/longformer-base-4096"
DATA_DIR = "/content/daic_data"

## Data loader

In [5]:
def process_daic_data(data_dir):
  transcripts_dir = os.path.join(data_dir, "transcripts")
  labels_dir = os.path.join(data_dir, "labels")

  df = pd.DataFrame()

  for file in os.listdir(labels_dir):
    if not file.endswith(".csv"):
      continue

    split_name = file.replace(".csv", "")
    split_df = pd.read_csv(os.path.join(labels_dir, file))
    split_df = split_df.rename(columns={
      "PHQ_Binary": "depression_label",
      "PHQ_Score": "depression_severity",
      "PHQ8_Binary": "depression_label",
      "PHQ8_Score": "depression_severity",
      "Participant_ID": "participant_id",
    })

    transcripts_df = create_dataframe(split_df, transcripts_dir)
    transcripts_df["split"] = split_name

    df = pd.concat([df, transcripts_df], ignore_index=True)

  return df

def create_dataframe(split_df, transcripts_dir):
  df = {"text": [], "depression_label": []}

  for _, row in split_df.iterrows():
    participant_id = str(int(float(row.participant_id)))
    depression_label = int(row.depression_label)

    participant_text = ""
    transcript_file = os.path.join(transcripts_dir, f"{participant_id}_TRANSCRIPT.csv")
    if not os.path.exists(transcript_file):
      print(f"Transcript file not found for participant {participant_id}")
      continue

    transcripts = pd.read_csv(transcript_file, sep="\t")
    participant_transcripts = transcripts[transcripts['speaker'] == 'Participant']

    for _, transcript_row in participant_transcripts.iterrows():
      participant_text += str(transcript_row.value) + " "

    df["text"].append(participant_text.strip())
    df["depression_label"].append(depression_label)

  return pd.DataFrame(df)


## Train classification model

In [None]:
class TranscriptsDataset(Dataset):
  def __init__(self, dataframe, tokenizer, max_length=4096):
    self.data = dataframe
    self.tokenizer = tokenizer
    self.max_length = max_length

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    text = str(self.data.iloc[idx]["text"])
    label = int(self.data.iloc[idx]["depression_label"])

    encoding = self.tokenizer(
      text,
      truncation=True,
      padding="max_length",
      max_length=self.max_length,
      return_tensors="pt"
    )

    return {
      "input_ids": encoding["input_ids"].squeeze(),
      "attention_mask": encoding["attention_mask"].squeeze(),
      "labels": torch.tensor(label, dtype=torch.long),
    }

class TextFeaturizer(nn.Module):
  def __init__(self, model_name, dropout=0.5, dense_size=256,
               lora_r=8, lora_alpha=16, lora_dropout=0.1):
    super().__init__()

    # Load Longformer encoder
    self.encoder = AutoModel.from_pretrained(model_name)
    hidden_size = self.encoder.config.hidden_size

    self.projection = nn.Sequential(
      nn.Linear(hidden_size, dense_size),
      nn.ReLU(),
      nn.Dropout(dropout)
    )

    lora_config = LoraConfig(
      r=lora_r,
      lora_alpha=lora_alpha,
      target_modules=["query", "key", "value"],
      lora_dropout=lora_dropout,
      bias="none",
      task_type="FEATURE_EXTRACTION"
    )
    self.encoder = get_peft_model(self.encoder, lora_config)

    for name, param in self.encoder.named_parameters():
      if 'lora' not in name:
        param.requires_grad = False

  def forward(self, input_ids, attention_mask):
    outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
    cls_token = outputs.last_hidden_state[:, 0]
    return self.projection(cls_token)

class FocalLoss(nn.Module):
  """Focal Loss for addressing class imbalance by focusing on hard examples."""
  def __init__(self, alpha=0.75, gamma=2.0, reduction='mean'):
    super().__init__()
    self.alpha = alpha
    self.gamma = gamma
    self.reduction = reduction

  def forward(self, inputs, targets):
    ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
    pt = torch.exp(-ce_loss)
    focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
    
    if self.reduction == 'mean':
      return focal_loss.mean()
    elif self.reduction == 'sum':
      return focal_loss.sum()
    else:
      return focal_loss

class TextClassifier(nn.Module):
  def __init__(self, model_name, num_labels=2, class_weights=None, use_focal_loss=True, focal_alpha=0.75, focal_gamma=2.0):
    super().__init__()
    self.featurizer = TextFeaturizer(model_name)
    self.classifier = nn.Linear(256, num_labels)
    self.use_focal_loss = use_focal_loss
    self.focal_alpha = focal_alpha
    self.focal_gamma = focal_gamma
    
    # Store class weights for loss calculation (if not using focal loss)
    if class_weights is not None and not use_focal_loss:
      self.register_buffer('class_weights', torch.tensor(class_weights, dtype=torch.float32))
    else:
      self.class_weights = None
    
    # Initialize focal loss if using it
    if use_focal_loss:
      self.focal_loss = FocalLoss(alpha=focal_alpha, gamma=focal_gamma)

  def forward(self, input_ids, attention_mask, labels=None):
    features = self.featurizer(input_ids, attention_mask)
    logits = self.classifier(features)

    if labels is not None:
      if self.use_focal_loss:
        # Use Focal Loss for better handling of imbalanced datasets
        loss = self.focal_loss(logits, labels)
      else:
        # Fallback to weighted CrossEntropyLoss if not using focal loss
        if self.class_weights is not None:
          loss_fn = nn.CrossEntropyLoss(weight=self.class_weights)
        else:
          loss_fn = nn.CrossEntropyLoss()
        loss = loss_fn(logits, labels)
      return {"loss": loss, "logits": logits}
    return {"logits": logits}

def compute_metrics(eval_pred):
  """Compute metrics for evaluation during training."""
  predictions, labels = eval_pred
  preds = np.argmax(predictions, axis=1)
  
  accuracy = accuracy_score(labels, preds)
  precision = precision_score(labels, preds, zero_division=0, average='binary')
  recall = recall_score(labels, preds, zero_division=0, average='binary')
  f1 = f1_score(labels, preds, zero_division=0, average='binary')
  
  return {
    "accuracy": accuracy,
    "precision": precision,
    "recall": recall,
    "f1": f1
  }

def find_optimal_threshold(trainer, val_dataset):
  """Find optimal threshold that maximizes F1 score on validation set, with constraints to avoid extreme predictions."""
  predictions = trainer.predict(val_dataset)
  probs = torch.softmax(torch.tensor(predictions.predictions), dim=-1)
  probs_class1 = probs[:, 1].numpy()
  labels = predictions.label_ids
  
  best_threshold = 0.5
  best_f1 = 0
  best_metrics = {}
  
  # Try different thresholds with more granular search
  for threshold in np.arange(0.35, 0.65, 0.01):
    preds = (probs_class1 >= threshold).astype(int)
    
    # Calculate metrics
    f1 = f1_score(labels, preds, zero_division=0)
    precision = precision_score(labels, preds, zero_division=0)
    recall = recall_score(labels, preds, zero_division=0)
    
    # Prefer thresholds that don't predict all as one class
    pred_class1_count = np.sum(preds)
    total_samples = len(preds)
    
    # Penalize extreme thresholds (all 0s or all 1s)
    if pred_class1_count == 0 or pred_class1_count == total_samples:
      continue  # Skip thresholds that predict all as one class
    
    # Use F1 score, but also consider balance
    score = f1
    if f1 > best_f1:
      best_f1 = f1
      best_threshold = threshold
      best_metrics = {
        'f1': f1,
        'precision': precision,
        'recall': recall,
        'pred_class1_ratio': pred_class1_count / total_samples
      }
  
  if best_threshold == 0.5 and best_f1 == 0:
    # If no good threshold found, use the one that gives best balance
    print("Warning: No balanced threshold found, using 0.5")
    best_threshold = 0.5
    preds = (probs_class1 >= best_threshold).astype(int)
    best_f1 = f1_score(labels, preds, zero_division=0)
    best_metrics = {
      'f1': best_f1,
      'precision': precision_score(labels, preds, zero_division=0),
      'recall': recall_score(labels, preds, zero_division=0),
      'pred_class1_ratio': np.sum(preds) / len(preds)
    }
  
  print(f"Optimal threshold: {best_threshold:.3f}")
  print(f"  F1: {best_metrics.get('f1', 0):.4f}, Precision: {best_metrics.get('precision', 0):.4f}, Recall: {best_metrics.get('recall', 0):.4f}")
  print(f"  Predicted class 1 ratio: {best_metrics.get('pred_class1_ratio', 0):.2%}")
  return best_threshold

def evaluate_model(trainer, test_dataset, threshold=None):
  from sklearn.metrics import confusion_matrix  # Import here if not already imported
  predictions = trainer.predict(test_dataset)
  probs = torch.softmax(torch.tensor(predictions.predictions), dim=-1)
  probs_class1 = probs[:, 1].numpy()
  labels = predictions.label_ids

  # Use optimal threshold if provided, otherwise use argmax (threshold=0.5)
  if threshold is not None:
    preds = (probs_class1 >= threshold).astype(int)
    print(f"Using threshold: {threshold:.3f}")
  else:
    preds = np.argmax(predictions.predictions, axis=1)
    print("Using default threshold: 0.5 (argmax)")

  # Show first 10 predictions as examples
  print("Sample predictions (first 10):")
  for i, (label, pred) in enumerate(zip(labels[:10], preds[:10])):
    print(f"  True: {label}, Predicted: {pred}, Prob(class1): {probs_class1[i]:.3f}")
  if len(labels) > 10:
    print(f"  ... ({len(labels) - 10} more predictions)")

  accuracy = accuracy_score(labels, preds)
  precision = precision_score(labels, preds, zero_division=0)
  recall = recall_score(labels, preds, zero_division=0)
  f1 = f1_score(labels, preds, zero_division=0)
  
  # Calculate and print confusion matrix
  cm = confusion_matrix(labels, preds)
  
  print(f"\nConfusion Matrix:")
  print(f"                Predicted")
  print(f"              Non-Dep  Depressed")
  print(f"Actual Non-Dep    {cm[0,0]:4d}      {cm[0,1]:4d}")
  print(f"       Depressed   {cm[1,0]:4d}      {cm[1,1]:4d}")
  print(f"\nConfusion Matrix (detailed):")
  print(f"  True Negatives (TN): {cm[0,0]} - Correctly predicted non-depressed")
  print(f"  False Positives (FP): {cm[0,1]} - Non-depressed predicted as depressed")
  print(f"  False Negatives (FN): {cm[1,0]} - Depressed predicted as non-depressed")
  print(f"  True Positives (TP): {cm[1,1]} - Correctly predicted depressed")

  print(f"\nTest Metrics:")
  print(f"  Accuracy: {accuracy:.4f}")
  print(f"  Precision: {precision:.4f}")
  print(f"  Recall: {recall:.4f}")
  print(f"  F1 Score: {f1:.4f}")
  
  return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1, "confusion_matrix": cm}

def train_model(df, save_model=True, model_save_path="./depression_classifier_model"):
  tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

  train_df = df[df['split'] == 'train'].reset_index(drop=True)
  val_df = df[df['split'] == 'dev'].reset_index(drop=True) 
  test_df = df[df['split'] == 'test'].reset_index(drop=True)

  print(f"Training samples: {len(train_df)}")
  print(f"Validation samples: {len(val_df)}")
  print(f"Test samples: {len(test_df)}")
  
  # Calculate class weights to handle imbalanced dataset (for reference, though we use Focal Loss)
  from sklearn.utils.class_weight import compute_class_weight
  labels = train_df['depression_label'].values
  classes = np.unique(labels)
  class_weights_balanced = compute_class_weight('balanced', classes=classes, y=labels)
  
  # Apply multiplier to strengthen minority class weight (step 4)
  weight_multiplier = 1.8  # Increase weight for minority class
  class_weights = class_weights_balanced.copy()
  # Find minority class (class with fewer samples)
  class_counts = train_df['depression_label'].value_counts().sort_index()
  minority_class = class_counts.idxmin()
  minority_class_idx = list(classes).index(minority_class)
  class_weights[minority_class_idx] *= weight_multiplier
  
  class_weights_dict = dict(zip(classes, class_weights))
  print(f"\nClass distribution in training set:")
  print(train_df['depression_label'].value_counts().sort_index())
  print(f"Balanced class weights: {dict(zip(classes, class_weights_balanced))}")
  print(f"Adjusted class weights (multiplier={weight_multiplier}x for minority): {class_weights_dict}")
  print(f"Using Focal Loss (alpha=0.85, gamma=2.5) for better imbalance handling")

  train_dataset = TranscriptsDataset(train_df, tokenizer)
  val_dataset = TranscriptsDataset(val_df, tokenizer)
  test_dataset = TranscriptsDataset(test_df, tokenizer)
  
  # Use Focal Loss with adjusted parameters for better learning
  # Higher alpha (0.85) gives more weight to minority class, higher gamma (2.5) focuses more on hard examples
  model = TextClassifier(MODEL_NAME, num_labels=2, use_focal_loss=True, focal_alpha=0.85, focal_gamma=2.5)

  training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    logging_strategy="steps",
    logging_steps=10,  # Log more frequently to see training progress
    learning_rate=1e-5,  # Slightly lower learning rate for more stable training
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    num_train_epochs=15,  # More epochs to allow better learning
    gradient_accumulation_steps=4,
    fp16=True,
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="eval_f1",  # Use F1 score instead of loss (step 5)
    greater_is_better=True,  # F1 score should be maximized
    warmup_steps=50,  # Add warmup for better training stability
    weight_decay=0.01,  # Add weight decay for regularization
  )

  trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,  # Add custom metrics (step 2)
  )

  print("\nStarting training...")
  trainer.train()

  # Find optimal threshold on validation set (step 3)
  print("\nFinding optimal threshold on validation set...")
  
  # First, check probability distribution on validation set
  val_predictions = trainer.predict(val_dataset)
  val_probs = torch.softmax(torch.tensor(val_predictions.predictions), dim=-1)
  val_probs_class1 = val_probs[:, 1].numpy()
  print(f"\nValidation set probability distribution (class 1):")
  print(f"  Min: {val_probs_class1.min():.4f}, Max: {val_probs_class1.max():.4f}")
  print(f"  Mean: {val_probs_class1.mean():.4f}, Std: {val_probs_class1.std():.4f}")
  print(f"  Median: {np.median(val_probs_class1):.4f}")
  
  optimal_threshold = find_optimal_threshold(trainer, val_dataset)

  print("\nEvaluating on test set...")
  metrics = evaluate_model(trainer, test_dataset, threshold=optimal_threshold)

  if save_model:
    print(f"\nSaving model to {model_save_path}...")
    trainer.save_model(model_save_path)
    tokenizer.save_pretrained(model_save_path)
    print("Model saved successfully!")

  return trainer, metrics


## Load and Explore Data

In [7]:
# Load the data
df = process_daic_data(DATA_DIR)

# Display basic information about the dataset
print(f"Total samples: {len(df)}")
print(f"\nSplit distribution:")
print(df['split'].value_counts())
print(f"\nLabel distribution:")
print(df['depression_label'].value_counts())
print(f"\nLabel distribution by split:")
print(df.groupby(['split', 'depression_label']).size())

# Display first few rows
print(f"\nFirst few rows:")
df.head()

Total samples: 189

Split distribution:
split
train    107
test      47
dev       35
Name: count, dtype: int64

Label distribution:
depression_label
0    133
1     56
Name: count, dtype: int64

Label distribution by split:
split  depression_label
dev    0                   23
       1                   12
test   0                   33
       1                   14
train  0                   77
       1                   30
dtype: int64

First few rows:


Unnamed: 0,text,depression_label,split
0,i'm fine how about yourself i'm from los ange...,0,dev
1,<laughter> um moscow um my family moved to the...,0,dev
2,yes okay connecticut um to be an actor <lau...,0,dev
3,yes i'm okay uh i'm from here originally los ...,1,dev
4,yes i'm okay here in los angeles there's a ...,1,dev


## Train the Model

In [None]:
trainer, metrics = train_model(df, save_model=True, model_save_path="./depression_classifier_model")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/694 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

Training samples: 107
Validation samples: 0
Test samples: 47


pytorch_model.bin:   0%|          | 0.00/597M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/597M [00:00<?, ?B/s]


Starting training...


  | |_| | '_ \/ _` / _` |  _/ -_)
[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize?ref=models
[34m[1mwandb[0m: Paste an API key from your profile and hit enter:

 ··········


[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33mgabrielfreddi[0m ([33mfreddi[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin
