In [None]:
# Age 5.5–6.9 ASD Screening Model Training (Color–Shape / DCCS Game)

from pathlib import Path
import sys

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay,
    roc_curve,
    auc,
)

# Use seaborn's "whitegrid" style for every figure drawn in this notebook.
sns.set(style="whitegrid")

# ------------------------------------------------------------------
# Environment + paths (works locally and in Google Colab)
# ------------------------------------------------------------------

def is_colab() -> bool:
    """Report whether the ``google.colab`` package can be imported.

    Returns:
        True when the import succeeds, False when it raises any ``Exception``.
    """
    try:
        import google.colab  # type: ignore
    except Exception:
        return False
    else:
        return True

# Detect the runtime once; later cells branch on this flag.
IN_COLAB = is_colab()
print("Running in Colab:", IN_COLAB)

# Local runs are expected to start from .../ML_TRAINING/age_specific_models,
# so two levels above Path.cwd() is the project root (Cognitive_Flexibility).
# In Colab the project root is simply /content.
PROJECT_ROOT = Path("/content") if IN_COLAB else Path.cwd().parent.parent

# Dataset folders, both resolved relative to the project root.
ONLINE_DATA_DIR = PROJECT_ROOT / "Online Datasets"
SAMPLE_DATA_DIR = PROJECT_ROOT / "SAMPLE_DATASETS"

print("PROJECT_ROOT:", PROJECT_ROOT)
print("SAMPLE_DATA_DIR:", SAMPLE_DATA_DIR)
print("ONLINE_DATA_DIR:", ONLINE_DATA_DIR)


def upload_files_if_needed(required_filenames: list[str]) -> dict[str, Path]:
    """Ask the user to upload files in Colab and map each uploaded name to a path.

    Outside Colab no upload is attempted and an empty dict is returned.

    Args:
        required_filenames: File names that must all be present in the upload.

    Returns:
        ``{name: Path("/content") / name}`` for every file that was uploaded.

    Raises:
        FileNotFoundError: If any required file name is absent from the upload.
    """
    if not IN_COLAB:
        return {}

    from google.colab import files  # type: ignore

    print("\nPlease upload these files from your PC:")
    for wanted in required_filenames:
        print(" -", wanted)

    content_dir = Path("/content")
    received = files.upload()

    paths: dict[str, Path] = {}
    for name in received.keys():
        paths[name] = content_dir / name

    # Every required name must be among the uploaded ones.
    missing = [wanted for wanted in required_filenames if wanted not in paths]
    if missing:
        raise FileNotFoundError(f"Missing uploads: {missing}. Please upload them and run this cell again.")

    return paths


# ------------------------------------------------------------------
# Feature engineering utility for DCCS (Color–Shape)
# - Local: import from `ML_TRAINING/utils/feature_engineering.py`
# - Colab: fallback to inline implementation
# ------------------------------------------------------------------

# Bin edges (months) passed to FeatureEngineer for age-normalised z-scores.
DCCS_AGE_BINS = [66, 72, 78, 83]

try:
    if IN_COLAB:
        # Colab has no access to the repo package, so jump straight to the fallback.
        raise ImportError("Colab: using inline FeatureEngineer")
    sys.path.append(str(PROJECT_ROOT / "ML_TRAINING"))
    from utils.feature_engineering import FeatureEngineer  # type: ignore
