<a href="https://colab.research.google.com/github/christian-ian-dev/Estimation-of-Obesity-Levels/blob/Bayesian-Model/Team_3_Bayesian_Network_Study.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the official UCI Machine Learning Repository library and pgmpy
!pip install ucimlrepo pgmpy

In [None]:
# =============================================================================
# Obesity Levels Study — Bayesian Networks (BN)
# =============================================================================
# 1) Starts with requested pgmpy imports
# 2) Starts with requested PyMC imports
# 3) Creates bins for numeric features using pd.cut
# 4) Provides TWO BN approaches: pgmpy + PyMC
#
# Outputs:
#   outputs_bn/
#     cleaned_dataset_bn.csv
#     pgmpy/bn_structure_edges.csv
#     pymc/pymc_cpt_posterior_summary.csv
# -----------------------------------------------------------------------------

import numpy as np
import pandas as pd
import os
from scipy import stats
from dataclasses import dataclass
from typing import Dict, List, Tuple

# =============================================================================
# Config + mappings
# =============================================================================
@dataclass
class Config:
    uci_id: int = 544
    output_dir: str = "outputs_bn"
    exclude_insufficient_weight: bool = True
    random_state: int = 42

    # PyMC can get slow with many nodes; default to a subset.
    pymc_nodes: Tuple[str, ...] = (
        "family_history_overweight",
        "high_calorie_food_freq",
        "vegetable_intake_bin",
        "physical_activity_bin",
        "technology_use_bin",
        "obesity_level_3cat",
    )

    draws: int = 1200
    tune: int = 800
    chains: int = 2
    target_accept: float = 0.9


RENAME_MAP: Dict[str, str] = {
    "Gender": "gender",
    "Age": "age",
    "Height": "height_m",
    "Weight": "weight_kg",
    "family_history_with_overweight": "family_history_overweight",
    "FAVC": "high_calorie_food_freq",
    "FCVC": "vegetable_intake_freq",
    "NCP": "num_main_meals",
    "CAEC": "snacking_between_meals",
    "SMOKE": "smoker",
    "CH2O": "water_intake_freq",
    "SCC": "calorie_monitoring",
    "FAF": "physical_activity_freq",
    "TUE": "technology_use_time",
    "CALC": "alcohol_intake_freq",
    "MTRANS": "transportation_mode",
    "NObeyesdad": "obesity_level_raw",
}

OBESITY_3LVL_MAP: Dict[str, str] = {
    "Insufficient_Weight": "Insufficient",
    "Normal_Weight": "Normal",
    "Overweight_Level_I": "Overweight",
    "Overweight_Level_II": "Overweight",
    "Obesity_Type_I": "Obese",
    "Obesity_Type_II": "Obese",
    "Obesity_Type_III": "Obese",
}

OBESITY_ORDER = ["Normal", "Overweight", "Obese"]


def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)


# =============================================================================
# Load data (UCI)
# =============================================================================
def load_uci_df(cfg: Config) -> pd.DataFrame:
    from ucimlrepo import fetch_ucirepo
    obesity = fetch_ucirepo(id=cfg.uci_id)
    X = obesity.data.features
    y = obesity.data.targets
    return pd.concat([X, y], axis=1)


def rename_and_recode(df: pd.DataFrame, cfg: Config) -> pd.DataFrame:
    df = df.copy().rename(columns=RENAME_MAP)
    df["obesity_level_3cat"] = df["obesity_level_raw"].map(OBESITY_3LVL_MAP)

    if cfg.exclude_insufficient_weight:
        df = df[df["obesity_level_3cat"].ne("Insufficient")].copy()

    df["obesity_level_3cat"] = pd.Categorical(df["obesity_level_3cat"], categories=OBESITY_ORDER, ordered=True)
    return df.reset_index(drop=True)


