Features

In [1]:
import pytz
import os


DEFAULT_TZ = pytz.FixedOffset(540)  # GMT+09:00; Asia/Seoul

PATH_DATA = 'data/D'
PATH_ESM = os.path.join(PATH_DATA, 'EsmResponse.csv')
PATH_PARTICIPANT = os.path.join(PATH_DATA, 'UserInfo.csv')
PATH_SENSOR = os.path.join(PATH_DATA, 'Sensor')

PATH_INTERMEDIATE = os.path.join('data/intermediate')

DATA_TYPES = {
    'Acceleration': 'ACC',
    'AmbientLight': 'AML',
    'Calorie': 'CAL',
    'Distance': 'DST',
    'EDA': 'EDA',
    'HR': 'HRT',
    'RRI': 'RRI',
    'SkinTemperature': 'SKT',
    'StepCount': 'STP',
    'UltraViolet': 'ULV',
    'ActivityEvent': 'ACE',
    'ActivityTransition': 'ACT',
    'AppUsageEvent': 'APP',
    'BatteryEvent': 'BAT',
    'CallEvent': 'CAE',
    'Connectivity': 'CON',
    'DataTraffic': 'DAT',
    'InstalledApp': 'INS',
    'Location': 'LOC',
    'MediaEvent': 'MED',
    'MessageEvent': 'MSG',
    'WiFi': 'WIF',
    'ScreenEvent': 'SCR',
    'RingerModeEvent': 'RNG',
    'ChargeEvent': 'CHG',
    'PowerSaveEvent': 'PWS',
    'OnOffEvent': 'ONF'
}

In [2]:
import pandas as pd
import numpy as np
import scipy.stats as st
import cloudpickle
import ray
from datetime import datetime
from contextlib import contextmanager
import warnings
import time


def load(path: str):
    with open(path, mode='rb') as f:
        return cloudpickle.load(f)

    
def dump(obj, path: str):
    with open(path, mode='wb') as f:
        cloudpickle.dump(obj, f)
        
    
def log(msg: any):
    print('[{}] {}'.format(datetime.now().strftime('%y-%m-%d %H:%M:%S'), msg))


def summary(x):
    x = np.asarray(x)
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')

        n = len(x)
        # Here, uppercase np.dtype.kind corresponds to non-numeric data.
        # Also, we view the boolean data as dichotomous categorical data.
        if x.dtype.kind.isupper() or x.dtype.kind == 'b': 
            cnt = pd.Series(x).value_counts(dropna=False)
            card = len(cnt)
            cnt = cnt[:20]                
            cnt_str = ', '.join([f'{u}:{c}' for u, c in zip(cnt.index, cnt)])
            if card > 30:
                cnt_str = f'{cnt_str}, ...'
            return {
                'n': n,
                'cardinality': card,
                'value_count': cnt_str
            }
        else: 
            x_nan = x[np.isnan(x)]
            x_norm = x[~np.isnan(x)]
            
            tot = np.sum(x_norm)
            m = np.mean(x_norm)
            me = np.median(x_norm)
            s = np.std(x_norm, ddof=1)
            l, u = np.min(x_norm), np.max(x)
            conf_l, conf_u = st.t.interval(0.95, len(x_norm) - 1, loc=m, scale=st.sem(x_norm))
            n_nan = len(x_nan)
            
            return {
                'n': n,
                'sum': tot,
                'mean': m,
                'SD': s,
                'med': me,
                'range': (l, u),
                'conf.': (conf_l, conf_u),
                'nan_count': n_nan
            }


@contextmanager
def on_ray(*args, **kwargs):
    try:
        if ray.is_initialized():
            ray.shutdown()
        ray.init(*args, **kwargs)
        yield None
    finally:
        ray.shutdown()

In [3]:
import numpy as np
import pandas as pd
from typing import Dict, Callable, Union, Tuple, List, Optional, Iterable
from datetime import timedelta as td
from scipy import stats
import ray
import warnings
import time

In [4]:
def _safe_na_check(_v):
    _is_nan_inf = False
    
    try:
        _is_nan_inf = np.isnan(_v) or np.isinf(_v)
    except:
        _is_nan_inf = False
    
    return _is_nan_inf or _v is None

In [5]:
import neurokit2 as nk
from scipy.signal import find_peaks
from scipy.integrate import simps