except Exception:
    # Any failure above (including the deliberate ImportError) lands here.
    from scipy import stats

    class FeatureEngineer:
        """Inline fallback that derives DCCS (Color–Shape) indices and age z-scores."""

        def __init__(self, age_bins, random_state: int = 42):
            """Store the age bin edges and the random seed."""
            self.age_bins = age_bins
            self.random_state = random_state

        def create_age_bins(self, age_months: pd.Series) -> pd.Series:
            """Cut ages into the configured bins, labelled ``"<low>-<high>"``."""
            edges = self.age_bins
            bin_names = [f"{low}-{high}" for low, high in zip(edges[:-1], edges[1:])]
            return pd.cut(age_months, bins=edges, labels=bin_names, include_lowest=True)

        def calculate_zscore_by_age(self, df: pd.DataFrame, value_col: str, age_bin_col: str = "age_bin") -> pd.Series:
            """Z-score ``value_col`` within each ``age_bin_col`` group.

            Groups with a single row or zero spread get 0; any value still
            missing in the result is replaced by 0.
            """

            def _group_zscore(values):
                if len(values) > 1 and values.std() > 0:
                    # Missing entries are replaced by the group mean first.
                    return stats.zscore(values.fillna(values.mean()))
                return 0

            per_group = df.groupby(age_bin_col)[value_col].transform(_group_zscore)
            return per_group.fillna(0)

        def create_composite_index(self, df: pd.DataFrame, components: dict[str, float]) -> pd.Series:
            """Weighted sum of the listed columns, with weights divided by their total.

            Columns missing from ``df`` are skipped. A column whose maximum
            exceeds 1 is min-max scaled before it is weighted.
            """
            composite = pd.Series(0.0, index=df.index)
            weight_sum = sum(components.values())
            for name, weight in components.items():
                if name not in df.columns:
                    continue
                values = df[name].astype(float)
                if values.max() > 1:
                    values = (values - values.min()) / (values.max() - values.min() + 1e-10)
                composite = composite + values * (weight / weight_sum)
            return composite

        def engineer_color_shape_features(self, df: pd.DataFrame) -> pd.DataFrame:
            """Return a copy of ``df`` with the two indices and, if possible, age z-scores.

            Adds ``cognitive_flexibility_index`` and ``perseveration_index``
            (0 when their input columns are missing). When ``age_months`` is
            present it also adds ``age_bin`` and one ``*_zscore`` per index.
            """
            out = df.copy()

            # Cognitive Flexibility Index
            flexibility_inputs = (
                "accuracy_drop_percent",
                "switch_cost_ms",
                "perseverative_error_rate_post_switch",
            )
            if set(flexibility_inputs) <= set(out.columns):
                accuracy_drop = out["accuracy_drop_percent"].astype(float) / 100.0
                switch_cost = out["switch_cost_ms"].astype(float) / (out["switch_cost_ms"].max() + 1e-10)
                perseveration_rate = out["perseverative_error_rate_post_switch"].astype(float)
                out["cognitive_flexibility_index"] = (
                    accuracy_drop * 0.4 + switch_cost * 0.3 + perseveration_rate * 0.3
                )
            else:
                out["cognitive_flexibility_index"] = 0

            # Perseveration Index
            perseveration_weights = {
                "total_perseverative_errors": 0.6,
                "number_of_consecutive_perseverations": 0.4,
            }
            if set(perseveration_weights) <= set(out.columns):
                out["perseveration_index"] = self.create_composite_index(out, perseveration_weights)
            else:
                out["perseveration_index"] = 0

            # Age-normalize to z-scores
            if "age_months" not in out.columns:
                return out

            out["age_bin"] = self.create_age_bins(out["age_months"].astype(float))
            zscore_names = {
                "cognitive_flexibility_index": "cognitive_flexibility_zscore",
                "perseveration_index": "perseveration_zscore",
            }
            for index_col, zscore_col in zscore_names.items():
                out[zscore_col] = self.calculate_zscore_by_age(out, index_col, "age_bin")

            return out

print("FeatureEngineer ready")


In [None]:
# Read the combined online questionnaire CSV (Autism Screening Data Combined).
# - Colab: the CSV is uploaded from your PC
# - Local: the CSV is read from the repo folder `Online Datasets/`

COMBINED_FILENAME = "Autism_Screening_Data_Combined.csv"

if not IN_COLAB:
    COMBINED_PATH = ONLINE_DATA_DIR / "Autism screening data for toddlers" / COMBINED_FILENAME
else:
    uploaded_paths = upload_files_if_needed([COMBINED_FILENAME])
    COMBINED_PATH = uploaded_paths[COMBINED_FILENAME]

print("COMBINED_PATH:", COMBINED_PATH)

