# Panic Project (DHLAB) - Multiclass Classification PyCaret Model for Panic Severity Prediction

author:  `@cyshin971`  

date:    `2025-06-23`  

version: `1-1`

In [None]:
version = "1-1"

# 📚 | Import Libraries 

In [None]:
import config as cfg
import logging

import os
import pandas as pd
pd.set_option('display.float_format', lambda x: '%.4f' % x)

import numpy as np
import matplotlib.pyplot as plt
logging.getLogger('matplotlib').setLevel(logging.WARNING)

from library.pandas_utils import move_column, remove_columns, create_empty_df, read_csv
from library.text_utils import save_as_csv
from library.json_utils import save_dict_to_file, load_dict_from_file
from library.path_utils import get_file_path

from pycaret.classification import *
import shap
from sklearn.ensemble import VotingClassifier, StackingClassifier

# ⚙️ | Settings

In [None]:
scraped_data_filename = "final_result_20250620_720" # Name of the scraped data file without extension (.csv)

# 📁 | Path Variables 

In [None]:
DATA_PATH = "./_data"
TMP_PATH = "./cys/_tmp"
OUT_PATH = "./cys/_output"

try:
	features_dict = load_dict_from_file(OUT_PATH, 'panic_features_dict')
except FileNotFoundError:
    raise FileNotFoundError(f"File not found: {get_file_path(OUT_PATH, 'panic_features_dict')}. Please run data_analysis.ipynb first.")
print(f"Loaded features dict with {len(features_dict)} keys:")
for k, v in features_dict.items():
    if k == 'preproc_version':
        preproc_version = v
    if k == 'imputation_version':
        imputation_version = v
    elif k == 'analysis_version':
        analysis_version = v
        
    else:
        print(f'{k}: {features_dict[k]}')

if scraped_data_filename is None:
	raise ValueError("scraped_data_filename not found in features_dict")

PREPROC_PATH = f"{OUT_PATH}/{scraped_data_filename}/preprocessed"
IMPUTED_PATH = f"{OUT_PATH}/{scraped_data_filename}/imputed"
ANALYSIS_PATH = f"{OUT_PATH}/{scraped_data_filename}/analysis"
OUTPUT_PATH = f"{OUT_PATH}/severity_multiclass_pycaret"

# 🌐 | Global Variables

In [None]:
class OUTPUT:
    num_classes = 3
    class_names = ['Mild', 'Moderate', 'Severe']
    
    label2name = dict(enumerate(class_names))
    name2label = {v: k for k, v in label2name.items()}
    
    plot_label2name = {
		'class_0': 'Mild',
		'class_1': 'Moderate',
		'class_2': 'Severe'
	}
    plot_name2label = {v: k for k, v in plot_label2name.items()}

    color_name2color = {
		'Mild': 'skyblue',
		'Moderate': 'orange',
		'Severe': 'lightcoral'
	}
    
    output_dict = {
		1: 'Mild',
		2: 'Mild',
		3: 'Moderate',
		4: 'Severe',
		5: 'Severe'
	}
    
    output_dict_inv = {v: k for k, v in output_dict.items()}

    @staticmethod
    def get_label_name(label):
        return OUTPUT.label2name[label]
    @staticmethod
    def get_label_from_name(name):
        return OUTPUT.name2label[name]

# ⚒️ | Preprocessed Data

In [None]:
pre_data = read_csv(get_file_path(IMPUTED_PATH, f'panic_pre_data_filled_{imputation_version}({scraped_data_filename}).csv'))
display(pre_data.head(3))
metadata = read_csv(get_file_path(PREPROC_PATH, f'panic_metadata_{preproc_version}({scraped_data_filename}).csv'))
display(metadata.head(3))
demography_data = read_csv(get_file_path(PREPROC_PATH, f'panic_demography_data_{preproc_version}({scraped_data_filename}).csv'))
display(demography_data.head(3))
# patient_data = read_csv(get_file_path(ANALYSIS_PATH, f'panic_patient_analysis_{analysis_version}({scraped_data_filename}).csv'))
# display(patient_data.head(3))

