In [21]:
import pandas as pd
import plotly
from pathlib import Path
import re
import ast
import pandas as pd
import numpy as np


# Location of the input data, relative to the notebook, and the
# semicolon-delimited test set read from it.
base_test = Path('../data/')
test_df = pd.read_csv(base_test.joinpath('test.csv'), sep=';')

def convert_timeseries_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` in which list-looking strings are parsed.

    Every cell of every column is inspected. A cell is converted with
    ``ast.literal_eval`` only when it is a string whose stripped form starts
    with ``[`` and ends with ``]``; anything else, and any string that fails
    to parse, is kept unchanged. The input frame is not modified.
    """

    def _maybe_parse(cell):
        # Non-strings (numbers, NaN, already-parsed lists) pass straight through.
        if not isinstance(cell, str):
            return cell
        text = cell.strip()
        if not (text.startswith('[') and text.endswith(']')):
            return cell
        try:
            return ast.literal_eval(cell)
        except (ValueError, SyntaxError):
            # Looks like a list but is not a valid literal: keep the raw text.
            return cell

    parsed = df.copy()
    for name in parsed.columns:
        parsed[name] = parsed[name].apply(_maybe_parse)
    return parsed

# Rebind test_df to a copy whose list-like string cells have been parsed
# into Python lists (see convert_timeseries_columns).
test_df = convert_timeseries_columns(test_df)

def clean_ts(ts):
    """Mask invalid samples of one time series with NaN.

    A list input is returned as a new list of the same length in which every
    ``None`` or negative entry is replaced by ``np.nan``. Any non-list input
    is replaced by the single-element list ``[np.nan]``.
    """
    if isinstance(ts, list):
        cleaned = []
        for sample in ts:
            invalid = sample is None or sample < 0
            cleaned.append(np.nan if invalid else sample)
        return cleaned
    return [np.nan]
# Apply clean_ts to every column that holds parsed time series.
# A column qualifies when ANY of its cells is a list: probing only the first
# row would skip a column whose first recording is missing (leaving its
# negative values unmasked) and would raise IndexError on an empty frame.
for col in test_df.columns:
    if any(isinstance(cell, list) for cell in test_df[col]):
        test_df[col] = test_df[col].apply(clean_ts)

In [22]:
def ts_features(ts):
    """Summarise one time series with seven descriptive statistics.

    Missing samples are dropped first; the position of each remaining sample
    (0, 1, 2, ...) is used as the x axis for the linear fit.

    Parameters
    ----------
    ts : sequence of numbers or None
        The raw series. ``None`` is treated as "no data".

    Returns
    -------
    tuple of 7 floats
        ``(slope, p25, p50, p75, iqr, rmssd, std)`` where
        ``slope`` is the first-degree polynomial fit coefficient,
        ``p25``/``p50``/``p75`` are percentiles, ``iqr = p75 - p25``,
        ``rmssd`` is the root mean square of successive differences and
        ``std`` is ``pandas.Series.std`` of the samples.
        Every entry is NaN when ``ts`` is ``None`` or fewer than two valid
        samples remain.
    """
    n_features = 7
    if ts is None:
        return (np.nan,) * n_features

    ts = pd.Series(ts).dropna()
    # A slope and successive differences need at least two points.
    if len(ts) < 2:
        return (np.nan,) * n_features

    x = np.arange(len(ts))
    slope = np.polyfit(x, ts, 1)[0]

    p25, p50, p75 = np.percentile(ts, [25, 50, 75])
    iqr = p75 - p25

    rmssd = np.sqrt(np.mean(np.diff(ts) ** 2))

    std = ts.std()

    return slope, p25, p50, p75, iqr, rmssd, std

# Derived features for test_df.
#
# Column names (including the spellings 'sleep_day_ptc' and 'std_hearthrate')
# and the order in which they are added are kept exactly as before, because
# the feature list in the next cell is built from test_df.columns.
#
# Everything is computed column-wise and assigned positionally instead of
# row by row with .at: that avoids ZeroDivisionError / inf when a sleep
# duration is 0 and does not depend on the index labels being unique.

def _numeric_column(df, name):
    """Return column ``name`` as a float Series, or all-NaN when it is absent."""
    if name in df.columns:
        return df[name].astype(float)
    return pd.Series(np.nan, index=df.index, dtype=float)


def _ratio_or_nan(numerator, denominator):
    """Element-wise ``numerator / denominator``; NaN where the denominator is 0."""
    return numerator / denominator.where(denominator != 0)


# --- Sleep composition -----------------------------------------------------
deep_seconds = _numeric_column(test_df, "sleep_deepSleepSeconds")
light_seconds = _numeric_column(test_df, "sleep_lightSleepSeconds")
rem_seconds = _numeric_column(test_df, "sleep_remSleepSeconds")
sleep_seconds = _numeric_column(test_df, "sleep_sleepTimeSeconds")
awake_seconds = _numeric_column(test_df, "sleep_awakeSleepSeconds")

test_df['deep_sleep_pct'] = _ratio_or_nan(deep_seconds, sleep_seconds).to_numpy()
test_df['rem_sleep_pct'] = _ratio_or_nan(rem_seconds, sleep_seconds).to_numpy()
test_df['light_sleep_pct'] = _ratio_or_nan(light_seconds, sleep_seconds).to_numpy()
test_df['awake_sleep_pct'] = _ratio_or_nan(awake_seconds, sleep_seconds).to_numpy()
test_df['sleep_efficiency'] = _ratio_or_nan(
    sleep_seconds, sleep_seconds + awake_seconds
).to_numpy()
# Fraction of a 24 h day (86400 s) spent asleep.
test_df['sleep_day_ptc'] = (sleep_seconds / 86400.0).to_numpy()

# --- Time-series summaries ---------------------------------------------------
# (source column, target columns in the order returned by ts_features:
#  slope, p25, p50, p75, iqr, rmssd, std)
ts_feature_specs = [
    ("hr_time_series", [
        'hr_slope', 'hr_p25', 'hr_p50', 'hr_p75',
        'hr_iqr', 'hr_rmssd', 'std_hearthrate',
    ]),
    ("stress_time_series", [
        'stress_slope', 'stress_p25', 'stress_p50', 'stress_p75',
        'stress_iqr', 'stress_rmssd', 'std_stress',
    ]),
    ("resp_time_series", [
        'resp_slope', 'resp_p25', 'resp_p50', 'resp_p75',
        'resp_iqr', 'resp_rmssd', 'std_resp',
    ]),
]

for source_col, target_cols in ts_feature_specs:
    # A missing source column behaves like "no data" for every row.
    if source_col in test_df.columns:
        raw_series = test_df[source_col]
    else:
        raw_series = [None] * len(test_df)

    # reshape keeps the (rows, 7) layout even when the frame has no rows.
    stats = np.array(
        [ts_features(ts) for ts in raw_series], dtype=float
    ).reshape(len(test_df), len(target_cols))

    for position, target_col in enumerate(target_cols):
        test_df[target_col] = stats[:, position]


In [23]:
# Hand-picked feature names.
# NOTE(review): nothing in this cell reads FEATURES; the model input below is
# derived from test_df.columns instead. Confirm which list the saved model
# was trained on.
FEATURES = [
    # sleep composition
    "deep_sleep_pct",
    "rem_sleep_pct",
    "light_sleep_pct",
    "awake_sleep_pct",
    "sleep_efficiency",

    # heart rate static
    "hr_restingHeartRate",
    "hr_lastSevenDaysAvgRestingHeartRate",
    "hr_maxHeartRate",
    "hr_minHeartRate",

    # stress static
    "str_avgStressLevel",
    "str_maxStressLevel",

    # activity
    "act_totalCalories",
    "act_activeKilocalories",
    "act_distance",

    # respiration static
    "resp_lowestRespirationValue",
    "resp_highestRespirationValue",
    "resp_avgSleepRespirationValue",

    # sleep time as a fraction of the day
    'sleep_day_ptc',
]

# Model input: every column except the target, the raw time series and
# (when present) the columns whose name contains 'coverage'.
# NOTE(review): an 'id' column passes these filters and is therefore fed to
# the model - confirm that this matches the training setup.
feature_cols = [
    col
    for col in test_df.columns
    if col != "label"
    and 'time_series' not in col
    and 'coverage' not in col
]

x_list_test = test_df[feature_cols]


In [None]:
import xgboost as xgb
# Load the previously trained booster from its JSON file; the path is
# resolved relative to the notebook's working directory.
booster = xgb.Booster()
booster.load_model("final_model_min_max.json")
# Wrap the selected feature columns in a DMatrix and run inference.
dtest=xgb.DMatrix(x_list_test)
preds = booster.predict(dtest)

# Create a DataFrame with the columns `id` and `label` and write it to CSV.
# NOTE(review): `preds` is stored exactly as returned by Booster.predict;
# whether that is a score or a class label depends on the model's training
# objective - confirm it is the format the submission expects.

submission_df = pd.DataFrame({
    'id': test_df['id'],
    'label': preds
})
submission_df.to_csv("cazzoni.csv", index=False)