def _extract_eda( d_val) :
    # Set the sampling rate of your data (in Hz, e.g., 1000 samples per second)
    sampling_rate = 2

    _feature = {}
    v =d_val
    
    if len(v) == 0:
        return {}

    # Raw EDA
    mean_eda = np.mean(v)
    max_eda = np.max(v)
    min_eda = np.min(v)
    std_eda = np.std(v)
    peaks, _ = find_peaks(v)
    num_peaks_eda = len(peaks)
    auc_eda = simps(v, dx=1 / sampling_rate)
    
    if len(v) <10:
        print(f"Warning: EDA signal is too short: {len(v)} elements")
        # Combine features into a dictionary
        all_features = {
            'mean_eda': mean_eda,
            'max_eda': max_eda,
            'min_eda': min_eda,
            'std_eda': std_eda,
            'num_peaks_eda': num_peaks_eda,
            'auc_eda': auc_eda
        }

    else:
        # Decomposing EDA
        data = nk.eda_phasic(v, sampling_rate=2)

        # Tonic EDA
        mean_tonic = np.mean(data['EDA_Tonic'])
        max_tonic = np.max(data['EDA_Tonic'])
        min_tonic = np.min(data['EDA_Tonic'])
        std_tonic = np.std(data['EDA_Tonic'])
        peaks, _ = find_peaks(data['EDA_Tonic'])
        num_peaks_tonic = len(peaks)
        auc_tonic = simps(data['EDA_Tonic'], dx=1 / sampling_rate)

        # Phasic EDA
        mean_phasic = np.mean(data['EDA_Phasic'])
        max_phasic = np.max(data['EDA_Phasic'])
        min_phasic = np.min(data['EDA_Phasic'])
        std_phasic = np.std(data['EDA_Phasic'])
        peaks, _ = find_peaks(data['EDA_Phasic'])
        num_peaks_phasic = len(peaks)
        auc_phasic = simps(data['EDA_Phasic'], dx=1 / sampling_rate)


        # Combine features into a dictionary
        all_features = {
            'mean_eda': mean_eda,
            'max_eda': max_eda,
            'min_eda': min_eda,
            'std_eda': std_eda,
            'num_peaks_eda': num_peaks_eda,
            'auc_eda': auc_eda,
            'mean_tonic': mean_tonic,
            'max_tonic': max_tonic,
            'min_tonic': min_tonic,
            'std_tonic': std_tonic,
            'num_peaks_tonic': num_peaks_tonic,
            'auc_tonic': auc_tonic,
            'auc_phasic': auc_phasic,
            'mean_phasic': mean_phasic,
            'max_phasic': max_phasic,
            'min_phasic': min_phasic,
            'std_phasic': std_phasic,
            'num_peaks_phasic': num_peaks_phasic,
            'auc_phasic': auc_phasic
        }



    for feature, value in all_features.items():
        _feature[f'EDA#{feature}']= value
    
    return _feature

In [6]:
from scipy import stats

def _extract_rri( d_val) :
    _feature = {}
    v =d_val
    
    if len(v) == 0:
        return {}
    
    # Extract features
    mean = np.mean(v)
    median = np.median(v)
    maximum = np.max(v)
    minimum = np.min(v)
    std_dev = np.sqrt(np.var(v, ddof=1)) if len(v) > 1 else 0
    kurt = stats.kurtosis(v, bias=False)
    skw = stats.skew(v, bias=False)
    # Calculate the slope of column
    slope, _ = np.polyfit(np.arange(len(v)), v, 1)
    percentile_80 = v.quantile(0.8)
    percentile_20 = v.quantile(0.2)
    
    # RMSSD
    rmssd = np.sqrt(np.mean(np.diff(v)**2))

    # Combine features into a dictionary
    all_features = {
        "mean": mean,
        "median": median,
        "maximum": maximum,
        "minimum": minimum,
        "std_dev": std_dev,
        "kurt": kurt,
        "skw": skw,
        "slope": slope,
        "percentile_80": percentile_80,
        "percentile_20": percentile_20,
        "rmssd": rmssd
    }



    for feature, value in all_features.items():
        _feature[f'HRV#{feature}']= value
    
    return _feature

In [7]:
def _extract_numeric_feature(d_key, d_val) -> Dict:
    feature = {}
    v=d_val
    hist, _ = np.histogram(v, bins='doane', density=False)
    std = np.sqrt(np.var(v, ddof=1)) if len(v) > 1 else 0
    v_norm = (v - np.mean(v)) / std if std != 0 else np.zeros(len(v))
    feature[f'{d_key}#AVG'] = np.mean(v) # Sample mean
    feature[f'{d_key}#STD'] = std # Sample standard deviation
    if std !=0:
        feature[f'{d_key}#SKW'] = stats.skew(v, bias=False) # Sample skewness
        feature[f'{d_key}#KUR'] = stats.kurtosis(v, bias=False) # Sample kurtosis
    else:
        feature[f'{d_key}#SKW'] = -3 # Sample skewness
        feature[f'{d_key}#KUR'] = -3 # Sample kurtosis
    feature[f'{d_key}#ASC'] = np.sum(np.abs(np.diff(v))) # Abstract sum of changes
    feature[f'{d_key}#BEP'] = stats.entropy(hist) # Binned entropy
    feature[f'{d_key}#MED'] = np.median(v) # Median
    feature[f'{d_key}#TSC'] = np.sqrt(np.sum(np.power(np.diff(v_norm), 2))) # Timeseries complexity
    return feature