print(f"Number of Demographic Features: {len(features_dict['demography'])}")
print(f"Number of Daily Features: {len(features_dict['dailylog'])}")
print(f"Number of Life Log Features: {len(features_dict['lifelog'])}")
print(f"Number of Questionnaire Features: {len(features_dict['questionnaire'])}")

# 🔄️ | Data Processing

In [None]:
dbp_param = 1

filtered_metadata = create_empty_df()
filtered_pre_data = create_empty_df()
proc_data_init = create_empty_df()

# Filter metadata for entries with at least dbp_param days of prior data
print(f"Found {len(metadata[metadata['panic_label'] == 1])} entries with panic label.")
proc_data_init = metadata[(metadata[f'panic_label'] == 1) &
                          (metadata[f'valid_entry_{dbp_param}'] == 1)].copy()
filtered_panic_metadata_entry_ids = proc_data_init['entry_id'].unique()
filtered_metadata = metadata[(metadata['ref_event_id'].isin(filtered_panic_metadata_entry_ids)) &
                             (metadata[f'dbp'] <= dbp_param)].copy()
print(f"Found {len(filtered_panic_metadata_entry_ids)} entries with panic label and at least {dbp_param} days of prior data.")

# Perform checks
unique_dbp = filtered_metadata['dbp'].unique()
if len(unique_dbp) != dbp_param:
	raise ValueError(f"Expected {dbp_param} unique DBP values, found {len(unique_dbp)}: {unique_dbp}")
del unique_dbp

filtered_entry_ids = filtered_metadata['entry_id'].unique()
filtered_panic_entry_ids = filtered_metadata['ref_event_id'].unique()
# Filter pre_data for entries that reference panic events with at least dbp_param days of prior data
filtered_pre_data = pre_data[pre_data['entry_id'].isin(filtered_entry_ids)].copy()

# Perform checks
if len(filtered_pre_data) != len(filtered_metadata):
	raise ValueError(f"Filtered pre_data length {len(filtered_pre_data)} does not match filtered_metadata length {len(filtered_metadata)}")
print(f"Filtered data contains {len(filtered_panic_entry_ids)} unique panic events and {len(filtered_entry_ids)} unique entry IDs.")
print(f"Filtered pre_data contains {len(filtered_pre_data['ID'].unique())} unique IDs.")
del filtered_entry_ids

# Initialize processed data with correct entries
proc_data_init = proc_data_init[features_dict['id']+features_dict['label']].copy()
print(f"Initial processed data contains {len(proc_data_init)} entries with {len(proc_data_init.columns)} columns.")
display(proc_data_init.head(5))

In [None]:
proc_data_int = create_empty_df()
proc_data_int = proc_data_init.copy()

# remove 'severity' from features_dict['dailylog]
features_dict['dailylog'] = [f for f in features_dict['dailylog'] if f != 'severity']

# use demography data to add demographic features to proc_data using ID (multiple entries per ID)
proc_data_int = pd.merge(proc_data_int, demography_data, on='ID', how='left')

for i in range(1, dbp_param + 1):
    # make a dictionary of 'entry_id' : 'ref_event_id' for the current dbp
	dbp_dict = filtered_metadata[filtered_metadata['dbp'] == i].set_index('entry_id')['ref_event_id'].to_dict()
	print(f"Processing data for {i} days before panic.")

	entry_ids = dbp_dict.keys()
	filtered_pre_data_i = filtered_pre_data[filtered_pre_data['entry_id'].isin(entry_ids)].copy()
	if len(filtered_pre_data_i) != len(dbp_dict.keys()):
		raise ValueError(f"Filtered pre_data length {len(filtered_pre_data_i)} does not match filtered_metadata length {len(dbp_dict.keys())} for {i} days before panic")
  	# Update 'entry_id' in filtered_pre_data_i to the corresponding 'ref_event_id' from dbp_dict
	filtered_pre_data_i['entry_id'] = filtered_pre_data_i['entry_id'].map(dbp_dict)
	
	features_list = ['entry_id']+features_dict['dailylog']+features_dict['lifelog']
	if i == dbp_param:
		features_list += features_dict['questionnaire']
	filtered_pre_data_i = filtered_pre_data_i[features_list].copy()
	# rename ALL non-ID columns to include the suffix
	cols_to_rename = [c for c in filtered_pre_data_i.columns if c != 'entry_id']
	rename_map = {c: f"{c}_{i}" for c in cols_to_rename}
	filtered_pre_data_i.rename(columns=rename_map, inplace=True)
	
	proc_data_int = pd.merge(proc_data_int, filtered_pre_data_i, on='entry_id', how='left', suffixes=('', f'_{i}'))

