# Apple Health Intelligence System


## 1. Data Extraction

In [4]:
import os

# Show which files sit next to the notebook (os.curdir is the "." entry).
print(os.listdir(os.curdir))

['health_analyzer.py', 'health_analyzer.ipynb', 'README.md', '.git']


In [1]:
import xml.etree.ElementTree as ET
import pandas as pd


def extract_health_data(xml_path='apple_health_export.xml'):
    """Parse an Apple Health XML export into a long-format DataFrame.

    Only the direct ``Record`` children of the XML root whose ``type`` is
    listed in ``HEALTH_METRICS`` are kept. Records with a missing or empty
    ``value`` attribute are skipped.

    For sleep and mindfulness records the ``value`` attribute is ignored and
    replaced by the record duration (``endDate - startDate``), in hours and
    minutes respectively. Every such record contributes its duration,
    whatever its category value is.

    All other records must carry a numeric ``value``. Records whose value
    cannot be converted to ``float`` (for example symbolic category values)
    are skipped and counted instead of aborting the whole parse.

    Args:
        xml_path: Path to the Apple Health export XML file.

    Returns:
        A DataFrame with one row per kept record and the columns ``type``
        (short metric name), ``date`` (``startDate`` truncated to midnight,
        keeping its UTC offset), ``value`` (float), ``unit`` and ``source``.
        The columns are present even when no record was kept.

    Raises:
        OSError: If ``xml_path`` cannot be opened.
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    # Map from HealthKit record type to the short column name used downstream
    HEALTH_METRICS = {
        # Activity
        "HKQuantityTypeIdentifierStepCount": "steps",
        "HKQuantityTypeIdentifierDistanceWalkingRunning": "distance",
        "HKQuantityTypeIdentifierFlightsClimbed": "flights",
        "HKQuantityTypeIdentifierActiveEnergyBurned": "active_cal",
        "HKQuantityTypeIdentifierBasalEnergyBurned": "basal_cal",
        "HKQuantityTypeIdentifierAppleExerciseTime": "exercise_min",
        "HKQuantityTypeIdentifierVO2Max": "vo2_max",
        # Vitals
        "HKQuantityTypeIdentifierHeartRate": "heart_rate",
        "HKQuantityTypeIdentifierRestingHeartRate": "resting_hr",
        "HKQuantityTypeIdentifierWalkingHeartRateAverage": "walking_hr",
        "HKQuantityTypeIdentifierBloodPressureSystolic": "bp_systolic",
        "HKQuantityTypeIdentifierBloodPressureDiastolic": "bp_diastolic",
        "HKQuantityTypeIdentifierBloodOxygenSaturation": "spo2",
        "HKQuantityTypeIdentifierBodyTemperature": "body_temp",
        "HKQuantityTypeIdentifierRespiratoryRate": "resp_rate",
        "HKQuantityTypeIdentifierHeartRateVariabilitySDNN": "hrv",
        # Body
        "HKQuantityTypeIdentifierBodyMass": "weight",
        "HKQuantityTypeIdentifierBodyFatPercentage": "body_fat",
        "HKQuantityTypeIdentifierLeanBodyMass": "lean_mass",
        "HKQuantityTypeIdentifierBodyMassIndex": "bmi",
        "HKQuantityTypeIdentifierHeight": "height",
        # Sleep
        "HKCategoryTypeIdentifierSleepAnalysis": "sleep",
        # Nutrition
        "HKQuantityTypeIdentifierDietaryEnergyConsumed": "calories_consumed",
        "HKQuantityTypeIdentifierDietaryCarbohydrates": "carbs",
        "HKQuantityTypeIdentifierDietaryProtein": "protein",
        "HKQuantityTypeIdentifierDietaryFatTotal": "fat",
        # Mindfulness
        "HKCategoryTypeIdentifierMindfulSession": "mindfulness",
        # Reproductive
        "HKCategoryTypeIdentifierOvulationTestResult": "ovulation_test",
        "HKCategoryTypeIdentifierMenstrualFlow": "menstrual_flow",
    }

    # Record types measured by their duration; the number is how many
    # seconds make up one output unit.
    DURATION_DIVISORS = {
        "HKCategoryTypeIdentifierSleepAnalysis": 3600,  # hours
        "HKCategoryTypeIdentifierMindfulSession": 60,  # minutes
    }

    print(
        f"⏳ Parsing XML file: {xml_path} (this may take several minutes for large files)"
    )

    # Parse XML data directly (the whole tree is loaded into memory)
    tree = ET.parse(xml_path)
    root = tree.getroot()

    records = []
    skipped_non_numeric = 0
    for i, record in enumerate(root.findall("Record")):
        if i % 10000 == 0:  # Print progress periodically
            print(f"📊 Processed {i} records...")

        record_type = record.get("type")
        if record_type not in HEALTH_METRICS:
            continue

        raw_value = record.get("value")
        if not raw_value:
            continue

        # Parse the start timestamp once; it feeds both duration and date.
        start = pd.to_datetime(record.get("startDate"))

        if record_type in DURATION_DIVISORS:
            end = pd.to_datetime(record.get("endDate"))
            value = (end - start).total_seconds() / DURATION_DIVISORS[record_type]
        else:
            try:
                value = float(raw_value)
            except ValueError:
                # Symbolic category values cannot be stored as a float;
                # skip the record rather than abort the whole parse.
                skipped_non_numeric += 1
                continue

        records.append(
            {
                "type": HEALTH_METRICS[record_type],
                "date": start.normalize(),
                "value": float(value),
                "unit": record.get("unit", ""),
                "source": record.get("sourceName", ""),
            }
        )

    if skipped_non_numeric:
        print(f"⚠️ Skipped {skipped_non_numeric} records with non-numeric values")
    print(f"✅ Parsing complete! Total records: {len(records)}")

    # Naming the columns keeps the schema stable even for an empty result.
    return pd.DataFrame(records, columns=["type", "date", "value", "unit", "source"])

In [2]:
# [Cell 2] Parse the default export file and preview the first five rows
health_df = extract_health_data(xml_path="apple_health_export.xml")
health_df.head(n=5)

⏳ Parsing XML file: apple_health_export.xml (this may take several minutes for large files)
📊 Processed 0 records...
📊 Processed 10000 records...
📊 Processed 20000 records...
📊 Processed 30000 records...
📊 Processed 40000 records...
📊 Processed 50000 records...
📊 Processed 60000 records...
📊 Processed 70000 records...
📊 Processed 80000 records...
📊 Processed 90000 records...
📊 Processed 100000 records...
📊 Processed 110000 records...
📊 Processed 120000 records...
📊 Processed 130000 records...
📊 Processed 140000 records...
📊 Processed 150000 records...
📊 Processed 160000 records...
📊 Processed 170000 records...
📊 Processed 180000 records...
📊 Processed 190000 records...
📊 Processed 200000 records...
📊 Processed 210000 records...
📊 Processed 220000 records...
📊 Processed 230000 records...
📊 Processed 240000 records...
📊 Processed 250000 records...
📊 Processed 260000 records...
📊 Processed 270000 records...
📊 Processed 280000 records...
📊 Processed 290000 records...
📊 Processed 300000 records...
[... remaining output truncated ...]

Unnamed: 0,type,date,value,unit,source
0,height,2021-04-11 00:00:00+02:00,192.0,cm,Health
1,weight,2021-04-11 00:00:00+02:00,73.0,kg,JP’s iPhone
2,weight,2019-10-14 00:00:00+02:00,83.0,kg,Health
3,weight,2019-02-08 00:00:00+02:00,73.5,kg,Health
4,weight,2023-11-19 00:00:00+02:00,80.0,kg,JP’s iPhone 16 Pro


In [None]:
def process_health_data(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate long-format health records into an imputed daily table.

    Steps:
        1. Collapse the records to one row per ``date`` and one column per
           metric. Metrics in ``avg_cols`` are averaged per day, metrics in
           ``last_cols`` keep the last value recorded that day (in the row
           order of ``df``), and every other metric is summed.
        2. Fill missing values with a 7-nearest-neighbour imputer.
        3. Derive ``bmi`` (weight / height², assuming height is in
           centimetres, hence the ``/ 100``) and ``total_cal`` when their
           inputs are available.
        4. Compute the weighted ``health_index`` score.

    NOTE(review): ``KNNImputer`` is not imported in the visible part of this
    file; presumably ``sklearn.impute.KNNImputer``. Confirm it is in scope.

    Args:
        df: Long-format records with at least ``date``, ``type`` and
            ``value`` columns.

    Returns:
        The imputed daily DataFrame, indexed by date, with the derived
        columns added.

    Raises:
        KeyError: If a metric required by ``health_index`` is absent.
    """
    # Per-day rates and levels: summing the samples would be meaningless.
    avg_cols = [
        "heart_rate",
        "resting_hr",
        "walking_hr",
        "bp_systolic",
        "bp_diastolic",
        "spo2",
        "body_temp",
        "resp_rate",
        "hrv",
        "vo2_max",
    ]
    # Body measurements: keep the last reading of the day.
    last_cols = ["weight", "body_fat", "lean_mass", "bmi", "height"]

    aggregation = {
        **dict.fromkeys(avg_cols, "mean"),
        **dict.fromkeys(last_cols, "last"),
    }

    # One Series per metric, each aggregated with the function that suits
    # it; anything not listed above is a cumulative quantity and is summed.
    per_metric = {
        metric: rows.groupby("date")["value"].agg(aggregation.get(metric, "sum"))
        for metric, rows in df.groupby("type")
    }
    daily = pd.DataFrame(per_metric)
    daily.index.name = "date"
    daily.columns.name = "type"

    imputer = KNNImputer(n_neighbors=7)
    daily_imputed = pd.DataFrame(
        imputer.fit_transform(daily), columns=daily.columns, index=daily.index
    )

    if "weight" in daily_imputed and "height" in daily_imputed:
        daily_imputed["bmi"] = (
            daily_imputed["weight"] / (daily_imputed["height"] / 100) ** 2
        )

    if "active_cal" in daily_imputed and "basal_cal" in daily_imputed:
        daily_imputed["total_cal"] = (
            daily_imputed["active_cal"] + daily_imputed["basal_cal"]
        )

    # Fail with an explicit message instead of an opaque single-key KeyError.
    required = [
        "steps",
        "sleep",
        "resting_hr",
        "vo2_max",
        "mindfulness",
        "spo2",
        "weight",
        "hrv",
    ]
    missing = [name for name in required if name not in daily_imputed]
    if missing:
        raise KeyError(f"Cannot compute health_index; missing metrics: {missing}")

    # Weighted sum of roughly normalised components (weights add up to 1.0).
    # NOTE(review): the HRV term decreases as HRV rises and the weight term
    # increases with body weight -- confirm both directions are intended.
    daily_imputed["health_index"] = (
        0.15 * daily_imputed["steps"].clip(upper=10000) / 10000
        + 0.15 * daily_imputed["sleep"].clip(lower=4, upper=10) / 8
        + 0.10 * (1 - (daily_imputed["resting_hr"] - 60) / 40)
        + 0.10 * daily_imputed["vo2_max"] / 60
        + 0.10 * daily_imputed["mindfulness"].clip(upper=60) / 60
        + 0.10 * daily_imputed["spo2"] / 100
        + 0.10 * (daily_imputed["weight"] / daily_imputed["weight"].quantile(0.8))
        + 0.20 * (1 - daily_imputed["hrv"].clip(upper=200) / 200)
    )

    return daily_imputed