# Load the CSV and show its size and first rows as a quick sanity check.
df_combined = pd.read_csv(COMBINED_PATH)
print("Combined dataset shape:", df_combined.shape)
display(df_combined.head())

In [None]:
# Filter online questionnaire data to age 66–83 months and engineer auxiliary features

LABEL_MAP = {"YES": 1, "Yes": 1, "Y": 1, 1: 1,
             "NO": 0, "No": 0, "N": 0, 0: 0}


def _normalize_asd_label(value):
    """Map a raw `Class` value to 1 / 0 via LABEL_MAP, or NaN when unrecognised.

    Strings are stripped and upper-cased first so that variants such as
    "yes" or "No " are not silently turned into NaN (and then dropped).
    """
    if isinstance(value, str):
        value = value.strip().upper()
    return LABEL_MAP.get(value, np.nan)


# Age filter (months): lower bound inclusive, upper bound exclusive.
# NOTE(review): this assumes `Age` in the combined CSV is recorded in months —
# confirm against the dataset before trusting the selected rows.
df_aux = df_combined[(df_combined["Age"] >= 66) & (df_combined["Age"] < 83)].copy()

# Ensure A1–A10 exist; absent items are filled with 0, which lowers the scores
# below, so report it instead of doing it silently.
A_COLS = [f"A{i}" for i in range(1, 11)]
missing_items = [c for c in A_COLS if c not in df_aux.columns]
if missing_items:
    print("WARNING: questionnaire items missing from the dataset, filled with 0:", missing_items)
for c in missing_items:
    df_aux[c] = 0

# Questionnaire-derived auxiliary features

df_aux["questionnaire_score"] = df_aux[A_COLS].sum(axis=1)
# NOTE(review): this is computed exactly like `questionnaire_score`, so the two
# columns are identical (perfectly collinear). Replace A_COLS here with the
# subset of critical items once that subset is defined.
df_aux["critical_items_failed"] = df_aux[A_COLS].sum(axis=1)
# Share of items A1, A4 and A5 that are set, expressed as a percentage.
df_aux["social_responsiveness_score"] = (df_aux[["A1", "A4", "A5"]].sum(axis=1) / 3.0) * 100

# ASD label (case- and whitespace-insensitive)

df_aux["asd_label"] = df_aux["Class"].map(_normalize_asd_label)

# Rows with a non-empty `Class` that still could not be mapped are dropped
# below; surface them so unexpected label spellings are noticed.
unmapped = df_aux.loc[df_aux["Class"].notna() & df_aux["asd_label"].isna(), "Class"]
if not unmapped.empty:
    print("WARNING: unrecognised Class values (rows dropped):", sorted(map(str, unmapped.unique())))

# Unified age column

df_aux["age_months"] = df_aux["Age"].astype(float)

aux_cols = [
    "age_months",
    "questionnaire_score",
    "critical_items_failed",
    "social_responsiveness_score",
    "asd_label",
]

# Keep only the auxiliary columns and the rows that have a usable label.
df_aux_simple = df_aux[aux_cols].dropna(subset=["asd_label"]).reset_index(drop=True)

print("Auxiliary questionnaire (66–83 months) shape:", df_aux_simple.shape)
display(df_aux_simple.head())
display(df_aux_simple["asd_label"].value_counts().to_frame("count"))

In [None]:
# Read the Color–Shape / DCCS hospital/system data (one CSV per group).
# - Colab: both CSVs are uploaded from your PC
# - Local: both CSVs are read from SAMPLE_DATASETS/

DCCS_ASD_FILENAME = "age_5_6_dccs_asd.csv"
DCCS_CTRL_FILENAME = "age_5_6_dccs_control.csv"

if not IN_COLAB:
    DCCS_ASD = SAMPLE_DATA_DIR / DCCS_ASD_FILENAME
    DCCS_CTRL = SAMPLE_DATA_DIR / DCCS_CTRL_FILENAME