# Use OUTPUT.output_dict to map severity labels
proc_data_int['severity'] = proc_data_int['severity'].map(OUTPUT.output_dict)

display(proc_data_int.head(3))

In [None]:
proc_data = create_empty_df()
proc_data = proc_data_int.copy()

r_cols = ['panic',
          'dbp',
          'panic_label']
remove_columns(proc_data, r_cols)
move_column(proc_data, 'severity', -1)
display(proc_data.head(3))
save_as_csv(proc_data, OUTPUT_PATH, f'panic_severity_multi_proc_data_{dbp_param}days_{version}({scraped_data_filename})', index=False)

In [None]:
# make a histogram of the severity distribution
plt.figure(figsize=(5, 3))
severity_counts = proc_data['severity'].value_counts().sort_index()
total_count = severity_counts.sum()
colors = [OUTPUT.color_name2color['Mild'], OUTPUT.color_name2color['Moderate'], OUTPUT.color_name2color['Severe']]
ax = severity_counts.plot(kind='bar', color=colors)
plt.title('Severity Distribution')
plt.ylabel('Count')
plt.xticks(rotation=0)
plt.grid(axis='y')

# Add labels with counts and percentages at the center of each bar
for p in ax.patches:
	count = p.get_height()
	percentage = f"{(count / total_count * 100):.1f}%"
	ax.annotate(f'{count}\n{percentage}',
				(p.get_x() + p.get_width() / 2., p.get_height() / 2.),
				ha='center', va='center', fontsize=10, color='black', xytext=(0, 0),
				textcoords='offset points')

# Remove the 'severity' label from the bottom
ax.set_xlabel('')

plt.tight_layout()
plt.show()

# pd.crosstab(proc_data['severity'], proc_data['dataset'], margins=True, margins_name='Total')

# 🤖 | Modeling

In [None]:
data = proc_data.copy()
remove_columns(data, features_dict['id'])
print(f"Processed data contains {len(data)} entries with {len(data.columns)} columns after removing ID columns.")
display(data.head(3))

In [None]:
# Initialize PyCaret setup
clf = setup(
    data=data,
    target='severity',               # replace with your target column name
    session_id=123,                  # for reproducibility
    normalize=True,                  # scale numeric features
    transformation=False,            # turn off power transformation
    train_size=0.8,                  # 80/20 split
    fold=5,                          # 5-fold cross-validation
    fold_strategy='stratifiedkfold',
    numeric_imputation='mean',
    remove_multicollinearity=True,   # for small datasets, this is often helpful
	multicollinearity_threshold=0.9, # threshold for removing multicollinear features
	# html=False,                    # do not generate HTML report (use plain-text output)
    verbose=True
)

# 🚂 | Training

In [None]:
# Compare baseline models and select the best by Accuracy
best_model = compare_models(sort='Accuracy')

# 🧪 | Test

In [None]:
results = pull()  # Get the latest output table as a DataFrame
# Cross-Validation results
print("Cross-Validation Results:")
display(results)  # Jupyter display (can further style if you want)

In [None]:
# Evaluate on hold-out set (20% test split)
holdout_results = predict_model(best_model)
print(f"Hold-out Set Results (20% test split) for {OUTPUT.num_classes} classes (test_size={len(holdout_results)}):")

# 🔍 | Analysis

## SHAP Values

In [None]:
# Check if the best model is TreeExplainer-compatible
tree_model_ids = ['et', 'rf', 'gbc', 'lightgbm', 'dt']
tree_model_names = [
    'Extra Trees Classifier', 'Random Forest Classifier',
    'Gradient Boosting Classifier', 'Light Gradient Boosting Machine',
    'Decision Tree Classifier'
]

# Function to check compatibility by class name
def is_tree_model(model):
    model_name = model.__class__.__name__.lower()
    # Try common tree model keywords
    return any(keyword in model_name for keyword in ['forest', 'tree', 'boost', 'lightgbm'])


