In [36]:
#bootstrapping the environment - repo root, .env, settings

import sys
from pathlib import Path
from dotenv import load_dotenv
import os

# 1) Find repo root (folder that contains .env or .git)
def find_repo_root(start: Path | None = None) -> Path:
    p = (start or Path.cwd()).resolve()
    for _ in range(25):
        if (p / ".env").exists() or (p / ".git").exists() or (p / "pyproject.toml").exists():
            return p
        if p.parent == p:
            break
        p = p.parent
    return (start or Path.cwd()).resolve()

# Resolve the repository root once; later cells reuse `root_dir` to build output paths.
root_dir = find_repo_root()
print("Repo root:", root_dir)

# 2) Ensure imports work: add the repo root to sys.path (only when it is not already
#    there) so that the project package imported at the end of this cell can be found.
if str(root_dir) not in sys.path:
    sys.path.append(str(root_dir))
    print("Added to PYTHONPATH:", root_dir)

# 3) Load .env FIRST, so that its variables are in the process environment before the
#    checks and the settings import below.
load_dotenv(root_dir / ".env")

# 4) Sanity checks: stop here with a clear message if a required variable is unset or empty.
assert os.getenv("HOPSWORKS_API_KEY"), "Missing HOPSWORKS_API_KEY in .env"
assert os.getenv("DATA_PATH"), \
    "Missing data path in .env (set DATA_PATH=... recommended)"

print("Loaded .env successfully")

# 5) Now import settings (after .env is loaded)
#    NOTE(review): presumably the settings module reads environment variables at import
#    time, which is why this import stays below load_dotenv — confirm in mlfs.mcphases.config.
from mlfs.mcphases.config import settings
print("DATA_PATH resolved to:", settings.DATA_PATH)

Repo root: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project
Loaded .env successfully
DATA_PATH resolved to: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project/data/mcphases/raw


In [31]:
import re
import os
import hopsworks
import hashlib
import json
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd

# Widen the pandas display limits so that printed DataFrame heads stay readable.
for _display_option in ("display.max_columns", "display.width"):
    pd.set_option(_display_option, 200)

In [12]:
# resolving the data dir/folder, and checking if the csv files exist
DATA_DIR = Path(settings.DATA_PATH)
print("DATA_DIR:", DATA_DIR)
assert DATA_DIR.exists(), f"DATA_DIR does not exist: {DATA_DIR}"

# Logical table name -> CSV file name inside DATA_DIR.
# Note that the subject-info file uses a hyphen, not an underscore.
_csv_file_names = {
    "hormones_and_selfreport": "hormones_and_selfreport.csv",
    "sleep_score": "sleep_score.csv",
    "stress_score": "stress_score.csv",
    "resting_heart_rate": "resting_heart_rate.csv",
    "computed_temperature": "computed_temperature.csv",
    "sleep": "sleep.csv",
    "height_and_weight": "height_and_weight.csv",
    "subject_info": "subject-info.csv",
}
paths = {table: DATA_DIR / file_name for table, file_name in _csv_file_names.items()}

# Report which of the expected files are actually present.
for table, csv_path in paths.items():
    print(f"{table:<22} -> {csv_path.name} exists={csv_path.exists()}")

DATA_DIR: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project/data/mcphases/raw
hormones_and_selfreport -> hormones_and_selfreport.csv exists=True
sleep_score            -> sleep_score.csv exists=True
stress_score           -> stress_score.csv exists=True
resting_heart_rate     -> resting_heart_rate.csv exists=True
computed_temperature   -> computed_temperature.csv exists=True
sleep                  -> sleep.csv exists=True
wrist_temperature      -> wrist_temperature.csv exists=True
height_and_weight      -> height_and_weight.csv exists=True
subject_info           -> subject-info.csv exists=True


In [13]:
# Helper functions for column and key standardization.
# They make all the raw tables consistent so that they can be merged safely.