else:
    uploaded_paths = upload_files_if_needed([DCCS_ASD_FILENAME, DCCS_CTRL_FILENAME])
    DCCS_ASD = uploaded_paths[DCCS_ASD_FILENAME]
    DCCS_CTRL = uploaded_paths[DCCS_CTRL_FILENAME]

print("DCCS_ASD:", DCCS_ASD)
print("DCCS_CTRL:", DCCS_CTRL)

df_dccs_asd = pd.read_csv(DCCS_ASD)
df_dccs_ctrl = pd.read_csv(DCCS_CTRL)

for df_h in (df_dccs_asd, df_dccs_ctrl):
    # Derive age_months from the first alternative age column that exists
    # (adapt the candidates if your column is named differently).
    if "age_months" not in df_h.columns:
        for age_source in ("Age_Mons", "Age"):
            if age_source in df_h.columns:
                df_h["age_months"] = df_h[age_source]
                break

    # Keep 66 <= age < 83 months; frames without any age column are left unfiltered.
    if "age_months" in df_h.columns:
        df_h.query("66 <= age_months < 83", inplace=True)

# Label each group (ASD file -> 1, control file -> 0) unless a label column already exists.
for df_h, default_label in ((df_dccs_asd, 1), (df_dccs_ctrl, 0)):
    if "asd_label" not in df_h.columns:
        df_h["asd_label"] = default_label

print("DCCS ASD shape:", df_dccs_asd.shape)
print("DCCS CTRL shape:", df_dccs_ctrl.shape)
display(df_dccs_asd.head())
display(df_dccs_ctrl.head())

In [None]:
# Combine DCCS game metrics + clinician reflection with the online questionnaire
# auxiliary features, then run the DCCS feature engineering.

DCCS_FEATURES = [
    "age_months",
    "pre_switch_accuracy",
    "post_switch_accuracy",
    "mixed_block_accuracy",
    "switch_cost_ms",
    "accuracy_drop_percent",
    "total_perseverative_errors",
    "perseverative_error_rate_post_switch",
    "number_of_consecutive_perseverations",
    "avg_rt_pre_switch_ms",
    "avg_rt_post_switch_ms",
    "rt_variability",
    # clinician reflection (if present)
    "attention_level",
    "engagement_level",
    "frustration_tolerance",
    "instruction_following",
    "overall_behavior",
]

# Drop candidate features that appear in neither input frame.
DCCS_FEATURES = [
    c for c in DCCS_FEATURES
    if c in (set(df_dccs_asd.columns) | set(df_dccs_ctrl.columns))
]

# Per-group frames hold only the features that group actually has, plus a
# label that is always set here (1 = ASD file, 0 = control file).
asd_game = df_dccs_asd[[c for c in DCCS_FEATURES if c in df_dccs_asd.columns]].assign(asd_label=1)
ctrl_game = df_dccs_ctrl[[c for c in DCCS_FEATURES if c in df_dccs_ctrl.columns]].assign(asd_label=0)

DF_DCCS = pd.concat([asd_game, ctrl_game], ignore_index=True)
DF_DCCS["age_months"] = DF_DCCS["age_months"].astype(float)

print("Raw DCCS dataset shape:", DF_DCCS.shape)
display(DF_DCCS.head())

# ------------------------------------------------------------------
# Attach questionnaire auxiliary features from the ONLINE data.
# Rows are paired purely by nearest age, within 3 months.
# ------------------------------------------------------------------

aux_merge_cols = [
    "age_months",
    "questionnaire_score",
    "critical_items_failed",
    "social_responsiveness_score",
]
aux_for_merge = df_aux_simple[aux_merge_cols].dropna().sort_values("age_months")

# merge_asof needs both sides sorted on the key.
cs_sorted = DF_DCCS.sort_values("age_months")

DF_DCCS = pd.merge_asof(
    cs_sorted,
    aux_for_merge,
    on="age_months",
    direction="nearest",
    tolerance=3,  # months
)

print("After adding online aux features (some may be NaN if no close match):")
display(DF_DCCS.head())

# ------------------------------------------------------------------
# DCCS feature engineering (indices + age z-scores)
# ------------------------------------------------------------------

