In [1]:
import os
import pandas as pd
from tqdm import tqdm
from prophet import Prophet
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

import warnings
warnings.filterwarnings("ignore")

  from .autonotebook import tqdm as notebook_tqdm
ERROR:prophet.plot:Importing plotly failed. Interactive plots will not work.


In [2]:


class ActigraphyDataProcessor:
    def __init__(self, root_dir, feature_table_path):
        self.root_dir = root_dir
        self.feature_table_path = feature_table_path
        self.features_to_forecast = ['enmo', 'light', 'battery_voltage']

    def load_data(self, participant_id):
        file_path = os.path.join(self.root_dir, f"id={participant_id}", "part-0.parquet")
        data = pd.read_parquet(file_path)
        data['id'] = participant_id
        print(f"Loaded data columns for {participant_id}: {data.columns.tolist()}")
        return data

    def compute_daily_summary(self, data):
        daily_summary = data.groupby(['id', 'relative_date_PCIAT']).agg({
            'X': ['mean', 'max', 'min', 'std'],
            'Y': ['mean', 'max', 'min', 'std'],
            'Z': ['mean', 'max', 'min', 'std'],
            'enmo': ['mean', 'max', 'std'],
            'anglez': 'mean',
            'non-wear_flag': 'sum',
            'light': ['mean', 'max', 'min'],
            'battery_voltage': 'mean'
        }).reset_index()
        daily_summary.columns = ['_'.join(col).strip() if col[1] else col[0] for col in daily_summary.columns.values]
        print(f"Daily summary columns after aggregation: {daily_summary.columns.tolist()}")
        return daily_summary

    def add_temporal_features(self, daily_summary):
        for col in ['enmo_mean', 'light_mean', 'battery_voltage_mean']:
            if col in daily_summary.columns:
                daily_summary[f'{col}_lag1'] = daily_summary.groupby('id')[col].shift(1)
                daily_summary[f'{col}_rolling3'] = daily_summary.groupby('id')[col].rolling(window=3).mean().reset_index(0, drop=True)
            else:
                print(f"Warning: Column {col} not found in daily_summary.")
        return daily_summary

    def forecast_features(self, data, participant_id):
        date_mapping = data[['time_of_day', 'relative_date_PCIAT']].drop_duplicates()
        date_mapping['ds'] = pd.to_datetime(date_mapping['time_of_day'], unit='s', errors='coerce')
        
        predictions = []
        for feature in self.features_to_forecast:
            df = data[['time_of_day', feature]].dropna().rename(columns={'time_of_day': 'ds', feature: 'y'})
            max_time = df['ds'].max()
            if max_time > 10**12:
                df['ds'] = pd.to_datetime(df['ds'] / 10**3, unit='s', errors='coerce')
            else:
                df['ds'] = pd.to_datetime(df['ds'], unit='s', errors='coerce')
            df = df.dropna(subset=['ds'])

            model = Prophet(daily_seasonality=True, weekly_seasonality=True)
            model.fit(df)
            future = model.make_future_dataframe(periods=7, freq='D')
            forecast = model.predict(future)

            forecast['id'] = participant_id
            forecast = forecast[['id', 'ds', 'yhat']].rename(columns={'yhat': f'{feature}_forecast'})
            predictions.append(forecast)

        predictions_df = pd.concat(predictions, axis=1)
        predictions_df = predictions_df.loc[:, ~predictions_df.columns.duplicated()]
        predictions_df = pd.merge(predictions_df, date_mapping[['ds', 'relative_date_PCIAT']], on='ds', how='left')
        
        return predictions_df

    def process_participant_data(self, participant_id):
        data = self.load_data(participant_id)
        daily_summary = self.compute_daily_summary(data)
        print(f"Daily summary columns for {participant_id}: {daily_summary.columns.tolist()}")
        daily_summary = self.add_temporal_features(daily_summary)
        forecasts = self.forecast_features(data, participant_id)
        print(f"Forecast columns for {participant_id}: {forecasts.columns.tolist()}")
        processed_data = pd.merge(daily_summary, forecasts, on=['id', 'relative_date_PCIAT'], how='left')
        return processed_data

    def process_all_participants(self):
        all_data = []
        with ThreadPoolExecutor() as executor:
            futures = []
            for id_folder in os.listdir(self.root_dir):
                if not id_folder.startswith('id='):
                    continue
                participant_id = id_folder.split('=')[-1]
                futures.append(executor.submit(self.process_participant_data, participant_id))

            for future in tqdm(as_completed(futures), total=len(futures), desc="Processing participants"):
                participant_data = future.result()
                all_data.append(participant_data)

        feature_table = pd.concat(all_data, ignore_index=True)
        return feature_table

    def save_feature_table(self):
        os.makedirs(os.path.dirname(self.feature_table_path), exist_ok=True)
        feature_table = self.process_all_participants()
        feature_table.to_parquet(self.feature_table_path, index=False)
        print(f"Feature table saved at {self.feature_table_path}")