# consistent col name - lowercase + underscore
def _sanitize_col(name: str) -> str:
    s = name.strip().lower()
    s = re.sub(r"[^\w]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    if not s:
        s = "col"
    if re.match(r"^\d", s):
        s = "c_" + s
    return s

# col names consistent for every col, and making dupe col names unique
def sanitize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with sanitised, guaranteed-unique column names.

    Every label is passed through ``_sanitize_col``. When several labels
    sanitise to the same name, the first keeps the plain name and later ones
    receive a numeric suffix (``name_2``, ``name_3``, ...).

    A suffixed name is only used if no other column already holds it, so an
    input such as ``["a", "a", "a_2"]`` yields ``["a", "a_2", "a_2_2"]``
    rather than two ``a_2`` columns.

    Args:
        df: Input frame; it is not modified.

    Returns:
        A copy of ``df`` whose columns are renamed positionally.
    """
    df = df.copy()
    cols = [_sanitize_col(c) for c in df.columns]

    occurrences = {}  # sanitised name -> highest suffix number handed out so far
    taken = set()     # every final name already assigned
    out = []
    for c in cols:
        n = occurrences.get(c, 0) + 1
        candidate = c if n == 1 else f"{c}_{n}"
        # Skip over suffixes that would clash with a name already in use.
        while candidate in taken:
            n += 1
            candidate = f"{c}_{n}"
        occurrences[c] = n
        taken.add(candidate)
        out.append(candidate)
    df.columns = out
    return df

# ensuring subject_id exists
def standardize_subject(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` in which the subject key is named ``subject_id``.

    If ``subject_id`` is already a column, nothing is renamed. Otherwise the
    first column (in column order) named ``subject``, ``participant_id``,
    ``participant`` or ``id`` is renamed to ``subject_id``. If no such column
    exists, the copy is returned unchanged.
    """
    result = df.copy()
    if "subject_id" in result.columns:
        return result

    aliases = ("subject", "participant_id", "participant", "id")
    first_alias = next((col for col in result.columns if col in aliases), None)
    if first_alias is not None:
        result = result.rename(columns={first_alias: "subject_id"})
    return result

# ensures a day_in_study exists and is numeric
def ensure_day_in_study(df: pd.DataFrame, *, prefer_end_day: bool = False) -> pd.DataFrame:
    """Return a copy of ``df`` with a nullable-integer ``day_in_study`` column.

    When ``day_in_study`` is missing it is derived from
    ``sleep_end_day_in_study`` if that column exists, otherwise from
    ``sleep_start_day_in_study``. If neither exists, no column is added.
    An existing or derived ``day_in_study`` is converted to numeric
    (unparseable values become missing) and cast to ``Int64``.

    Args:
        df: Input frame; it is not modified.
        prefer_end_day: NOTE(review): this flag currently does not change the
            result — the end-day column is preferred over the start-day
            column for both ``True`` and ``False``. Confirm whether
            ``False`` was meant to prefer the start day.
    """
    out = df.copy()

    if "day_in_study" not in out.columns:
        # The end day wins over the start day whenever both are present.
        for fallback in ("sleep_end_day_in_study", "sleep_start_day_in_study"):
            if fallback in out.columns:
                out["day_in_study"] = out[fallback]
                break

    if "day_in_study" in out.columns:
        numeric_day = pd.to_numeric(out["day_in_study"], errors="coerce")
        out["day_in_study"] = numeric_day.astype("Int64")

    return out

#converts timestamp cols to datetime - currently not necessary
def parse_timestamps(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with timestamp-like columns parsed to datetimes.

    A column is treated as timestamp-like when its name contains
    ``"timestamp"`` or ends with ``"_time"``. Values that cannot be parsed
    become ``NaT``. All other columns are left as they are.
    """
    parsed = df.copy()
    time_like = [
        col for col in parsed.columns
        if "timestamp" in col or col.endswith("_time")
    ]
    for col in time_like:
        parsed[col] = pd.to_datetime(parsed[col], errors="coerce")
    return parsed

# Primary key of the daily grain: one row per subject per study day.
# Used in this notebook as the subset for dropna / duplicate checks and as the
# groupby and merge key.
KEYS = ["subject_id", "day_in_study"]


In [56]:
# Collapse each raw table (input: a DataFrame) to one row per subject per day,
# after removing rows with missing keys and exact duplicate rows.

def collapse_to_daily(df: pd.DataFrame, prefix: str) -> pd.DataFrame:
    """Aggregate ``df`` to one row per combination of the ``KEYS`` columns.

    Steps:
      1. Drop rows with a missing key and rows that are exact duplicates.
      2. Drop columns whose name contains ``"timestamp"`` or ends with
         ``"_time"``.
      3. Group by ``KEYS``; numeric columns are averaged, all other columns
         take their first non-null value in the group (NaN if there is none).
      4. Rename every non-key column to ``"<prefix>__<column>"``.

    Note: every column pandas reports as numeric is averaged, so flag-like
    or code-like numeric columns can come out as fractions.

    Args:
        df: Cleaned raw table that contains the ``KEYS`` columns.
        prefix: Table name placed in front of every non-key column.

    Returns:
        The aggregated frame, with ``KEYS`` as regular columns.
    """
    def _first_non_null(values):
        present = values.dropna()
        return present.iloc[0] if present.shape[0] else np.nan

    rows = df.dropna(subset=KEYS).drop_duplicates()

    # Granular timestamp columns are not needed in the daily master.
    daily_cols = [
        col for col in rows.columns
        if not ("timestamp" in col or col.endswith("_time"))
    ]
    rows = rows[daily_cols]

    # Numeric columns go into the aggregation spec first, then the rest,
    # which fixes the column order of the result.
    value_cols = [col for col in rows.columns if col not in KEYS]
    numeric_cols = [col for col in value_cols if pd.api.types.is_numeric_dtype(rows[col])]
    agg = dict.fromkeys(numeric_cols, "mean")
    for col in value_cols:
        if col not in numeric_cols:
            agg[col] = _first_non_null

    daily = rows.groupby(KEYS, as_index=False).agg(agg)

    # Prefix all non-key columns to avoid collisions between tables.
    prefixed = {col: f"{prefix}__{col}" for col in daily.columns if col not in KEYS}
    return daily.rename(columns=prefixed)

# prints no.of dupes in the raw file (input a df)
def check_duplicates(df: pd.DataFrame, name: str):
    """Print how many rows of ``df`` repeat an earlier ``KEYS`` combination.

    Args:
        df: Table that contains the ``KEYS`` columns.
        name: Label printed in front of the counts.
    """
    repeated_rows = df.duplicated(subset=KEYS).sum()
    total_rows = len(df)
    print(f"{name:<22} duplicates on keys:", repeated_rows, "rows:", total_rows)


In [60]:
# loads the base table, as df, cleaning it using helper funcs
base_path = paths["hormones_and_selfreport"]
assert base_path.exists(), f"Missing file: {base_path}"

# Read the raw CSV and push it through the shared cleaning helpers.
base = (
    pd.read_csv(base_path)
    .pipe(sanitize_columns)
    .pipe(standardize_subject)
    .pipe(ensure_day_in_study)
)

check_duplicates(base, "hormones_and_selfreport")

# The base table defines the master grain: keep rows with complete keys,
# one row per subject/day, ordered by the keys.
base = (
    base.dropna(subset=KEYS)
    .drop_duplicates(subset=KEYS)
    .sort_values(KEYS)
    .reset_index(drop=True)
)

print("BASE shape:", base.shape)
display(base.head(3))

hormones_and_selfreport duplicates on keys: 0 rows: 5659
BASE shape: (5659, 22)


Unnamed: 0,subject_id,study_interval,is_weekend,day_in_study,phase,lh,estrogen,pdg,flow_volume,flow_color,appetite,exerciselevel,headaches,cramps,sorebreasts,fatigue,sleepissue,moodswing,stress,foodcravings,indigestion,bloating
0,1,2022,True,1,Follicular,2.9,94.2,,Not at all,Not at all,Low,Low,High,Very Low/Little,Very Low/Little,High,Low,Very Low/Little,Moderate,Very Low/Little,Very Low/Little,Very Low/Little
1,1,2022,False,2,Follicular,1.2,226.3,,Not at all,Not at all,Low,Low,Very High,Very Low/Little,Very Low/Little,High,Very High,Very Low/Little,Moderate,Very Low/Little,Very Low/Little,Very Low/Little
2,1,2022,False,3,Follicular,3.5,276.8,,Not at all,Not at all,Very Low,Very Low,High,Very Low/Little,Very Low/Little,Very High,Very High,Very Low/Little,Low,Very Low/Little,Very Low/Little,Very Low/Little


In [61]:
# loading, and collapsing the wearables/daily tables to one row per day per person

def load_and_collapse(name: str, *, prefer_end_day: bool = False) -> pd.DataFrame:
    """Load one raw daily table and collapse it to one row per subject per day.

    The CSV registered under ``name`` in ``paths`` is read, cleaned with the
    shared helpers, checked for duplicate keys (the count is printed), and
    then aggregated by ``collapse_to_daily`` with ``name`` as column prefix.

    Args:
        name: Key into ``paths``; also used as the column prefix.
        prefer_end_day: Forwarded to ``ensure_day_in_study``.

    Raises:
        AssertionError: If the CSV file does not exist.
    """
    csv_path = paths[name]
    assert csv_path.exists(), f"Missing file: {csv_path}"

    cleaned = (
        pd.read_csv(csv_path)
        .pipe(sanitize_columns)
        .pipe(standardize_subject)
        .pipe(ensure_day_in_study, prefer_end_day=prefer_end_day)
    )
    check_duplicates(cleaned, name)
    return collapse_to_daily(cleaned, prefix=name)

# Daily wearable tables, each collapsed to one row per subject per day.
sleep_score_d = load_and_collapse("sleep_score")
stress_score_d = load_and_collapse("stress_score")
rhr_d = load_and_collapse("resting_heart_rate")
computed_temp_d = load_and_collapse("computed_temperature", prefer_end_day=True)
sleep_d = load_and_collapse("sleep", prefer_end_day=True)

print("Collapsed shapes:")
for _label, _frame in (
    ("sleep_score_d", sleep_score_d),
    ("stress_score_d", stress_score_d),
    ("rhr_d", rhr_d),
    ("computed_temp_d", computed_temp_d),
    ("sleep_d", sleep_d),
):
    print(f"{_label}:", _frame.shape)

sleep_score            duplicates on keys: 230 rows: 5308
stress_score           duplicates on keys: 3693 rows: 7932
resting_heart_rate     duplicates on keys: 8078 rows: 13737
computed_temperature   duplicates on keys: 435 rows: 5575
sleep                  duplicates on keys: 9363 rows: 14765
Collapsed shapes:
sleep_score_d: (5078, 11)
stress_score_d: (4239, 13)
rhr_d: (5659, 6)
computed_temp_d: (5140, 13)
sleep_d: (5402, 17)


In [18]:
#loading the static tables - not per day, only "one row per person"

def load_subject_table(name: str) -> pd.DataFrame:
    """Load a static table with one row per subject.

    The CSV registered under ``name`` in ``paths`` is read and cleaned; rows
    without a ``subject_id`` are dropped and only the first row per
    ``subject_id`` is kept. Every other column is renamed to
    ``"<name>__<column>"``.

    Raises:
        AssertionError: If the CSV file does not exist.
    """
    csv_path = paths[name]
    assert csv_path.exists(), f"Missing file: {csv_path}"

    subjects = (
        pd.read_csv(csv_path)
        .pipe(sanitize_columns)
        .pipe(standardize_subject)
        .dropna(subset=["subject_id"])
        .drop_duplicates(subset=["subject_id"])
    )

    # Prefix the non-key columns with the table name.
    prefixed = {col: f"{name}__{col}" for col in subjects.columns if col != "subject_id"}
    return subjects.rename(columns=prefixed)

# Static tables: one row per subject.
height_weight_s = load_subject_table("height_and_weight")
subject_info_s = load_subject_table("subject_info")

for _label, _frame in (
    ("height_weight_s", height_weight_s),
    ("subject_info_s", subject_info_s),
):
    print(f"{_label}:", _frame.shape)

height_weight_s: (42, 5)
subject_info_s: (42, 8)


In [26]:
# Merge everything into a master daily table, anchored to the base table.

# Start from a copy so that `base` itself is not the object being reassigned by the merges.
master = base.copy()

#left joins every daily/dynamic table
def left_merge_daily(master: pd.DataFrame, daily_df: pd.DataFrame) -> pd.DataFrame:
    """Left-join a per-day table onto ``master`` using the ``KEYS`` columns.

    Every row of ``master`` is kept; rows with no match in ``daily_df`` get
    missing values in the added columns.
    """
    return pd.merge(master, daily_df, how="left", on=KEYS)

#left joins the static ones
def left_merge_subject(master: pd.DataFrame, subj_df: pd.DataFrame) -> pd.DataFrame:
    """Left-join a per-subject table onto ``master`` using ``subject_id``.

    Every row of ``master`` is kept; subjects missing from ``subj_df`` get
    missing values in the added columns.
    """
    return pd.merge(master, subj_df, how="left", on=["subject_id"])

# Left-join every collapsed daily table onto the master, printing the shape
# before and after each merge so an unexpected change in row count is visible.
daily_tables = {
    "sleep_score": sleep_score_d,
    "stress_score": stress_score_d,
    "resting_heart_rate": rhr_d,
    "computed_temperature": computed_temp_d,
    "sleep": sleep_d,
}
for dname, ddf in daily_tables.items():
    before = master.shape
    master = left_merge_daily(master, ddf)
    print(f"Merged {dname:<22} {before} -> {master.shape}")

# Then the static per-subject tables.
static_tables = {
    "height_and_weight": height_weight_s,
    "subject_info": subject_info_s,
}
for sname, sdf in static_tables.items():
    before = master.shape
    master = left_merge_subject(master, sdf)
    print(f"Merged {sname:<22} {before} -> {master.shape}")

# Final dedupe and sort on the keys.
master = (
    master.dropna(subset=KEYS)
    .drop_duplicates(subset=KEYS)
    .sort_values(KEYS)
    .reset_index(drop=True)
)

print("MASTER FINAL:", master.shape)
display(master.head(3))

Merged sleep_score            (5659, 22) -> (5659, 31)
Merged stress_score           (5659, 31) -> (5659, 42)
Merged resting_heart_rate     (5659, 42) -> (5659, 46)
Merged computed_temperature   (5659, 46) -> (5659, 57)
Merged sleep                  (5659, 57) -> (5659, 72)
Merged height_and_weight      (5659, 72) -> (5659, 76)
Merged subject_info           (5659, 76) -> (5659, 83)
MASTER FINAL: (5659, 83)


Unnamed: 0,subject_id,study_interval,is_weekend,day_in_study,phase,lh,estrogen,pdg,flow_volume,flow_color,appetite,exerciselevel,headaches,cramps,sorebreasts,fatigue,sleepissue,moodswing,stress,foodcravings,indigestion,bloating,sleep_score__study_interval,sleep_score__is_weekend,sleep_score__overall_score,sleep_score__composition_score,sleep_score__revitalization_score,sleep_score__duration_score,sleep_score__deep_sleep_in_minutes,sleep_score__resting_heart_rate,sleep_score__restlessness,stress_score__study_interval,stress_score__is_weekend,stress_score__stress_score,stress_score__sleep_points,stress_score__max_sleep_points,stress_score__responsiveness_points,stress_score__max_responsiveness_points,stress_score__exertion_points,stress_score__max_exertion_points,stress_score__calculation_failed,stress_score__status,resting_heart_rate__study_interval,resting_heart_rate__is_weekend,resting_heart_rate__value,resting_heart_rate__error,computed_temperature__study_interval,computed_temperature__is_weekend,computed_temperature__sleep_start_day_in_study,computed_temperature__sleep_end_day_in_study,computed_temperature__temperature_samples,computed_temperature__nightly_temperature,computed_temperature__baseline_relative_sample_sum,computed_temperature__baseline_relative_sample_sum_of_squares,computed_temperature__baseline_relative_nightly_standard_deviation,computed_temperature__baseline_relative_sample_standard_deviation,computed_temperature__type,sleep__study_interval,sleep__is_weekend,sleep__sleep_start_day_in_study,sleep__sleep_end_day_in_study,sleep__duration,sleep__minutestofallasleep,sleep__minutesasleep,sleep__minutesawake,sleep__minutesafterwakeup,sleep__timeinbed,sleep__efficiency,sleep__infocode,sleep__mainsleep,sleep__type,sleep__levels,height_and_weight__height_2022,height_and_weight__weight_2022,height_and_weight__height_2024,height_and_weight__weight_2024,subject_info__birth_year,subject_info__gender,subject_info__ethnicity,subject_info__education,subject_info_
_sexually_active,subject_info__self_report_menstrual_health_literacy,subject_info__age_of_first_menarche
0,1,2022,True,1,Follicular,2.9,94.2,,Not at all,Not at all,Low,Low,High,Very Low/Little,Very Low/Little,High,Low,Very Low/Little,Moderate,Very Low/Little,Very Low/Little,Very Low/Little,,,,,,,,,,,,,,,,,,,,,2022.0,1.0,74.785346,100.0,2022.0,1.0,1.0,1.0,414.0,34.616087,,,,,SKIN,2022.0,1.0,1.0,1.0,37020000.0,0.0,596.0,21.0,0.0,617.0,97.0,1.0,1.0,classic,"{'summary': {'restless': {'count': 8, 'minutes...",,,,,1999,Woman,White,"Some university/ post-secondary, no degree",Yes,,14
1,1,2022,False,2,Follicular,1.2,226.3,,Not at all,Not at all,Low,Low,Very High,Very Low/Little,Very Low/Little,High,Very High,Very Low/Little,Moderate,Very Low/Little,Very Low/Little,Very Low/Little,,,,,,,,,,,,,,,,,,,,,2022.0,0.0,80.407307,29.833838,2022.0,0.0,1.0,2.0,258.0,33.780659,,,,,SKIN,2022.0,0.5,1.5,2.0,15510000.0,0.0,254.0,4.5,0.0,258.5,97.0,1.5,0.5,classic,"{'summary': {'restless': {'count': 4, 'minutes...",,,,,1999,Woman,White,"Some university/ post-secondary, no degree",Yes,,14
2,1,2022,False,3,Follicular,3.5,276.8,,Not at all,Not at all,Very Low,Very Low,High,Very Low/Little,Very Low/Little,Very High,Very High,Very Low/Little,Low,Very Low/Little,Very Low/Little,Very Low/Little,,,,,,,,,,,,,,,,,,,,,2022.0,0.0,84.686869,24.267298,2022.0,0.0,3.0,3.0,353.0,34.634929,6.651304,1554.843599,0.487865,2.101622,SKIN,2022.0,0.0,3.0,3.0,31800000.0,0.0,502.0,28.0,0.0,530.0,95.0,1.0,1.0,classic,"{'summary': {'restless': {'count': 14, 'minute...",,,,,1999,Woman,White,"Some university/ post-secondary, no degree",Yes,,14


In [64]:
# Show the column index of the merged master table.
master.columns

Index(['subject_id', 'study_interval', 'is_weekend', 'day_in_study', 'phase', 'lh', 'estrogen', 'pdg', 'flow_volume', 'flow_color', 'appetite', 'exerciselevel', 'headaches', 'cramps', 'sorebreasts',
       'fatigue', 'sleepissue', 'moodswing', 'stress', 'foodcravings', 'indigestion', 'bloating', 'sleep_score__study_interval', 'sleep_score__is_weekend', 'sleep_score__overall_score',
       'sleep_score__composition_score', 'sleep_score__revitalization_score', 'sleep_score__duration_score', 'sleep_score__deep_sleep_in_minutes', 'sleep_score__resting_heart_rate',
       'sleep_score__restlessness', 'stress_score__study_interval', 'stress_score__is_weekend', 'stress_score__stress_score', 'stress_score__sleep_points', 'stress_score__max_sleep_points',
       'stress_score__responsiveness_points', 'stress_score__max_responsiveness_points', 'stress_score__exertion_points', 'stress_score__max_exertion_points', 'stress_score__calculation_failed',
       'stress_score__status', 'resting_heart_ra

In [21]:
# Drop JSON/blob columns that break ML pipelines.
blob_found = set()
for c in master.columns:
    # Rule 1: known "levels" columns, identified by name.
    if c.endswith(("__levels", "__levels_1", "__levels_2")):
        blob_found.add(c)
    # Rule 2: object columns where any of the first 20 non-null values looks
    # like a dict/JSON string, i.e. "{...}" once surrounding whitespace is ignored.
    if master[c].dtype == "object":
        sample = [text.strip() for text in master[c].dropna().astype(str).head(20)]
        if any(text.startswith("{") and text.endswith("}") for text in sample):
            blob_found.add(c)

blob_cols = sorted(blob_found)
print("Dropping blob columns:", blob_cols)
master = master.drop(columns=blob_cols, errors="ignore")

Dropping blob columns: ['sleep__levels']


In [22]:
# Prefixed per-table copies of study_interval / is_weekend are dropped;
# the unprefixed base-table columns remain.
_redundant_suffixes = ("__study_interval", "__is_weekend")
redundant = [c for c in master.columns if c.endswith(_redundant_suffixes)]
print("Dropping redundant interval/weekend columns:", redundant[:20], "… total:", len(redundant))
master = master.drop(columns=redundant, errors="ignore")

Dropping redundant interval/weekend columns: ['sleep_score__study_interval', 'sleep_score__is_weekend', 'stress_score__study_interval', 'stress_score__is_weekend', 'resting_heart_rate__study_interval', 'resting_heart_rate__is_weekend', 'computed_temperature__study_interval', 'computed_temperature__is_weekend', 'sleep__study_interval', 'sleep__is_weekend'] … total: 10


In [23]:
# Heuristic: if the median sleep duration is huge (> 10000), treat the values
# as milliseconds; otherwise assume they are already minutes.
if "sleep__duration" in master.columns:
    duration_raw = pd.to_numeric(master["sleep__duration"], errors="coerce")
    med = duration_raw.median()
    print("Median sleep__duration:", med)

    looks_like_ms = pd.notna(med) and med > 10_000
    # ms -> s -> min when the values look like milliseconds
    master["sleep_duration_minutes"] = duration_raw / 1000 / 60 if looks_like_ms else duration_raw
    master["sleep_duration_hours"] = master["sleep_duration_minutes"] / 60
    if looks_like_ms:
        print("Created sleep_duration_minutes + sleep_duration_hours (converted from ms).")

Median sleep__duration: 26500000.0
Created sleep_duration_minutes + sleep_duration_hours (converted from ms).


In [24]:
# Enforce consistent dtypes.
# Keys: nullable integers.
for key_col in ("subject_id", "day_in_study"):
    master[key_col] = pd.to_numeric(master[key_col], errors="coerce").astype("Int64")

# Boolean: sometimes comes as True/False, sometimes 0/1.
if "is_weekend" in master.columns:
    master["is_weekend"] = master["is_weekend"].astype("boolean")

# Optional: clean up obvious numeric columns stored as object.
for c in master.columns:
    if master[c].dtype != "object":
        continue
    # Look at up to 50 non-null values; convert only when more than 80% of
    # them are plain digit strings with at most one decimal point.
    s = master[c].dropna().astype(str).head(50)
    if not len(s):
        continue
    digit_like = sum(x.replace(".", "", 1).isdigit() for x in s)
    if digit_like / len(s) > 0.8:
        master[c] = pd.to_numeric(master[c], errors="coerce")

In [25]:
# Final sanity checks, then save the master table as parquet.
KEYS = ["subject_id", "day_in_study"]

# Number of rows that repeat an earlier (subject_id, day_in_study) pair.
dups = master.duplicated(subset=KEYS).sum()
print("Duplicates on keys:", dups)

print("Subjects:", master["subject_id"].nunique())
print("Day range:", master["day_in_study"].min(), "->", master["day_in_study"].max())

# Fraction of missing values per column, most-missing first; show the top 20.
missing = master.isna().mean().sort_values(ascending=False)
display(missing.head(20))

# Output folder under the repo root; created if it does not exist yet.
OUT_DIR = Path(root_dir) / "data_cache"
OUT_DIR.mkdir(exist_ok=True)

# Write the master table without the DataFrame index.
out_path = OUT_DIR / "mcphases_master_daily.parquet"
master.to_parquet(out_path, index=False)
print("Saved:", out_path)

Duplicates on keys: 0
Subjects: 42
Day range: 1 -> 1004


height_and_weight__height_2024    0.728221
pdg                               0.670613
height_and_weight__weight_2024    0.623785
height_and_weight__height_2022    0.482594
flow_volume                       0.436473
flow_color                        0.435589
height_and_weight__weight_2022    0.416505
moodswing                         0.413324
indigestion                       0.412440
foodcravings                      0.412087
stress                            0.412087
sorebreasts                       0.412087
cramps                            0.412087
headaches                         0.411910
bloating                          0.411910
sleepissue                        0.411734
exerciselevel                     0.411557
appetite                          0.411557
fatigue                           0.411380
sleep_score__composition_score    0.369853
dtype: float64

Saved: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project/data_cache/mcphases_master_daily.parquet


In [28]:
#optional to see the "usable inputs" immediately
KEYS = ["subject_id", "day_in_study"]

# Candidate "Mode A" inputs; only those present in `master` are kept below.
_mode_a_candidates = (
    "phase",
    "is_weekend",
    "sleep_score__overall_score",
    "sleep_duration_minutes",
    "stress_score__stress_score",
    "resting_heart_rate__value",
    "computed_temperature__nightly_temperature",  # optional
    # optional 5-sec self-reports:
    "cramps", "headaches", "sleepissue", "stress", "flow_volume",
)

# Keep only the columns that actually exist.
FEATURES_MODE_A = [c for c in _mode_a_candidates if c in master.columns]

preview = master.loc[:, KEYS + FEATURES_MODE_A].copy()
display(preview.head(5))
print("Mode A (only wearables) columns:", len(KEYS) + len(FEATURES_MODE_A))


Unnamed: 0,subject_id,day_in_study,phase,is_weekend,sleep_score__overall_score,stress_score__stress_score,resting_heart_rate__value,computed_temperature__nightly_temperature,cramps,headaches,sleepissue,stress,flow_volume
0,1,1,Follicular,True,,,74.785346,34.616087,Very Low/Little,High,Low,Moderate,Not at all
1,1,2,Follicular,False,,,80.407307,33.780659,Very Low/Little,Very High,Very High,Moderate,Not at all
2,1,3,Follicular,False,,,84.686869,34.634929,Very Low/Little,High,Very High,Low,Not at all
3,1,4,Fertility,False,80.0,,83.852219,34.050056,Very Low/Little,Very Low/Little,Very High,Low,Not at all
4,1,5,Fertility,False,,,0.0,,Very Low/Little,Very Low/Little,High,Low,Not at all


Mode A (only wearables) columns: 13


In [None]:
# Hopsworks saving

In [34]:
# Prepare the master DataFrame so that Hopsworks accepts it:
# copy it into hs_df, convert object columns to strings, rename every column to follow the Hopsworks naming rules,
# and save a JSON file mapping the original column names to the Hopsworks column names.

# Maximum length allowed for a renamed column; longer names are truncated and hashed.
# NOTE(review): 63 is presumably the Hopsworks feature-name length limit — confirm against the Hopsworks docs.
HOPS_MAX_LEN = 63
# Key columns; make_hopsworks_safe_columns leaves these names unchanged by default.
KEYS = ["subject_id", "day_in_study"]

def _hops_safe_name(name: str) -> str:
    s = name.strip().lower()
    s = re.sub(r"[^a-z0-9_]+", "_", s)       # only a-z 0-9 _
    s = re.sub(r"_+", "_", s).strip("_")     # collapse underscores
    if not s:
        s = "f"
    if not re.match(r"^[a-z]", s):
        s = "f_" + s
    return s

def make_hopsworks_safe_columns(df, keys=KEYS, max_len=HOPS_MAX_LEN):
    """Rename the columns of ``df`` to unique names no longer than ``max_len``.

    Key columns keep their names. Every other column is cleaned with
    ``_hops_safe_name``; a cleaned name longer than ``max_len`` is cut and
    given a 12-character MD5 suffix of the full cleaned name. If the result
    is already taken, ``_2``, ``_3``, ... is appended (shortening the stem
    when needed to stay within ``max_len``).

    Args:
        df: Frame whose columns should be renamed; it is not modified.
        keys: Column names to leave unchanged.
        max_len: Maximum length of a produced column name.

    Returns:
        ``(new_df, rename_map)`` where ``rename_map`` maps each original
        column name to its new name (keys map to themselves).
    """
    # Key columns keep their names and reserve them up front.
    rename_map = {col: col for col in df.columns if col in keys}
    used = set(rename_map)

    for col in df.columns:
        if col in keys:
            continue

        stem = _hops_safe_name(col)
        if len(stem) > max_len:
            digest = hashlib.md5(stem.encode("utf-8")).hexdigest()[:12]
            stem = stem[: max_len - 13] + "_" + digest  # 1 underscore + 12 hash chars = 13

        # Resolve clashes with names handed out earlier.
        candidate, counter = stem, 2
        while candidate in used:
            suffix = f"_{counter}"
            room = max_len - len(suffix)
            head = stem[:room] if len(stem) > room else stem
            candidate = head + suffix
            counter += 1

        rename_map[col] = candidate
        used.add(candidate)

    return df.rename(columns=rename_map), rename_map

# Work on a copy of master so the original frame keeps its column names.
hs_df = master.copy()

# Cast object columns to pandas' string dtype.
for _obj_col in [c for c in hs_df.columns if hs_df[c].dtype == "object"]:
    hs_df[_obj_col] = hs_df[_obj_col].astype("string")

# Make the column names Hopsworks-safe.
hs_df, name_map = make_hopsworks_safe_columns(hs_df)

# Save the mapping so the features can be traced back to their original names.
OUT_DIR = Path(root_dir) / "data_cache"
OUT_DIR.mkdir(exist_ok=True)
map_path = OUT_DIR / "hopsworks_feature_name_map.json"
map_path.write_text(json.dumps(name_map, indent=2), encoding="utf-8")

# Quick check: nothing should exceed the length limit, and show one example rename.
too_long = [c for c in hs_df.columns if len(c) > HOPS_MAX_LEN]
_example_col = "computed_temperature__baseline_relative_nightly_standard_deviation"
print("Saved name map to:", map_path)
print("Columns still too long:", too_long[:10], "count:", len(too_long))
print("Example rename:", _example_col,
      "->", name_map.get(_example_col))

Saved name map to: /Users/sreenijaveladri/Downloads/llm_project_starter/scalable-ml-project/data_cache/hopsworks_feature_name_map.json
Columns still too long: [] count: 0
Example rename: computed_temperature__baseline_relative_nightly_standard_deviation -> computed_temperature_baseline_relative_nightly_sta_b2caea867e0f


In [35]:
# Log in to Hopsworks, get (or create) the feature group, and insert the rows of hs_df into it.

# NOTE(review): presumably hopsworks.login() picks up HOPSWORKS_API_KEY from the environment
# loaded in the first cell — confirm against the Hopsworks client documentation.
project = hopsworks.login(engine="python")
fs = project.get_feature_store()

FG_NAME = "mcphases_daily_fg"
FG_VERSION = 1
KEYS = ["subject_id", "day_in_study"]

# Ensure keys are nullable ints (unparseable values become missing)
hs_df["subject_id"] = pd.to_numeric(hs_df["subject_id"], errors="coerce").astype("Int64")
hs_df["day_in_study"] = pd.to_numeric(hs_df["day_in_study"], errors="coerce").astype("Int64")

# The feature group is keyed on KEYS and created with online serving disabled.
fg = fs.get_or_create_feature_group(
    name=FG_NAME,
    version=FG_VERSION,
    primary_key=KEYS,
    description="mcPHASES master daily table keyed by (subject_id, day_in_study).",
    online_enabled=False,
)

# NOTE(review): "wait_for_job" presumably makes insert() block until the ingestion job
# finishes — confirm against the Hopsworks FeatureGroup.insert documentation.
fg.insert(hs_df, write_options={"wait_for_job": True})
print(f"Inserted {len(hs_df)} rows into Feature Group: {FG_NAME} v{FG_VERSION}")

2026-01-03 18:12:41,810 INFO: Closing external client and cleaning up certificates.
2026-01-03 18:12:41,819 INFO: Connection closed.
2026-01-03 18:12:41,821 INFO: Initializing external client
2026-01-03 18:12:41,821 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2026-01-03 18:12:42,677 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/3208
2026-01-03 18:13:07,868 INFO: Computing insert statistics
Inserted 5659 rows into Feature Group: mcphases_daily_fg v1