fe = FeatureEngineer(age_bins=DCCS_AGE_BINS)
DF_DCCS = fe.engineer_color_shape_features(DF_DCCS)

print("After DCCS feature engineering:")
display(DF_DCCS.head())

In [None]:
# Exploratory plots for the DCCS data

CLASS_TICKS = ([0, 1], ["Non-ASD", "ASD"])

# Class balance
plt.figure(figsize=(4, 4))
sns.countplot(x="asd_label", data=DF_DCCS)
plt.title("ASD vs Non-ASD (Age 5.5–6.9, DCCS)")
plt.xticks(*CLASS_TICKS)
plt.show()

# Per-class boxplots: the three accuracy measures first, then perseverative
# errors. Any column absent from DF_DCCS is skipped.
boxplot_specs = [
    (col, f"{col} by ASD Label")
    for col in ["pre_switch_accuracy", "post_switch_accuracy", "mixed_block_accuracy"]
]
boxplot_specs.append(("total_perseverative_errors", "Total Perseverative Errors by ASD Label"))

for col, title in boxplot_specs:
    if col not in DF_DCCS.columns:
        continue
    plt.figure(figsize=(6, 4))
    sns.boxplot(x="asd_label", y=col, data=DF_DCCS)
    plt.title(title)
    plt.xticks(*CLASS_TICKS)
    plt.show()

# Correlation heatmap over the numeric columns only
numeric_cols = DF_DCCS.select_dtypes(include=["number"]).columns
corr = DF_DCCS[numeric_cols].corr()

plt.figure(figsize=(7, 6))
sns.heatmap(corr, cmap="coolwarm", center=0)
plt.title("Correlation Heatmap (DCCS Features)")
plt.show()

In [None]:
# Train/test split, imputation, outlier handling, class balancing, and scaling

from sklearn.utils import resample

# Every column except the label is a feature.
feature_cols = [c for c in DF_DCCS.columns if c not in ["asd_label"]]

X = DF_DCCS[feature_cols].copy()
y = DF_DCCS["asd_label"].astype(int)

# 1) Make every feature numeric.
#    The engineered `age_bin` column is categorical with labels like "66-72";
#    left as-is it makes StandardScaler fail on string values. Categoricals
#    become their ordinal codes (missing stays NaN) and any other non-numeric
#    column is coerced, so the column set and order stay unchanged.
for col in feature_cols:
    if isinstance(X[col].dtype, pd.CategoricalDtype):
        codes = X[col].cat.codes.astype(float)
        X[col] = codes.where(codes >= 0)  # code -1 marks a missing category
    elif not pd.api.types.is_numeric_dtype(X[col]):
        X[col] = pd.to_numeric(X[col], errors="coerce")

# 2) Train/test split (done BEFORE imputation so test rows cannot influence
#    the statistics used to fill missing values)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42, stratify=y
)

# 3) Impute missing values with TRAIN medians only.
#    A column that is entirely NaN in TRAIN has no median; fall back to 0 so
#    no NaN reaches the scaler or the model.
train_medians = X_train.median().fillna(0.0)
X_train = X_train.fillna(train_medians)
X_test = X_test.fillna(train_medians)

# 4) Outlier clipping (winsorize): bounds come from TRAIN only and are then
#    applied to both partitions
num_cols = X_train.select_dtypes(include=["number"]).columns
low_q = X_train[num_cols].quantile(0.01)
high_q = X_train[num_cols].quantile(0.99)
X_train[num_cols] = X_train[num_cols].clip(lower=low_q, upper=high_q, axis=1)
X_test[num_cols] = X_test[num_cols].clip(lower=low_q, upper=high_q, axis=1)

# 5) Class balancing (bootstrap upsampling of minority class in TRAIN)
train_df = X_train.copy()
train_df["target"] = y_train.values