In [8]:
def _extract_categorical_feature(cats, d_key, d_val) -> Dict:
    feature = {}
    v = d_val
    cnt = v.value_counts()
    val, sup = cnt.index, cnt.values
    hist = {k: v for k, v in zip(val, sup)}

    # Information Entropy
    feature[f'{d_key}#ETP#'] = stats.entropy(sup / len(v))
    # Abs. Sum of Changes
    feature[f'{d_key}#ASC#'] = np.sum(v.values[1:] != v.values[:-1])
    if len(cats) == 2: # Dichotomous categorical data
        c = cats[0]
        feature[f'{d_key}#RLV_SUP'] = hist[c] / len(v) if c in hist else 0
    else:
        for c in cats:
            feature[f'{d_key}#RLV_SUP={c}'] = hist[c] / len(v)  if c in hist else 0
            
    return feature

In [9]:
def _extract_timeWindow_feature(is_numeric, cats, d_key, d_val) -> Dict:
    feature = {}
    v = d_val
    if d_key in ['CAE_DUR']:
        feature = _extract_numeric_feature(d_key, v)
        feature['CAE#FREQ'] = len(v)
    elif d_key in ['LOC_CLS']:
        feature = _extract_categorical_feature(cats, d_key, v)
        feature['LOC#NumOfPlcVist'] = len(set(v))
    elif d_key in ['EDA']:
        feature = _extract_eda(v)
    elif d_key in ['RRI']:
        feature = _extract_rri(v)
    else:
        if is_numeric:
            feature = _extract_numeric_feature(d_key, v)
        else:
            feature =_extract_categorical_feature(cats, d_key, v)
    return feature

In [10]:
#This fucntion is based on the  towards circadian computing: "early to bed and early to rise"
#makes some of us unhealthy and sleep derived
theta=30
def calculate_sleep_duration(s_on, s_off, theta):
    # Merge s_on and s_off into a single DataFrame based on timestamp
    df = pd.merge(pd.DataFrame({'timestamp': s_on, 'event': 'screen_on'}),
                  pd.DataFrame({'timestamp': s_off, 'event': 'screen_off'}),
                  how='outer', on='timestamp')
    # fill missing values in event_x with values from event_y, and vice versa
    df['event_x'] = df['event_x'].fillna(df['event_y'])
    df['event_y'] = df['event_y'].fillna(df['event_x'])
    # drop the event_x and event_y columns
    df = df.drop(columns=['event_y']).rename(columns={'event_x': 'event'})
    # Fill in missing timestamps with NaT and sort by timestamp
    df = df.fillna(pd.NaT).sort_values('timestamp')
    df=df.assign(
         timestamp=lambda x: pd.to_datetime(x['timestamp'], unit='ms', utc=True).dt.tz_convert(DEFAULT_TZ)
     )
    # Filter out screen-on events caused by notifications
    mask = (df['event'] == 'screen_off') & ((df['timestamp'].diff().fillna(pd.NaT)  / pd.Timedelta(seconds=1)) > theta)
    filtered_df = df[mask].reset_index(drop=True)
    # Discard non-usage patterns that do not start between 9PM to 7AM (next day)
    sleep_duration = pd.Series(dtype=float)
    sleep_onset = pd.Series(dtype="datetime64[ns]")
    for i in range(len(filtered_df)-1):
        if filtered_df.loc[i, 'timestamp'].hour >= 21 or filtered_df.loc[i, 'timestamp'].hour < 7:
            non_usage_duration = filtered_df.loc[i+1, 'timestamp'] - filtered_df.loc[i, 'timestamp']
            if non_usage_duration.total_seconds() > 0:
                sleep_duration = pd.concat([sleep_duration, pd.Series(non_usage_duration.total_seconds())])
                sleep_onset = pd.concat([sleep_onset , pd.Series(filtered_df.loc[i, 'timestamp'])])
    # Calculate sleep midpoint and apply individual corrective term
    if len(sleep_duration) > 0:
        sleep_duration = sleep_duration.reset_index(drop=True)
        sleep_onset  =sleep_onset.reset_index(drop=True)
        sleep_midpoint = sleep_onset + pd.to_timedelta(sleep_duration/2, unit="s")
        return sleep_duration.max(), sleep_onset.iloc[sleep_duration.idxmax()], sleep_midpoint.iloc[sleep_duration.idxmax()]
    else:
        return None, None, None