In [None]:
# SHAP analysis only if TreeExplainer compatible
is_compatible = is_tree_model(best_model)

if is_compatible:
    print(f"Best model ({best_model.__class__.__name__}) is compatible with SHAP TreeExplainer.")

    # Extract features (remove prediction/score columns)
    feature_cols = [col for col in holdout_results.columns if col in data.columns and col != 'severity']
    X_holdout = holdout_results[feature_cols]

    # --- Robust estimator unwrapping for ensembles ---
    model_to_explain = best_model
    base_estimator_key = None  # Track which base estimator is used
    # Unwrap only if Voting or Stacking ensemble
    if isinstance(model_to_explain, (VotingClassifier, StackingClassifier)):
        named_estimators = dict(model_to_explain.named_estimators_)
        # Try to select a tree-based model in order of preference
        for key in ['rf', 'et', 'gbc', 'lightgbm', 'dt']:
            if key in named_estimators and is_tree_model(named_estimators[key]):
                model_to_explain = named_estimators[key]
                base_estimator_key = key
                print(f"Selected base estimator '{key}' from Voting/Stacking ensemble.")
                break
        if base_estimator_key is None:
            print("Warning: No compatible tree model found in the ensemble; SHAP will use the full ensemble.")

    print("Model to explain:", type(model_to_explain))

    # Build the SHAP TreeExplainer
    explainer = shap.TreeExplainer(model_to_explain)
    shap_values = explainer.shap_values(X_holdout)

    # SHAP summary plot (for multiclass)
    # shap.summary_plot(shap_values, X_holdout)

    # Get SHAP values as DataFrame (one per class)
    shap_dfs = {}
    
    if isinstance(shap_values, list):
        # Standard SHAP output for multiclass: list of [n_samples, n_features] arrays (one per class)
        for i, class_shap in enumerate(shap_values):
            shap_dfs[f"class_{i}"] = pd.DataFrame(class_shap, columns=X_holdout.columns, index=X_holdout.index)
    elif isinstance(shap_values, np.ndarray) and shap_values.ndim == 3:
        # SHAP returned shape: (n_samples, n_features, n_classes)
        n_classes = shap_values.shape[2]
        for i in range(n_classes):
            shap_dfs[f"class_{i}"] = pd.DataFrame(shap_values[:,:,i], columns=X_holdout.columns, index=X_holdout.index)
    else:
        # Binary or regression: single 2D array
        shap_dfs["shap_values"] = pd.DataFrame(shap_values, columns=X_holdout.columns, index=X_holdout.index)

    # display(shap_dfs["class_0"].head())

else:
    print(f"Best model ({best_model.__class__.__name__}) is NOT compatible with SHAP TreeExplainer.")
    print("Please select one of the following tree models for SHAP analysis: 'et', 'rf', 'gbc', 'lightgbm', 'dt'")
    print("Example:")
    print("rf_model = create_model('rf')\nrf_model = finalize_model(rf_model)\n")


In [None]:
top_n = 10  # Number of features to show

for class_label in OUTPUT.plot_label2name.keys():
    df = shap_dfs[class_label]
    # Compute mean absolute SHAP value for each feature
    feature_importance = df.abs().mean(axis=0).sort_values(ascending=False)
    # Get top features
    top_features = feature_importance.head(top_n)
    
    # Print as table
    print(f"\nTop {top_n} features for {OUTPUT.plot_label2name[class_label]}:")
    print(top_features)
    
    # Bar plot
    plt.figure(figsize=(6,4))
    top_features[::-1].plot(kind='barh')
    plt.title(f"Top {top_n} Features by Mean(|SHAP|) for {OUTPUT.plot_label2name[class_label]}")
    plt.xlabel("Mean(|SHAP Value|)")
    plt.tight_layout()
    plt.show()

# Notes

In [None]:
# Create a specific model (e.g., LightGBM)
# model = create_model('lightgbm')

# Tune the model hyperparameters
# tuned_model = tune_model(model, optimize='Accuracy')

# Ensemble models (optional)
# blended_model = blend_models([tuned_model, best_model])