# =============================================================================
# 3) Manual bins for numeric features (pd.cut style)
# =============================================================================
def add_manual_bins(df: pd.DataFrame) -> pd.DataFrame:
    """Create binned versions of numeric/ordinal-numeric features using pd.cut."""
    df_lab = df.copy()

    # Example provided:
    df_lab["age_bin"] = pd.cut(
        df_lab["age"].astype(float),
        bins=[0, 30, 45, 60, 100],
        labels=["Young", "Mid", "Senior", "Elder"],
        include_lowest=True,
    ).astype(str)

    # Height bins (meters)
    df_lab["height_bin"] = pd.cut(
        df_lab["height_m"].astype(float),
        bins=[1.0, 1.5, 1.65, 1.8, 2.2],
        labels=["Short", "Average", "Tall", "VeryTall"],
        include_lowest=True,
    ).astype(str)

    # Weight bins (kg)
    df_lab["weight_bin"] = pd.cut(
        df_lab["weight_kg"].astype(float),
        bins=[0, 60, 80, 100, 140, 250],
        labels=["Light", "Mid", "Heavy", "VeryHeavy", "Extreme"],
        include_lowest=True,
    ).astype(str)

    # Ordinal frequency-type numeric variables → Low/Medium/High bins
    def low_med_high(x: pd.Series, cuts: List[float], labels: List[str], outcol: str) -> None:
        df_lab[outcol] = pd.cut(x.astype(float), bins=cuts, labels=labels, include_lowest=True).astype(str)

    low_med_high(df_lab["vegetable_intake_freq"], [0, 1.5, 2.5, 10], ["Low", "Medium", "High"], "vegetable_intake_bin")
    low_med_high(df_lab["num_main_meals"], [0, 2.5, 3.5, 10], ["Low", "Medium", "High"], "num_main_meals_bin")
    low_med_high(df_lab["water_intake_freq"], [0, 1.5, 2.5, 10], ["Low", "Medium", "High"], "water_intake_bin")
    low_med_high(df_lab["physical_activity_freq"], [-0.1, 0.5, 2.0, 10], ["Low", "Medium", "High"], "physical_activity_bin")
    low_med_high(df_lab["technology_use_time"], [-0.1, 0.5, 1.5, 10], ["Low", "Medium", "High"], "technology_use_bin")

    return df_lab


def build_bn_dataset(df_binned: pd.DataFrame) -> pd.DataFrame:
    """Select BN-friendly columns and cast everything to string categories."""
    keep_cols = [
        "gender",
        "family_history_overweight",
        "high_calorie_food_freq",
        "snacking_between_meals",
        "smoker",
        "calorie_monitoring",
        "alcohol_intake_freq",
        "transportation_mode",
        "age_bin",
        "height_bin",
        "weight_bin",
        "vegetable_intake_bin",
        "num_main_meals_bin",
        "water_intake_bin",
        "physical_activity_bin",
        "technology_use_bin",
        "obesity_level_3cat",
    ]
    cols = [c for c in keep_cols if c in df_binned.columns]
    df_bn = df_binned[cols].copy()
    for c in df_bn.columns:
        df_bn[c] = df_bn[c].astype(str)
    return df_bn

In [None]:
# ------------------------------------------------------------------
# 4A) pgmpy imports
# ------------------------------------------------------------------
from pgmpy.estimators import HillClimbSearch, BIC, ExpertKnowledge, BayesianEstimator, PC
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.inference import VariableElimination
import pandas as pd
import os
import networkx as nx
import matplotlib.pyplot as plt

# =============================================================================
# pgmpy: structure learning + parameter learning + inference
# =============================================================================

def fit_pgmpy_bn(df_bn: pd.DataFrame, outdir: str):
    # Define expert knowledge
    expert = ExpertKnowledge(forbidden_edges=[], required_edges=[])

    # 1) Score-based: Hill Climb + BIC, Structure Learning
    hc = HillClimbSearch(df_bn)
    best_model = hc.estimate(scoring_method=BIC(df_bn), expert_knowledge=expert, max_indegree=5)

    # Initialize the BN and ensure all nodes are present
    model_hc = DiscreteBayesianNetwork(best_model.edges())
    model_hc.add_nodes_from(df_bn.columns)

    # 2) Parameter Learning (Fit CPDs)
    model_hc.fit(df_bn, estimator=BayesianEstimator, prior_type='BDeu')

    # Save structure
    edges = list(model_hc.edges())
    pd.DataFrame(edges, columns=["from", "to"]).to_csv(os.path.join(outdir, "bn_structure_edges.csv"), index=False)

    # 3) Inference
    infer = VariableElimination(model_hc)
    q0 = infer.query(variables=["obesity_level_3cat"])

    with open(os.path.join(outdir, "example_queries.txt"), "w", encoding="utf-8") as f:
        f.write("Baseline P(obesity_level_3cat):\n")
        f.write(str(q0))
        f.write("\n\nTip: evidence must match the string labels in cleaned_dataset_bn.csv\n")

    # 4) Constraint-based: PC algorithm for comparison
    pc_est = PC(df_bn)
    best_pc = pc_est.estimate(expert_knowledge=expert)
    model_pc = DiscreteBayesianNetwork()
    model_pc.add_nodes_from(df_bn.columns)
    for u, v in best_pc.edges():
        try:
            model_pc.add_edge(u, v)
        except ValueError:
            continue

    return edges

def plot_bn(model, title):
    plt.figure(figsize=(12, 8))
    pos = nx.circular_layout(model)
    nx.draw(model, pos, with_labels=True, node_color='lightblue',
            node_size=3000, font_size=10, arrowsize=20, edge_color='gray')
    plt.title(title)
    plt.show()

In [None]:
# ------------------------------------------------------------------
# 4B) PyMC imports
# ------------------------------------------------------------------
import pandas as pd
import pymc as pm
import arviz as az
import numpy as np
import matplotlib.pyplot as plt

from IPython import get_ipython
ip = get_ipython()

import os
import json
import warnings

from dataclasses import dataclass
from typing import Dict, List, Tuple

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, roc_auc_score