In [11]:
epoch_names = {
    0: 'Dawn',
    1: 'Morning',
    2: 'Afternoon',
    3: 'LateAfternoon',
    4: 'Evening',
    5: 'Night'
}
def _extract(
        pid: str,
        data: Dict[str, pd.Series],
        label: pd.Series,
        label_values: List[str],
#        window_data: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
#        window_label: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
        categories: Dict[str, Optional[List[any]]] = None,
        constant_features: Dict[str, any] = None,
        resample_s: Dict[str, float] = None
) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    _s = time.time()
    log(f"Begin feature extraction on {pid}'s data.")
    categories = categories or dict()
    constant_features = constant_features or dict()
    resample_s = resample_s or dict()
    X, y, date_times = [], [], []
#    count = 0
    for timestamp in label.index:
        row = dict()
        #Find the start of today and yesterday for extracting today epoch features and yesterday epoch features
        start_of_today = datetime(timestamp.year, timestamp.month, timestamp.day, tzinfo=timestamp.tzinfo)
        start_of_today = pd.Timestamp(start_of_today.date(), tz=DEFAULT_TZ)
        start_of_yesterday = timestamp - pd.Timedelta(days=1)
        start_of_yesterday = pd.Timestamp(start_of_yesterday.date(), tz=DEFAULT_TZ)
        label_cur = label.at[timestamp]
        yesterday_time_windows = [
                (start_of_yesterday + pd.Timedelta(hours=6), start_of_yesterday + pd.Timedelta(hours=9)),
                (start_of_yesterday + pd.Timedelta(hours=9), start_of_yesterday + pd.Timedelta(hours=12)),
                (start_of_yesterday + pd.Timedelta(hours=12), start_of_yesterday + pd.Timedelta(hours=15)),
                (start_of_yesterday + pd.Timedelta(hours=15), start_of_yesterday + pd.Timedelta(hours=18)),
                (start_of_yesterday + pd.Timedelta(hours=18), start_of_yesterday + pd.Timedelta(hours=21)),
                (start_of_yesterday + pd.Timedelta(hours=21), start_of_yesterday + pd.Timedelta(hours=24))
            ]
        today_time_windows = []
        for i in range(6):
            start = start_of_today + pd.Timedelta(hours=i*3)
            end = start_of_today + pd.Timedelta(hours=(i+1)*3)
            if start <= timestamp:
                today_time_windows.append((start, min(end, timestamp)))
            else:
                break
        t = timestamp - td(milliseconds=1)
        # Features relevant to participants' info
        for d_key, d_val in constant_features.items():
            row[d_key] = d_val
        # Features from sensor data
        for d_key, d_val in data.items():
            is_numeric = d_key not in categories
            cats = categories.get(d_key) or list()
            d_val = d_val.sort_index()
            # Features relevant to latest value of a given data
            # These features are extracted only for bounded categorical data and numerical data.
            if is_numeric or cats:
                try:
                    v = d_val.loc[:t].iloc[-1]
                except (KeyError, IndexError):
                    v = 0
                if is_numeric:
                    row[f'{d_key}#VAL'] = v
                else:
                    for c in cats:
                        row[f'{d_key}#VAL={c}'] = v == c
            # Features relevant to duration since the latest state change.
            # These features are only for categorical data.
            # In addition, duration since a given state is set recently is considered,
            # that are available only at bounded categorical data.
            if not is_numeric:
                try:
                    v = d_val.loc[:t]
                    row[f'{d_key}#DSC'] = (t - v.index[-1]).total_seconds() if len(v) else -1.0
                    for c in cats:
                        v_sub = v.loc[lambda x: x == c].index
                        row[f'{d_key}#DSC={c}'] = (t - v_sub[-1]).total_seconds() if len(v_sub) else -1.0
                except (KeyError, IndexError):
                    row[f'{d_key}#DSC'] = -1.0
                    for c in cats:
                        row[f'{d_key}#DSC={c}'] = -1.0
            # Features extracted from time-windows
            # These features requires resampling and imputation on each data.
            sample_rate = RESAMPLE_S.get(d_key) or 1
            d_val_res = d_val.resample(f'{sample_rate}S', origin='start')
            if is_numeric:
                try:
                    # Your resampling code here...
                    d_val_res = d_val_res.mean().interpolate(method='time').dropna()
                except ValueError:
                    # Save input data to a file or external storage for debugging...
                    print(d_val_res)
                    print(d_val)
                    raise
            else:
                d_val_res = d_val_res.ffill().dropna()
            #No resampling