In [3]:
# Initialize the processor with root directory and path for feature table
processor = ActigraphyDataProcessor(root_dir="../../data/series_test.parquet/", feature_table_path="../../data/processed/feature_table.parquet")

# Process all participants and save the feature table, with progress shown
processor.save_feature_table()


Processing participants:   0%|          | 0/2 [00:00<?, ?it/s]

Loaded data columns for 00115b9f: ['step', 'X', 'Y', 'Z', 'enmo', 'anglez', 'non-wear_flag', 'light', 'battery_voltage', 'time_of_day', 'weekday', 'quarter', 'relative_date_PCIAT', 'id']
Daily summary columns after aggregation: ['id', 'relative_date_PCIAT', 'X_mean', 'X_max', 'X_min', 'X_std', 'Y_mean', 'Y_max', 'Y_min', 'Y_std', 'Z_mean', 'Z_max', 'Z_min', 'Z_std', 'enmo_mean', 'enmo_max', 'enmo_std', 'anglez_mean', 'non-wear_flag_sum', 'light_mean', 'light_max', 'light_min', 'battery_voltage_mean']
Daily summary columns for 00115b9f: ['id', 'relative_date_PCIAT', 'X_mean', 'X_max', 'X_min', 'X_std', 'Y_mean', 'Y_max', 'Y_min', 'Y_std', 'Z_mean', 'Z_max', 'Z_min', 'Z_std', 'enmo_mean', 'enmo_max', 'enmo_std', 'anglez_mean', 'non-wear_flag_sum', 'light_mean', 'light_max', 'light_min', 'battery_voltage_mean']
Loaded data columns for 001f3379: ['step', 'X', 'Y', 'Z', 'enmo', 'anglez', 'non-wear_flag', 'light', 'battery_voltage', 'time_of_day', 'weekday', 'quarter', 'relative_date_PCIAT',

DEBUG:cmdstanpy:input tempfile: /var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/bfdx7iiy.json


Daily summary columns after aggregation: ['id', 'relative_date_PCIAT', 'X_mean', 'X_max', 'X_min', 'X_std', 'Y_mean', 'Y_max', 'Y_min', 'Y_std', 'Z_mean', 'Z_max', 'Z_min', 'Z_std', 'enmo_mean', 'enmo_max', 'enmo_std', 'anglez_mean', 'non-wear_flag_sum', 'light_mean', 'light_max', 'light_min', 'battery_voltage_mean']
Daily summary columns for 001f3379: ['id', 'relative_date_PCIAT', 'X_mean', 'X_max', 'X_min', 'X_std', 'Y_mean', 'Y_max', 'Y_min', 'Y_std', 'Z_mean', 'Z_max', 'Z_min', 'Z_std', 'enmo_mean', 'enmo_max', 'enmo_std', 'anglez_mean', 'non-wear_flag_sum', 'light_mean', 'light_max', 'light_min', 'battery_voltage_mean']


DEBUG:cmdstanpy:input tempfile: /var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/4etptwli.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/Users/javedhassansabu/marvelous-databricks-course-javedhassans/.venv/lib/python3.11/site-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=48886', 'data', 'file=/var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/bfdx7iiy.json', 'init=/var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/4etptwli.json', 'output', 'file=/var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/prophet_modelw0b8s_kd/prophet_model-20241031183602.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
18:36:03 - cmdstanpy - INFO - Chain [1] start processing
INFO:cmdstanpy:Chain [1] start processing
DEBUG:cmdstanpy:input tempfile: /var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/lsxma79t.json
18:36:03 - cmdstanpy - INFO - Chain [1] done processing
INFO

Forecast columns for 00115b9f: ['id', 'ds', 'enmo_forecast', 'light_forecast', 'battery_voltage_forecast', 'relative_date_PCIAT']


18:36:12 - cmdstanpy - INFO - Chain [1] done processing
INFO:cmdstanpy:Chain [1] done processing
DEBUG:cmdstanpy:input tempfile: /var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/u6msrxse.json
DEBUG:cmdstanpy:input tempfile: /var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/2m7eseej.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/Users/javedhassansabu/marvelous-databricks-course-javedhassans/.venv/lib/python3.11/site-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=97379', 'data', 'file=/var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/u6msrxse.json', 'init=/var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/2m7eseej.json', 'output', 'file=/var/folders/3l/9_ksltyj6259016m01k0xyqc0000gp/T/tmp8sjigi01/prophet_model9bhfphhz/prophet_model-20241031183615.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
18:36:15 - cmdstanpy - INFO - Chain [1] start processing
INFO:

Forecast columns for 001f3379: ['id', 'ds', 'enmo_forecast', 'light_forecast', 'battery_voltage_forecast', 'relative_date_PCIAT']
Feature table saved at ../../data/processed/feature_table.parquet





In [4]:
df_feature = pd.read_parquet("../../data/processed/feature_table.parquet")

In [12]:
df_feature.id.value_counts()

id
001f3379    47
00115b9f    40
Name: count, dtype: int64

In [14]:
df_feature.columns

Index(['id', 'relative_date_PCIAT', 'X_mean', 'X_max', 'X_min', 'X_std',
       'Y_mean', 'Y_max', 'Y_min', 'Y_std', 'Z_mean', 'Z_max', 'Z_min',
       'Z_std', 'enmo_mean', 'enmo_max', 'enmo_std', 'anglez_mean',
       'non-wear_flag_sum', 'light_mean', 'light_max', 'light_min',
       'battery_voltage_mean', 'enmo_mean_lag1', 'enmo_mean_rolling3',
       'light_mean_lag1', 'light_mean_rolling3', 'battery_voltage_mean_lag1',
       'battery_voltage_mean_rolling3', 'ds', 'enmo_forecast',
       'light_forecast', 'battery_voltage_forecast'],
      dtype='object')

In [20]:
df_acti_00115b9f = pd.read_parquet("../../data/series_test.parquet/id=00115b9f/part-0.parquet")
df_acti_00115b9f

Unnamed: 0,step,X,Y,Z,enmo,anglez,non-wear_flag,light,battery_voltage,time_of_day,weekday,quarter,relative_date_PCIAT
0,0,0.021536,0.022214,-1.022370,0.022853,-88.280762,0.0,53.000000,4188.000000,56940000000000,4,3,41.0
1,1,0.022005,0.022187,-1.019740,0.020231,-88.241707,0.0,51.666668,4188.166504,56945000000000,4,3,41.0
2,2,0.022240,0.022005,-1.019401,0.019893,-88.170067,0.0,50.333332,4188.333496,56950000000000,4,3,41.0
3,3,0.021589,0.022578,-1.018177,0.018667,-88.250031,0.0,50.500000,4188.500000,56955000000000,4,3,41.0
4,4,0.022005,0.023763,-1.014323,0.016848,-88.130775,0.0,33.166668,4181.000000,57235000000000,4,3,41.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
43325,43325,-0.008333,-0.023620,-1.006901,0.007224,-88.595741,0.0,0.000000,3824.000000,57580000000000,6,3,85.0
43326,43326,0.048730,-0.076725,-0.953776,0.056839,-85.352219,0.0,0.000000,3824.000000,57640000000000,6,3,85.0
43327,43327,0.387370,0.793151,-0.402214,0.069961,-24.097908,0.0,0.000000,3824.000000,57645000000000,6,3,85.0
43328,43328,0.801953,0.501589,-0.040937,0.045489,-2.113776,0.0,0.000000,3824.000000,57650000000000,6,3,85.0


In [18]:
df_feature[df_feature.id == "00115b9f"]

Unnamed: 0,id,relative_date_PCIAT,X_mean,X_max,X_min,X_std,Y_mean,Y_max,Y_min,Y_std,...,enmo_mean_lag1,enmo_mean_rolling3,light_mean_lag1,light_mean_rolling3,battery_voltage_mean_lag1,battery_voltage_mean_rolling3,ds,enmo_forecast,light_forecast,battery_voltage_forecast
0,00115b9f,41.0,-0.388967,0.99099,-1.035964,0.414956,-0.187072,1.016823,-1.54599,0.504869,...,,,,,,,NaT,,,
1,00115b9f,42.0,-0.530206,0.97862,-1.746094,0.369632,0.025359,1.00263,-2.905339,0.496983,...,0.09069,,33.471268,,4174.144531,,NaT,,,
2,00115b9f,43.0,-0.34415,0.794297,-0.959557,0.393514,-0.13025,1.020573,-1.029193,0.404084,...,0.065511,0.068653,40.340485,31.803,4155.183105,4156.394531,NaT,,,
3,00115b9f,44.0,-0.510063,0.976335,-1.044531,0.413122,0.076826,1.001562,-1.215625,0.459795,...,0.049758,0.050629,21.597248,60.081188,4139.855957,4140.654622,NaT,,,
4,00115b9f,45.0,0.0145,0.01543,0.013594,0.000745,-0.075251,-0.068203,-0.081315,0.006348,...,0.036618,0.031521,118.305832,47.284031,4126.924805,4126.784017,NaT,,,
5,00115b9f,46.0,0.023186,1.507865,-1.138203,0.522909,0.076296,0.885156,-1.064375,0.331025,...,0.008188,0.047509,1.949013,44.493234,4113.571289,4115.227865,NaT,,,
6,00115b9f,47.0,-0.440468,0.985729,-0.97349,0.349583,-0.049812,1.666354,-1.758724,0.578938,...,0.097723,0.054264,13.224856,19.113361,4105.1875,4104.084229,NaT,,,
7,00115b9f,48.0,0.047782,0.537917,-0.586016,0.152971,0.088663,0.428516,-0.04668,0.129417,...,0.056882,0.060674,42.166214,32.087346,4093.493896,4094.034505,NaT,,,
8,00115b9f,49.0,0.061023,0.072396,0.038672,0.011507,0.137763,0.233529,0.006055,0.061621,...,0.027419,0.033166,40.870968,29.015409,4083.422119,4086.275391,NaT,,,
9,00115b9f,50.0,-0.19659,1.007214,-0.971667,0.359093,0.294944,1.013776,-1.414115,0.486515,...,0.015197,0.026616,4.009044,17.989583,4081.910156,4078.350342,NaT,,,


In [9]:
df_train = pd.read_csv("../../data/childhealth.csv")

In [11]:
df_train[df_train['id'] == "001f3379"]

Unnamed: 0,id,Basic_Demos-Enroll_Season,Basic_Demos-Age,Basic_Demos-Sex,CGAS-Season,CGAS-CGAS_Score,Physical-Season,Physical-BMI,Physical-Height,Physical-Weight,...,PCIAT-PCIAT_18,PCIAT-PCIAT_19,PCIAT-PCIAT_20,PCIAT-PCIAT_Total,SDS-Season,SDS-SDS_Total_Raw,SDS-SDS_Total_T,PreInt_EduHx-Season,PreInt_EduHx-computerinternet_hoursday,sii
5,001f3379,Spring,13,1,Winter,50.0,Summer,22.279952,59.5,112.2,...,1.0,2.0,1.0,34.0,Summer,40.0,56.0,Spring,0.0,1.0