# =============================================================================
# PyMC BN: CPT-based posterior for a subset of nodes
# =============================================================================
def build_parent_map(edges: List[Tuple[str, str]], nodes: List[str]) -> Dict[str, List[str]]:
    parents = {n: [] for n in nodes}
    for u, v in edges:
        if u in parents and v in parents:
            parents[v].append(u)
    return parents


def encode_categories(df_bn: pd.DataFrame, nodes: List[str]) -> Tuple[pd.DataFrame, Dict[str, List[str]]]:
    df_enc = pd.DataFrame(index=df_bn.index)
    categories: Dict[str, List[str]] = {}
    for n in nodes:
        cats = pd.Index(sorted(df_bn[n].astype(str).unique()))
        categories[n] = cats.tolist()
        df_enc[n] = pd.Categorical(df_bn[n].astype(str), categories=cats).codes
    return df_enc, categories


def parent_state_index(row: np.ndarray, parent_cols: List[int], parent_card: List[int]) -> int:
    idx = 0
    mult = 1
    for col, card in zip(reversed(parent_cols), reversed(parent_card)):
        idx += int(row[col]) * mult
        mult *= card
    return idx


def fit_pymc_bn(df_bn: pd.DataFrame, edges: List[Tuple[str, str]], cfg: Config, outdir: str) -> None:
    nodes = list(cfg.pymc_nodes)
    edges_sub = [(u, v) for (u, v) in edges if u in nodes and v in nodes]
    parents = build_parent_map(edges_sub, nodes)

    df_enc, categories = encode_categories(df_bn, nodes)
    card = {n: len(categories[n]) for n in nodes}

    node_index = {n: i for i, n in enumerate(nodes)}
    data_matrix = df_enc[nodes].values

    parent_cfg_idx = {}
    parent_cfg_size = {}

    for n in nodes:
        ps = parents[n]
        if not ps:
            parent_cfg_size[n] = 1
            parent_cfg_idx[n] = np.zeros(len(df_enc), dtype=int)
        else:
            cols = [node_index[p] for p in ps]
            cards = [card[p] for p in ps]
            parent_cfg_size[n] = int(np.prod(cards))
            idxs = np.zeros(len(df_enc), dtype=int)
            for i in range(len(df_enc)):
                idxs[i] = parent_state_index(data_matrix[i], cols, cards)
            parent_cfg_idx[n] = idxs

    with pm.Model() as model:
        cpt = {}
        for n in nodes:
            k = card[n]
            m = parent_cfg_size[n]
            alpha = np.ones((m, k), dtype=float)
            cpt[n] = pm.Dirichlet(f"cpt_{n}", a=alpha, shape=(m, k))

        for n in nodes:
            obs = df_enc[n].values
            idx = parent_cfg_idx[n]
            p = cpt[n][idx]
            pm.Categorical(f"obs_{n}", p=p, observed=obs)

        idata = pm.sample(
            draws=cfg.draws,
            tune=cfg.tune,
            chains=cfg.chains,
            target_accept=cfg.target_accept,
            random_seed=cfg.random_state,
            progressbar=True,
        )

    rows = []
    for n in nodes:
        arr = idata.posterior[f"cpt_{n}"].mean(dim=("chain", "draw")).values
        for m in range(arr.shape[0]):
            for k in range(arr.shape[1]):
                rows.append({
                    "node": n,
                    "parent_cfg_index": m,
                    "state_label": categories[n][k],
                    "posterior_mean_prob": float(arr[m, k]),
                })
    pd.DataFrame(rows).to_csv(os.path.join(outdir, "pymc_cpt_posterior_summary.csv"), index=False)

    meta = {
        "nodes": nodes,
        "edges": edges_sub,
        "categories": categories,
        "parents": parents,
        "cardinalities": card,
        "parent_cfg_size": parent_cfg_size,
        "note": "parent_cfg_index uses mixed-radix ordering over parents in parents[node] order.",
    }
    with open(os.path.join(outdir, "pymc_metadata.json"), "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)


def main() -> None:
    cfg = Config()

    ensure_dir(cfg.output_dir)
    ensure_dir(os.path.join(cfg.output_dir, "pgmpy"))
    ensure_dir(os.path.join(cfg.output_dir, "pymc"))

    raw = load_uci_df(cfg)
    df = rename_and_recode(raw, cfg)

    df_binned = add_manual_bins(df)
    df_bn = build_bn_dataset(df_binned)

    df_bn.to_csv(os.path.join(cfg.output_dir, "cleaned_dataset_bn.csv"), index=False)

    edges = fit_pgmpy_bn(df_bn, outdir=os.path.join(cfg.output_dir, "pgmpy"))
    fit_pymc_bn(df_bn, edges=edges, cfg=cfg, outdir=os.path.join(cfg.output_dir, "pymc"))

    print("Done. Outputs written to:", cfg.output_dir)


if __name__ == "__main__":
    main()