counts = train_df["target"].value_counts()
if len(counts) == 2 and counts.min() < counts.max():
    maj = counts.idxmax()
    minc = counts.idxmin()

    df_maj = train_df[train_df["target"] == maj]
    df_min = train_df[train_df["target"] == minc]

    # Sample the minority class with replacement up to the majority size, then shuffle.
    df_min_up = resample(df_min, replace=True, n_samples=len(df_maj), random_state=42)
    train_bal = pd.concat([df_maj, df_min_up]).sample(frac=1.0, random_state=42)

    X_train = train_bal.drop(columns=["target"])
    y_train = train_bal["target"].astype(int)
    print("Balanced train counts:\n", y_train.value_counts())
else:
    print("Train counts:\n", y_train.value_counts())

# 6) Scale: fit on TRAIN, apply the same transform to TEST
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("Train size:", X_train.shape[0], " Test size:", X_test.shape[0])

In [None]:
# Train logistic regression model (ASD vs Non-ASD, engineered features)

log_reg = LogisticRegression(max_iter=2000, class_weight="balanced")
log_reg.fit(X_train_scaled, y_train)

# Hard predictions use the default 0.5 threshold; probabilities feed the ROC curve.
y_pred = log_reg.predict(X_test_scaled)
y_proba = log_reg.predict_proba(X_test_scaled)[:, 1]

print("Classification report (threshold 0.5):")
print(classification_report(y_test, y_pred, target_names=["Non-ASD", "ASD"]))

cm = confusion_matrix(y_test, y_pred)
ConfusionMatrixDisplay(cm, display_labels=["Non-ASD", "ASD"]).plot(cmap="Blues")
plt.title("Confusion Matrix (DCCS 5.5–6.9)")
plt.show()

# ROC curve
fpr, tpr, thr = roc_curve(y_test, y_proba)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(5, 5))
plt.plot(fpr, tpr, label=f"ROC AUC = {roc_auc:.3f}")
plt.plot([0, 1], [0, 1], "k--")  # diagonal reference line
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate (Sensitivity)")
plt.title("ROC Curve (DCCS 5.5–6.9)")
plt.legend(loc="lower right")
plt.show()

# Optional: view coefficient magnitudes for interpretability.
# Feature names are taken from the matrix the scaler and model were fitted on,
# so they always line up one-to-one with `coef_`, rather than being re-derived
# from DF_DCCS, which could drift from the training columns.
feature_cols = list(X_train.columns)
coef = pd.Series(log_reg.coef_[0], index=feature_cols).sort_values(key=np.abs, ascending=False)

plt.figure(figsize=(7, 5))
sns.barplot(x=coef.values, y=coef.index)
plt.title("Logistic Regression Coefficients (absolute importance)")
plt.xlabel("Coefficient")
plt.ylabel("Feature")
plt.show()

display(coef.to_frame("coef"))

In [None]:
# Persist the fitted model and scaler, then hand them off:
# Colab downloads both files, a local run copies them into the backend folder.

import shutil

import joblib

MODEL_NAME = "model_age_5_5_6_9_color_shape"

out_dir = (
    Path("/content") / "model_artifacts"
    if IN_COLAB
    else PROJECT_ROOT / "ML_TRAINING" / "models"
)
out_dir.mkdir(parents=True, exist_ok=True)

model_path = out_dir / f"{MODEL_NAME}.pkl"
scaler_path = out_dir / f"scaler_{MODEL_NAME}.pkl"

joblib.dump(log_reg, model_path)
joblib.dump(scaler, scaler_path)

print("Saved model:", model_path)
print("Saved scaler:", scaler_path)

if IN_COLAB:
    # Colab: download the files to your PC
    from google.colab import files  # type: ignore

    files.download(str(model_path))
    files.download(str(scaler_path))
else:
    # Local: also copy into backend production folder
    backend_model_dir = PROJECT_ROOT / "senseai_backend" / "ml_engine" / "models"
    backend_model_dir.mkdir(parents=True, exist_ok=True)

    shutil.copy2(model_path, backend_model_dir / model_path.name)
    shutil.copy2(scaler_path, backend_model_dir / scaler_path.name)

    print("Copied into backend:", backend_model_dir)