In [None]:
def create_health_models(daily):
    """Build forecasting, prediction, anomaly and PCA outputs from daily data.

    The input frame is copied, so the caller's DataFrame is left untouched;
    use the returned frame to access the added columns.

    Args:
        daily: Daily metrics indexed by a ``date`` index. Must contain a
            ``steps`` column; the LSTM part additionally uses ``sleep``,
            ``resting_hr``, ``spo2``, ``hrv`` and ``health_index``.

    Returns:
        A ``(models, enhanced)`` tuple. ``models`` holds ``"steps_model"``
        (a fitted Prophet model) and, when ``health_index`` exists and there
        are enough complete rows, ``"health_index_model"`` (a dict with the
        Keras model, the fitted scaler and the feature names). ``enhanced``
        is a copy of ``daily`` with ``anomaly`` (IsolationForest labels) and
        ``pca1``..``pca3`` columns added.
    """
    daily = daily.copy()  # never mutate the caller's frame
    models = {}
    n_steps = 14  # length of the look-back window fed to the LSTM

    # Prophet model for step forecasting
    steps_df = (
        daily[["steps"]]
        .dropna()
        .reset_index()
        .rename(columns={"date": "ds", "steps": "y"})
    )
    # Prophet refuses a timezone-aware ``ds`` column, so drop the offset
    # while keeping the local wall-clock date.
    steps_df["ds"] = pd.to_datetime(
        steps_df["ds"].map(lambda ts: ts.tz_localize(None))
    )
    steps_model = Prophet(
        seasonality_mode="multiplicative",
        yearly_seasonality=True,
        weekly_seasonality=True,
    )
    steps_model.fit(steps_df)
    models["steps_model"] = steps_model

    # LSTM model for health index prediction
    if "health_index" in daily.columns:
        target = "health_index"
        features = ["steps", "sleep", "resting_hr", "spo2", "hrv"]
        data = daily[features + [target]].dropna()

        # At least one full window plus one target row is needed; with fewer
        # rows the sequence array is empty and the model cannot be built.
        if len(data) > n_steps:
            # Scale features and target together; the target is the last column
            scaler = StandardScaler()
            scaled = scaler.fit_transform(data)

            def create_sequences(data, n_steps=n_steps):
                """Slice sliding windows of features and their next-day target."""
                X, y = [], []
                for i in range(len(data) - n_steps):
                    X.append(data[i : i + n_steps, :-1])
                    y.append(data[i + n_steps, -1])
                return np.array(X), np.array(y)

            X, y = create_sequences(scaled)

            model = Sequential(
                [
                    LSTM(64, activation="relu", input_shape=(X.shape[1], X.shape[2])),
                    Dropout(0.3),
                    Dense(32, activation="relu"),
                    Dense(1),
                ]
            )
            model.compile(optimizer="adam", loss="mse")
            model.fit(
                X,
                y,
                epochs=50,
                batch_size=32,
                verbose=0,
                # No validation data is passed to fit, so monitor the
                # training loss; the default ``val_loss`` is never produced
                # and early stopping would silently never trigger.
                callbacks=[EarlyStopping(monitor="loss", patience=5)],
            )

            models["health_index_model"] = {
                "model": model,
                "scaler": scaler,
                "features": features,
            }
        else:
            print(
                f"⚠️ Skipping health index model: need more than {n_steps} "
                f"complete days, got {len(data)}"
            )

    # Build the feature matrix once, before any derived column is added, so
    # PCA is not fitted on the anomaly labels produced just below.
    feature_matrix = daily.fillna(daily.median())

    # Anomaly Detection
    iso = IsolationForest(contamination=0.05, random_state=42)
    anomaly_labels = iso.fit_predict(feature_matrix)

    # PCA on the standardised metrics
    pca = PCA(n_components=3)
    pca_components = pca.fit_transform(
        StandardScaler().fit_transform(feature_matrix)
    )

    daily["anomaly"] = anomaly_labels
    daily[["pca1", "pca2", "pca3"]] = pca_components

    return models, daily

