# 02 – Feature Engineering

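Engineer daily features from Fitbit wearables, Chengdu PM2.5 air quality, and Delhi weather data. Each source is saved as its own per-node feature table (CSV + Parquet) ready for modeling; the unified, centrally merged table is currently disabled.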


In [16]:
from __future__ import annotations

import os
from pathlib import Path

import numpy as np
import pandas as pd

from federated_health_risk.utils.config import init_env

# Initialize env (loads .env if present) so the path overrides below can come from it.
init_env()

# Repository root: defaults to the original author's checkout, but can be overridden
# with the PROJECT_ROOT environment variable so the notebook runs on other machines.
repo_root = Path(
    os.getenv("PROJECT_ROOT", r"C:\Users\Sahal Saeed\Documents\7semester\mlops\project_cursor")
)
data_dir = repo_root / "data"
data_dir.mkdir(exist_ok=True)  # no parents=True: a wrong repo_root should fail here
features_dir = data_dir / "features"
features_dir.mkdir(parents=True, exist_ok=True)

# Per-file overrides via environment variables (possibly set by init_env()).
fitbit_path = Path(os.getenv("FITBIT_CSV", data_dir / "fitbit" / "dailyActivity_merged.csv"))
air_path = Path(os.getenv("AIR_QUALITY_CSV", data_dir / "air_quality" / "ChengduPM20100101_20151231.csv"))
weather_path = Path(os.getenv("WEATHER_CSV", data_dir / "weather" / "DailyDelhiClimateTrain.csv"))

# Fail fast if any input is missing, listing every missing path at once.
missing = [p for p in (fitbit_path, air_path, weather_path) if not p.exists()]
if missing:
    raise FileNotFoundError(f"Missing input files: {missing}")

# Load the three raw sources.
fitbit_df = pd.read_csv(fitbit_path)
air_df = pd.read_csv(air_path)
weather_df = pd.read_csv(weather_path)

print(f"Loaded: fitbit={fitbit_path.name} ({len(fitbit_df)} rows), air={air_path.name} ({len(air_df)} rows), weather={weather_path.name} ({len(weather_df)} rows)")


Loaded: fitbit=dailyActivity_merged.csv (940 rows), air=ChengduPM20100101_20151231.csv (52584 rows), weather=DailyDelhiClimateTrain.csv (1462 rows)


In [17]:
# Parse Fitbit activity dates (MM/DD/YYYY); unparsable values become NaT and are dropped.
fitbit_df["ActivityDate"] = pd.to_datetime(
    fitbit_df["ActivityDate"],
    format="%m/%d/%Y",
    errors="coerce",
)
fitbit_df = fitbit_df.dropna(subset=["ActivityDate"])

# Calendar-day key shared by every record (all users) on the same day.
fitbit_df["date"] = fitbit_df["ActivityDate"].dt.floor("D")

# One row per calendar day; each metric is the mean over all records of that day.
# Note: active_minutes is built from VeryActiveMinutes only.
# distance_km comes from TotalDistance (assumes the source is in km — TODO confirm).
fitbit_daily = fitbit_df.groupby("date").agg(
    total_steps=pd.NamedAgg(column="TotalSteps", aggfunc="mean"),
    active_minutes=pd.NamedAgg(column="VeryActiveMinutes", aggfunc="mean"),
    sedentary_minutes=pd.NamedAgg(column="SedentaryMinutes", aggfunc="mean"),
    calories=pd.NamedAgg(column="Calories", aggfunc="mean"),
    distance_km=pd.NamedAgg(column="TotalDistance", aggfunc="mean"),
)
fitbit_daily = fitbit_daily.reset_index()

In [18]:
# --- Air quality: clean columns and build date robustly ---
# Trim column names and replace literal "NA" strings with NaN.
air_df.columns = air_df.columns.str.strip()
air_df = air_df.replace({"NA": np.nan})

# Coerce the PM station columns AND the optional meteorological columns to numeric,
# so the daily means below cannot fail on stray non-numeric values.
pm_cols = [c for c in ["PM_Caotangsi", "PM_Shahepu", "PM_US Post"] if c in air_df.columns]
met_cols = [c for c in ["DEWP", "HUMI", "PRES", "TEMP"] if c in air_df.columns]
numeric_cols = pm_cols + met_cols
if numeric_cols:
    air_df[numeric_cols] = air_df[numeric_cols].apply(pd.to_numeric, errors="coerce")

# Locate year/month/day columns case-insensitively.
lower_map = {c.lower(): c for c in air_df.columns}
year_col = lower_map.get("year")
month_col = lower_map.get("month")
day_col = lower_map.get("day")

# Assemble the date from year/month/day if present, else try any date-like column.
if year_col and month_col and day_col:
    ymd_cols = [year_col, month_col, day_col]
    air_df[ymd_cols] = air_df[ymd_cols].apply(pd.to_numeric, errors="coerce")
    # Drop rows with a missing part BEFORE assembling: pd.to_datetime's dict assembly
    # converts integer parts to int64 and can raise on <NA> even with errors="coerce".
    # Such rows would be dropped by the date dropna below anyway.
    air_df = air_df.dropna(subset=ymd_cols).copy()
    air_df["date"] = pd.to_datetime(
        dict(
            year=air_df[year_col].astype("Int64"),
            month=air_df[month_col].astype("Int64"),
            day=air_df[day_col].astype("Int64"),
        ),
        errors="coerce",
    )
else:
    possible_date_cols = [c for c in air_df.columns if "date" in c.lower() or "time" in c.lower()]
    if possible_date_cols:
        air_df["date"] = pd.to_datetime(air_df[possible_date_cols[0]], errors="coerce")
    else:
        raise ValueError("Air dataframe does not contain year/month/day or a date-like column.")

# Keep only rows with a valid date, truncated to the calendar day.
air_df = air_df.dropna(subset=["date"])
air_df["date"] = air_df["date"].dt.floor("D")

# Daily mean of every available numeric measurement (output name -> (source column, agg)).
agg_map = {}
if "PM_US Post" in air_df.columns:
    agg_map["pm_us_post"] = ("PM_US Post", "mean")
elif pm_cols:
    # NOTE(review): this fallback exposes another station under the pm_us_post name.
    # That station is also aggregated under its own name below, so a row-wise mean
    # over the pm_* columns would count it twice.
    agg_map["pm_us_post"] = (pm_cols[0], "mean")
if "PM_Caotangsi" in air_df.columns:
    agg_map["pm_caotangsi"] = ("PM_Caotangsi", "mean")
if "PM_Shahepu" in air_df.columns:
    agg_map["pm_shahepu"] = ("PM_Shahepu", "mean")
# Optional meteorological columns, exposed under lower-case names.
for col in met_cols:
    agg_map[col.lower()] = (col, "mean")

if not agg_map:
    raise ValueError("No aggregatable columns found in air_df after parsing.")

air_daily = air_df.groupby("date").agg(**agg_map).reset_index()

print("air_daily rows:", len(air_daily))


air_daily rows: 2191


In [19]:
# --- Build per-node feature tables (NO central merge) ---
processed_dir = data_dir / "processed"
processed_dir.mkdir(parents=True, exist_ok=True)


def _save_node_features(df: pd.DataFrame, stem: str) -> pd.DataFrame:
    """Replace +/-inf with NaN, then write *df* to processed/<stem>.csv and features/<stem>.parquet.

    Returns the cleaned frame so the caller can rebind its variable to it.
    """
    cleaned = df.replace([np.inf, -np.inf], np.nan)
    cleaned.to_csv(processed_dir / f"{stem}.csv", index=False)
    cleaned.to_parquet(features_dir / f"{stem}.parquet", index=False)
    return cleaned


def _build_weather_daily(raw: pd.DataFrame) -> pd.DataFrame:
    """Return per-day mean numeric weather values built from a copy of *raw*.

    Mirrors the "Build weather_daily from weather_df" cell further down, so this cell
    also works when the notebook is run top to bottom (that cell runs later).
    """
    df = raw.copy()
    # Prefer an explicit "date" column, then "Date"; otherwise start from all-NaT.
    date_src = next((df[c] for c in ("date", "Date") if c in df.columns), None)
    df["date"] = pd.to_datetime(date_src, errors="coerce") if date_src is not None else pd.NaT
    if df["date"].isna().all():
        # Fall back to year/month/day columns (either capitalisation).
        parts = {}
        for key in ("year", "month", "day"):
            for cand in (key, key.capitalize()):
                if cand in df.columns:
                    parts[key] = pd.to_numeric(df[cand], errors="coerce")
        if len(parts) == 3:
            df["date"] = pd.to_datetime(parts, errors="coerce")
    df = df.dropna(subset=["date"]).copy()
    df["date"] = df["date"].dt.floor("D")
    # Normalise the Delhi column names used downstream.
    lower = {c.lower(): c for c in df.columns}
    renames = {
        lower[src]: dst
        for src, dst in (
            ("meantemp", "delhi_meantemp"),
            ("humidity", "delhi_humidity"),
            ("wind_speed", "delhi_wind_speed"),
            ("meanpressure", "delhi_meanpressure"),
        )
        if src in lower
    }
    return df.rename(columns=renames).groupby("date").mean(numeric_only=True).reset_index()


# ---------------- Fitbit features ----------------
# Steps per (very-)active minute. The +1 keeps the ratio finite on days with 0 minutes
# instead of turning them into NaN.
fitbit_daily["cardio_load"] = fitbit_daily["total_steps"] / (fitbit_daily["active_minutes"] + 1)
# Steps per unit distance; zero distance becomes NaN rather than inf.
fitbit_daily["steps_per_km"] = (
    fitbit_daily["total_steps"] / fitbit_daily["distance_km"].replace(0, np.nan)
)
fitbit_daily = _save_node_features(fitbit_daily, "fitbit_daily_features")

# ---------------- Air quality features ----------------
available_pm = [c for c in ["pm_us_post", "pm_caotangsi", "pm_shahepu"] if c in air_daily.columns]
# Row-wise mean across whichever station columns exist (NaN if none do).
air_daily["pm_mean"] = (
    air_daily[available_pm].mean(axis=1, skipna=True) if available_pm else np.nan
)
air_daily["pm_log"] = np.log1p(air_daily["pm_mean"].clip(lower=0))
# NaN pm_mean compares False, so such days are flagged 0.
air_daily["pm_exceed_100"] = (air_daily["pm_mean"] > 100).astype(int)
air_daily = _save_node_features(air_daily, "air_daily_features")

# ---------------- Weather features ----------------
# weather_daily is normally produced by a cell further down; build it here if it does not
# exist yet so that "Restart & Run All" does not fail with NameError.
if "weather_daily" not in globals():
    weather_daily = _build_weather_daily(weather_df)

if "delhi_meantemp" not in weather_daily.columns and "meantemp" in weather_daily.columns:
    weather_daily = weather_daily.rename(columns={"meantemp": "delhi_meantemp"})

# Linear proxy: mean temperature + 0.1 * humidity; a missing input column counts as 0.
weather_daily["heat_index_proxy"] = (
    weather_daily.get("delhi_meantemp", 0) + 0.1 * weather_daily.get("delhi_humidity", 0)
)
weather_daily = _save_node_features(weather_daily, "weather_daily_features")

print(f"Saved per-node feature files to: {processed_dir}")


Saved per-node feature files to: C:\Users\Sahal Saeed\Documents\7semester\mlops\project_cursor\data\processed


In [20]:
# --- Build weather_daily from weather_df (robust) ---
# 1) Prefer an explicit date column: "date" first, then "Date". If neither exists,
#    start from an all-NaT datetime column (not an object column of None).
date_col = next((c for c in ("date", "Date") if c in weather_df.columns), None)
weather_df["date"] = (
    pd.to_datetime(weather_df[date_col], errors="coerce") if date_col is not None else pd.NaT
)

# 2) If that yields nothing, assemble the date from year/month/day (either capitalisation).
if weather_df["date"].isna().all():
    for part in ("year", "month", "day"):
        for col in (part, part.capitalize()):
            if col in weather_df.columns:
                weather_df[part] = pd.to_numeric(weather_df[col], errors="coerce")
    if {"year", "month", "day"}.issubset(weather_df.columns):
        weather_df["date"] = pd.to_datetime(
            dict(year=weather_df["year"], month=weather_df["month"], day=weather_df["day"]),
            errors="coerce",
        )

# Fail fast (like the air-quality cell) instead of crashing later in .dt or silently
# producing an empty table.
if weather_df["date"].isna().all():
    raise ValueError("Weather dataframe does not contain a parsable date or year/month/day columns.")

# Drop rows without a parsable date and truncate to the calendar day.
weather_df = weather_df.dropna(subset=["date"])
weather_df["date"] = weather_df["date"].dt.floor("D")

# Normalise common column names (case-insensitively) to those used downstream.
lower_map = {c.lower(): c for c in weather_df.columns}
rename_map = {
    lower_map[src]: dst
    for src, dst in (
        ("meantemp", "delhi_meantemp"),
        ("humidity", "delhi_humidity"),
        ("wind_speed", "delhi_wind_speed"),
        ("meanpressure", "delhi_meanpressure"),
    )
    if src in lower_map
}
if rename_map:
    weather_df = weather_df.rename(columns=rename_map)

# Per-day mean of every numeric column.
weather_daily = weather_df.groupby("date").mean(numeric_only=True).reset_index()

print("weather_daily rows:", len(weather_daily))


weather_daily rows: 1462


In [21]:
# features = (
#     fitbit_daily.merge(air_daily, on="date", how="inner")
#     .merge(weather_daily, on="date", how="inner")
#     .dropna()
# )
# features["cardio_load"] = features["total_steps"] / (features["active_minutes"] + 1)
# features["pollution_load"] = features["pm_mean"] * features["delhi_meantemp"]
# features["risk_proxy"] = (
#     0.4 * features["pm_mean"]
#     + 0.2 * features["delhi_humidity"]
#     + 0.2 * features["delhi_meantemp"]
#     + 0.2 * features["cardio_load"]
# ) / 100

# features.head()


In [18]:
# output_path = features_dir / "multimodal_features.parquet"
# features.to_parquet(output_path, index=False)
# print(f"Saved {len(features)} rows to {output_path}")