#             d_val_res =d_val
           # Features extracted from immediate past time-windows
            w_val = 15 * 60
            try:
                v = d_val_res.loc[t - td(seconds=w_val):t]
            except (KeyError, IndexError):
                continue
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                new_row = {f'{k}_ImmediatePast': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                row.update(new_row)
            #Features extracted from yesterday epoch time windows
            for count, (start, end) in enumerate(yesterday_time_windows):
                # Get data for the current yesterday epoch time window
                try:
                    v = d_val_res.loc[start:end]
                except (KeyError, IndexError):
                    continue
                epoch_name = epoch_names.get(count)
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    new_row = {f'{k}Yesterday{epoch_name}': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                    row.update(new_row)
                    
            #Features extracted from today epoch time windows until current time
            for count, (start, end) in enumerate(today_time_windows):
                # Get data for the current time window
                try:
                    v = d_val_res.loc[start:end]
                except (KeyError, IndexError):
                    continue
                epoch_name = epoch_names.get(count)
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    new_row = {f'{k}Today{epoch_name}': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                    row.update(new_row)
        #Sleep feature extracted from last night's data
        onset_min = start_of_yesterday + pd.Timedelta(hours=21)
        onset_max = start_of_today + pd.Timedelta(hours=14)
        s_on =data['SCR_EVENT'].loc[data['SCR_EVENT']=='ON']
        s_off =data['SCR_EVENT'].loc[data['SCR_EVENT']=='OFF']
        duration, onset, midpoint =calculate_sleep_duration(s_on.loc[onset_min:onset_max].reset_index()['timestamp'], s_off.loc[onset_min:onset_max].reset_index()['timestamp'], theta)
        if duration:
            row['Sleep#Duration'] = duration
            onset_hour = onset.hour
            if onset_hour >=21:
                row['Sleep#Onset'] = onset_hour - 21
            else:
                row['Sleep#Onset'] = onset_hour + 3
        else:
            row['Sleep#Duration'] = -1
            row['Sleep#Onset'] = -1
            
        # Features relevant to time
        day_of_week = ['MON', 'TUE', 'WED', 'THU', 'FRI', 'SAT', 'SUN'][t.isoweekday() - 1]
        is_weekend = 'Y' if t.isoweekday() > 5 else 'N'
        hour = t.hour
        if 6 <= hour < 9:
            hour_name = 'DAWN'
        elif 9 <= hour < 12:
            hour_name = 'MORNING'
        elif 12 <= hour < 15:
            hour_name = 'AFTERNOON'
        elif 15 <= hour < 18:
            hour_name = 'LATE_AFTERNOON'
        elif 18 <= hour < 21:
            hour_name = 'EVENING'
        elif 21 <= hour < 24:
            hour_name = 'NIGHT'
        else:
            hour_name = 'MIDNIGHT'
        for d in ['MON', 'TUE', 'WED', 'THU', 'FRI', 'SAT', 'SUN']:
            row[f'Time#DOW={d}'] = d == day_of_week
        for d in ['Y', 'N']:
            row[f'Time#WKD={d}'] = d == is_weekend
        for d in ['DAWN', 'MORNING', 'AFTERNOON', 'LATE_AFTERNOON', 'EVENING', 'NIGHT', 'MIDNIGHT']:
            row[f'Time#HRN={d}'] = d == hour_name
        try:
            last_label = label.loc[label[:t].index.max()]
        except (KeyError, IndexError):
            last_label = -1
        row[f'ESM#LastLabel'] = last_label
        # Label values extracted from yesterday epochs
        for count, (start, end) in enumerate(yesterday_time_windows):
            try:
                v = label.loc[start:end]
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2: # Binary classification
                    row[f'ESM#LIK#Yesterday{epoch_name}'] = np.sum(v == label_values[0]) / len(v) if len(v) > 0 else 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK={l}#Yesterday{epoch_name}'] = np.sum(v == l) / len(v) if len(v) > 0 else 0
            except (KeyError, IndexError):
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2:
                    row[f'ESM#LIK#Yesterday{epoch_name}'] = 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK={l}#Yesterday{epoch_name}'] = 0
        # Label values extracted from yesterday epochs
        for count, (start, end) in enumerate(today_time_windows):
            try:
                v = label.loc[start:end]
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2: # Binary classification
                    row[f'ESM#LIK#Today{epoch_name}'] = np.sum(v == label_values[0]) / len(v) if len(v) > 0 else 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK={l}#Today{epoch_name}'] = np.sum(v == l) / len(v) if len(v) > 0 else 0
            except (KeyError, IndexError):
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2:
                    row[f'ESM#LIK#Today{epoch_name}'] = 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK={l}#Today{epoch_name}'] = 0           
        # Label values extracted from immediate past
        w_val = 15 * 60
        try:
            v = label.loc[t - td(seconds=w_val):t]
            epoch_name = epoch_names.get(count)
            if len(label_values) <= 2: # Binary classification
                row[f'ESM#LIK#ImmediatePast'] = np.sum(v == label_values[0]) / len(v) if len(v) > 0 else 0
            else:
                for l in label_values:
                    row[f'ESM#LIK={l}#ImmediatePast'] = np.sum(v == l) / len(v) if len(v) > 0 else 0
        except (KeyError, IndexError):
            epoch_name = epoch_names.get(count)
            if len(label_values) <= 2:
                row[f'ESM#LIK#ImmediatePast'] = 0
            else:
                for l in label_values:
                    row[f'ESM#LIK={l}#ImmediatePast'] = 0  

        row = {
            k: 0.0 if _safe_na_check(v) else v
            for k, v in row.items()
        }

        X.append(row)
        y.append(label_cur)
        date_times.append(timestamp)
    
    log(f"Complete feature extraction on {pid}'s data ({time.time() - _s:.2f} s).")
    X = pd.DataFrame(X)
    y = np.asarray(y)
    group = np.repeat(pid, len(y))
    date_times =  np.asarray(date_times)
    return X, y, group, date_times
def extract(
        pids: Iterable[str],
        data: Dict[str, pd.Series],
        label: pd.Series,
        label_values: List[str],
#        window_data: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
#        window_label: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
        categories: Dict[str, Optional[List[any]]] = None,
        constat_features: Dict[str, Dict[str, any]] = None,
        resample_s: Dict[str, float] = None,
        with_ray: bool=False
):
    if with_ray and not ray.is_initialized():
        raise EnvironmentError('Ray should be initialized if "with_ray" is set as True.')
    func = ray.remote(_extract).remote if with_ray else _extract
    jobs = []
    for pid in pids:
        d = dict()
        for k, v in data.items():
            try:
                d[k] = v.loc[(pid, )]
                if k.startswith('LOC_'):
                    d[k].index= pd.to_datetime( d[k].index, unit='ms', utc=True).tz_convert(DEFAULT_TZ)
                d['SPEED'] = d.pop('LOC_SPEED')
            except (KeyError, IndexError):
                pass
        job = func(
            pid=pid, data=d, label=label.loc[(pid, )],
            label_values=label_values,
#            window_data=window_data,
#            window_label=window_label,
            categories=categories,
            constant_features=constat_features[pid],
            resample_s=resample_s
        )
        jobs.append(job)
    jobs = ray.get(jobs) if with_ray else jobs
    print([x.shape for _, x, _, _ in jobs])
    X = pd.concat([x for x, _, _, _ in jobs], axis=0, ignore_index=True)
    y = np.concatenate([x for _, x, _, _ in jobs], axis=0)
    group = np.concatenate([x for _, _, x, _ in jobs], axis=0)
    date_times = np.concatenate([x for _, _, _, x in jobs], axis=0)
    t_s = date_times.min().normalize().timestamp()
    t_norm = np.asarray(list(map(lambda x: x.timestamp() - t_s, date_times)))
    C, DTYPE = X.columns, X.dtypes
    X = X.fillna({
        **{c: False for c in C[(DTYPE == object) | (DTYPE == bool)]},
        **{c: 0.0 for c in C[(DTYPE != object) & (DTYPE != bool)]},
    }).astype({
        **{c: 'bool' for c in C[(DTYPE == object) | (DTYPE == bool)]},
        **{c: 'float32' for c in C[(DTYPE != object) & (DTYPE != bool)]},
    })
    return X, y, group, t_norm, date_times

In [12]:
import os
import cloudpickle

LABEL_VALUES = [1, 0]
RESAMPLE_S = {
    'ACC_AXX': 0.25,
    'ACC_AXY': 0.25,
    'ACC_AXZ': 0.25,
    'ACC_MAG': 0.25,
    'EDA': 0.5,
}


CATEGORIES = {
   'DST_MOT': ['IDLE', 'WALKING', 'JOGGING', 'RUNNING'],
   'ULV_INT': ['NONE', 'LOW', 'MEDIUM', 'HIGH'],
    'ACT': ['WALKING', 'STILL', 'IN_VEHICLE', 'ON_BICYCLE', 'RUNNING'],
#    'APP_PAC': None,
    'APP_CAT': ['SOCIAL','HEALTH','ENTER','WORK',"INFO"],
   'BAT_STA': ['CHARGING', 'DISCHARGING', 'FULL', 'NOT_CHARGING'],
#    'CAE': ['CALL', 'IDLE'],
   'CON': ['DISCONNECTED', 'WIFI', 'MOBILE'],
    'LOC_CLS': None,
    'LOC_LABEL': ['eating','home','work','social','others'] ,
    'SCR_EVENT':['ON', 'OFF', 'UNLOCK'],
    'RNG': ['VIBRATE', 'SILENT', 'NORMAL'],
    'CHG': ['DISCONNECTED', 'CONNECTED'],
    'PWS': ['ACTIVATE', 'DEACTIVATE'],
    'ONF': ['ON', 'OFF']
}
PARTICIPANTS = pd.read_csv(os.path.join(PATH_INTERMEDIATE, 'PARTICIPANT_INFO.csv'),index_col = 'pcode')
PINFO = PARTICIPANTS.assign(
    AGE=lambda x: x['age'],
    GEN=lambda x: x['gender'],
    BFI_OPN=lambda x: x['openness'],
    BFI_CON=lambda x: x['conscientiousness'],
    BFI_NEU=lambda x: x['neuroticism'],
    BFI_EXT=lambda x: x['extraversion'],
    BFI_AGR=lambda x: x['agreeableness'],
    PSS=lambda x: x['PSS10'],
    PHQ=lambda x: x['PHQ9'],
    GHQ=lambda x: x['GHQ12'],
)[[
    'AGE', 'GEN', 'BFI_OPN', 'BFI_CON', 'BFI_NEU', 'BFI_EXT', 'BFI_AGR', 'PSS10', 'PHQ9', 'GHQ12'
]]

PINFO = pd.get_dummies(PINFO, prefix_sep='=', dtype=bool).to_dict('index')
PINFO = {k: {f'PIF#{x}': y for x, y in v.items()} for k, v in PINFO.items()}
DATA = load(os.path.join(PATH_INTERMEDIATE, 'proc.pkl'))
LABELS_PROC = pd.read_csv(os.path.join(PATH_INTERMEDIATE, 'LABELS_PROC.csv'), index_col=['pcode','timestamp'],parse_dates=True)

In [13]:
LABELS_PROC.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,responseTime,scheduledTime,valence,arousal,attention,stress,duration,disturbance,change,valence_dyn,arousal_dyn,stress_dyn,disturbance_dyn,valence_fixed,arousal_fixed,stress_fixed,disturbance_fixed
pcode,timestamp,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
P01,2019-05-08 10:29:46+09:00,1557278986000,1557279000000.0,-3,3,3,3,5.0,-1,-3,0,1,1,0,0,1,1,0
P01,2019-05-08 11:16:12+09:00,1557281772000,1557282000000.0,-3,-2,2,2,15.0,3,-2,0,0,1,1,0,0,1,1
P01,2019-05-08 15:58:22+09:00,1557298702000,1557299000000.0,3,3,3,-3,20.0,2,0,1,1,0,1,1,1,0,1
P01,2019-05-08 16:41:51+09:00,1557301311000,1557301000000.0,3,3,3,-3,30.0,1,2,1,1,0,1,1,1,0,1
P01,2019-05-08 17:27:42+09:00,1557304062000,1557304000000.0,3,3,3,-3,20.0,2,2,1,1,0,1,1,1,0,1


In [14]:
PARTICIPANTS.head()

Unnamed: 0_level_0,participationStartDate,participationStartTimestamp,age,gender,openness,conscientiousness,neuroticism,extraversion,agreeableness,PSS10,PHQ9,GHQ12
pcode,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
P01,2019-05-08 00:00:00+09:00,1557241200000,27,M,11,11,3,4,13,13,0,1
P02,2019-05-08 00:00:00+09:00,1557241200000,21,M,14,5,12,14,5,27,6,18
P03,2019-05-08 00:00:00+09:00,1557241200000,24,F,10,15,8,7,11,18,2,6
P04,2019-05-08 00:00:00+09:00,1557241200000,23,M,12,11,8,6,11,20,1,9
P05,2019-05-08 00:00:00+09:00,1557241200000,27,F,10,11,13,10,6,25,14,9


In [15]:
import warnings
from pandas.errors import PerformanceWarning

warnings.simplefilter(action='ignore', category=PerformanceWarning)
warnings.simplefilter(action="ignore", category=RuntimeWarning)


with on_ray(num_cpus=16):
    for l in ['stress']:
        labels = LABELS_PROC[f'{l}_dyn']
        pids = labels.index.get_level_values('pcode').unique()
        feat = extract(
            pids=pids,
            data=DATA,
            label=labels,
            label_values=LABEL_VALUES,
            categories=CATEGORIES,
            constat_features=PINFO,
            resample_s=RESAMPLE_S,
            with_ray=True
        )
        dump(feat, os.path.join(PATH_INTERMEDIATE, f'{l}.pkl'))

2023-07-06 10:12:25,245	INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


[2m[36m(_extract pid=343840)[0m [23-07-06 10:12:36] Begin feature extraction on P01's data.
[2m[36m(_extract pid=343832)[0m [23-07-06 10:12:44] Begin feature extraction on P03's data.[32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
[2m[36m(_extract pid=343839)[0m [23-07-06 10:12:56] Begin feature extraction on P05's data.
[2m[36m(_extract pid=343831)[0m [23-07-06 10:13:16] Begin feature extraction on P06's data.
[2m[36m(_extract pid=343844)[0m [23-07-06 10:13:44] Begin feature extraction on P08's data.
[2m[36m(_extract pid=343836)[0m [23-07-06 10:14:20] Begin feature extraction on P09's data.
[2m[36m(_extract pid=343838)[0m [23-07-06 10:15:05] Begin feature extraction on P10's data.
[2m[36m(_extract pid=343843)[0m [23-07-06 10:16:04] Begin feature extraction on P12's data.
[2m[3

[2m[36m(raylet)[0m Spilled 2624 MiB, 13 objects, write throughput 21 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.




[2m[36m(raylet)[0m Spilled 4251 MiB, 22 objects, write throughput 24 MiB/s.


[2m[36m(_extract pid=343840)[0m [23-07-06 12:56:51] Complete feature extraction on P01's data (9855.96 s).
[2m[36m(_extract pid=343840)[0m [23-07-06 12:57:06] Begin feature extraction on P30's data.
[2m[36m(_extract pid=343830)[0m [23-07-06 13:01:30] Complete feature extraction on P15's data (9735.29 s).
[2m[36m(_extract pid=343832)[0m [23-07-06 13:01:39] Complete feature extraction on P03's data (10134.16 s).
[2m[36m(_extract pid=343830)[0m [23-07-06 13:01:54] Begin feature extraction on P31's data.
[2m[36m(_extract pid=343832)[0m [23-07-06 13:02:27] Begin feature extraction on P32's data.
[2m[36m(_extract pid=343833)[0m [23-07-06 13:20:14] Complete feature extraction on P02's data (11255.87 s).
[2m[36m(_extract pid=343833)[0m [23-07-06 13:20:35] Begin feature extraction on P33's data.
[2m[36m(_extract pid=343838)[0m [23-07-06 13:21:23] Complete feature extraction on P10's data (11178.39 s).
[2m[36m(_extract pid=343838)[0m [23-07-06 13:22:10] Begin featur

[2m[36m(raylet)[0m Spilled 8295 MiB, 55 objects, write throughput 25 MiB/s.


[2m[36m(_extract pid=343836)[0m [23-07-06 14:25:15] Begin feature extraction on P50's data.
[2m[36m(_extract pid=343844)[0m [23-07-06 15:12:35] Complete feature extraction on P08's data (17931.19 s).
[2m[36m(_extract pid=343844)[0m [23-07-06 15:13:03] Begin feature extraction on P51's data.
[2m[36m(_extract pid=343833)[0m [23-07-06 16:05:58] Complete feature extraction on P33's data (9923.21 s).
[2m[36m(_extract pid=343833)[0m [23-07-06 16:06:15] Begin feature extraction on P52's data.
[2m[36m(_extract pid=343835)[0m [23-07-06 16:08:11] Complete feature extraction on P26's data (20519.01 s).
[2m[36m(_extract pid=343835)[0m [23-07-06 16:08:31] Begin feature extraction on P53's data.
[2m[36m(_extract pid=343841)[0m [23-07-06 16:22:56] Complete feature extraction on P28's data (21279.30 s).
[2m[36m(_extract pid=343841)[0m [23-07-06 16:23:19] Begin feature extraction on P55's data.
[2m[36m(_extract pid=343832)[0m [23-07-06 16:27:36] Complete feature extraction