In [None]:
# Aggregate the extracted records into the imputed daily table first
# (no other cell defines daily_imputed), then run the model building process.
daily_imputed = process_health_data(health_df)
health_models, enhanced_daily = create_health_models(daily_imputed)

In [None]:
def create_health_dashboard(daily, models):
    """Assemble a 2x2 Plotly dashboard from the enhanced daily data.

    Panels: health index over time, metric correlation heatmap, 3-D scatter
    of the PCA components, and a 30-day step forecast next to actual steps.

    Only the traces of the intermediate figures are copied into the
    dashboard; their layouts are not. The theme and the colour scales are
    therefore set on the dashboard itself, and the heatmap and the 3-D
    scatter are given separate colour axes so they do not share one range.

    Args:
        daily: Daily DataFrame with a datetime index and at least the
            ``health_index``, ``steps``, ``pca1``, ``pca2`` and ``pca3``
            columns.
        models: Mapping containing a fitted Prophet model under
            ``"steps_model"``.

    Returns:
        The combined Plotly figure.
    """
    fig1 = go.Figure(
        go.Scatter(
            x=daily.index,
            y=daily["health_index"],
            name="Health Index",
            line=dict(color="green", width=3),
        )
    )

    fig2 = px.imshow(
        daily.corr(),
        text_auto=".2f",
        title="Metric Correlation Heatmap",
        color_continuous_scale="RdBu_r",
    )

    fig3 = px.scatter_3d(
        daily,
        x="pca1",
        y="pca2",
        z="pca3",
        color="health_index",
        size="steps",
        hover_name=daily.index.strftime("%Y-%m-%d"),
        title="Health Clustering",
        color_continuous_scale="Viridis",
    )

    future = models["steps_model"].make_future_dataframe(periods=30)
    forecast = models["steps_model"].predict(future)
    fig4 = go.Figure(
        [
            go.Scatter(x=forecast["ds"], y=forecast["yhat"], name="Forecast"),
            go.Scatter(x=daily.index, y=daily["steps"], name="Actual", mode="markers"),
        ]
    )

    dashboard = make_subplots(
        rows=2,
        cols=2,
        specs=[
            [{"type": "xy"}, {"type": "heatmap"}],
            [{"type": "scatter3d"}, {"type": "xy"}],
        ],
        subplot_titles=[
            "Health Index",
            "Correlations",
            "PCA Clusters",
            "Step Forecast",
        ],
    )

    # Point the two colour-mapped traces at distinct colour axes; left alone
    # they both reference "coloraxis" and would share a single range.
    heatmap_trace = fig2.data[0]
    heatmap_trace.coloraxis = "coloraxis"
    cluster_trace = fig3.data[0]
    cluster_trace.marker.coloraxis = "coloraxis2"

    dashboard.add_trace(fig1.data[0], row=1, col=1)
    dashboard.add_trace(heatmap_trace, row=1, col=2)
    dashboard.add_trace(cluster_trace, row=2, col=1)
    for trace in fig4.data:
        dashboard.add_trace(trace, row=2, col=2)

    dashboard.update_layout(
        height=900,
        title="📈 Health Intelligence Dashboard",
        showlegend=False,
        # The dark theme was previously set only on the discarded figures.
        template="plotly_dark",
        # Re-declare the colour scales, one colour bar per row.
        coloraxis=dict(colorscale="RdBu_r", colorbar=dict(y=0.79, len=0.42)),
        coloraxis2=dict(colorscale="Viridis", colorbar=dict(y=0.21, len=0.42)),
    )
    return dashboard

In [None]:
# Build the dashboard from the model outputs and display it inline
dashboard = create_health_dashboard(daily=enhanced_daily, models=health_models)
dashboard.show()

In [None]:
# Persist the enhanced daily table and a standalone HTML copy of the dashboard
enhanced_daily.to_csv(path_or_buf="health_analytics.csv")
dashboard.write_html(file="health_dashboard.html")