In [1]:
from Funcs.Utility import *
import numpy as np
import pandas as pd
from typing import Dict, Callable, Union, Tuple, List, Optional, Iterable
from datetime import timedelta as td
from scipy import stats
import ray
import warnings
import time

In [2]:

def _safe_na_check(_v):
    _is_nan_inf = False
    
    try:
        _is_nan_inf = np.isnan(_v) or np.isinf(_v)
    except:
        _is_nan_inf = False
    
    return _is_nan_inf or _v is None

In [3]:
def _extract_numeric_feature(d_key, d_val) -> Dict:
    feature = {}
    v=d_val
    hist, _ = np.histogram(v, bins='doane', density=False)
    std = np.sqrt(np.var(v, ddof=1)) if len(v) > 1 else 0
    v_norm = (v - np.mean(v)) / std if std != 0 else np.zeros(len(v))
    feature[f'{d_key}#AVG'] = np.mean(v) # Sample mean
    feature[f'{d_key}#STD'] = std # Sample standard deviation
    if std !=0:
        feature[f'{d_key}#SKW'] = stats.skew(v, bias=False) # Sample skewness
        feature[f'{d_key}#KUR'] = stats.kurtosis(v, bias=False) # Sample kurtosis
    else:
        feature[f'{d_key}#SKW'] = 0 # Sample skewness
        feature[f'{d_key}#KUR'] = 0 # Sample c
    feature[f'{d_key}#ASC'] = np.sum(np.abs(np.diff(v))) # Abstract sum of changes
    feature[f'{d_key}#BEP'] = stats.entropy(hist) # Binned entropy
    feature[f'{d_key}#MED'] = np.median(v) # Median
    feature[f'{d_key}#TSC'] = np.sqrt(np.sum(np.power(np.diff(v_norm), 2))) # Timeseries complexity
    return feature

In [4]:
def _extract_categorical_feature(cats, d_key, d_val) -> Dict:
    feature = {}
    v = d_val
    cnt = v.value_counts()
    val, sup = cnt.index, cnt.values
    hist = {k: v for k, v in zip(val, sup)}

    # Information Entropy
    feature[f'{d_key}#ETP#'] = stats.entropy(sup / len(v))
    # Abs. Sum of Changes
    feature[f'{d_key}#ASC#'] = np.sum(v.values[1:] != v.values[:-1])
    if len(cats) == 2: # Dichotomous categorical data
        c = cats[0]
        feature[f'{d_key}#RLV_SUP'] = hist[c] / len(v) if c in hist else 0
    else:
        for c in cats:
            feature[f'{d_key}#RLV_SUP={c}'] = hist[c] / len(v)  if c in hist else 0
            
    return feature

In [5]:
def _extract_timeWindow_feature(is_numeric, cats, d_key, d_val) -> Dict:
    feature = {}
    v = d_val

    if d_key in ['LOC_CLS']:
        feature = _extract_categorical_feature(cats, d_key, v)
        feature['LOC#NumOfPlcVist'] = len(set(v))
    else:
        if is_numeric:
            feature = _extract_numeric_feature(d_key, v)
        else:
            feature =_extract_categorical_feature(cats, d_key, v)
    return feature

In [6]:
epoch_names = {
    0: 'Dawn',
    1: 'Morning',
    2: 'Afternoon',
    3: 'LateAfternoon',
    4: 'Evening',
    5: 'Night'
}
def _extract(
        pid: str,
        data: Dict[str, pd.Series],
        label: pd.Series,
        label_values: List[str],
#        window_data: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
#        window_label: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
        categories: Dict[str, Optional[List[any]]] = None,
        constant_features: Dict[str, any] = None,
        resample_s: Dict[str, float] = None
) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    _s = time.time()
    log(f"Begin feature extraction on {pid}'s data.")
    categories = categories or dict()
    constant_features = constant_features or dict()
    resample_s = resample_s or dict()
    X, y, date_times = [], [], []
#    count = 0
    for timestamp in label.index:
        row = dict()
        #Find the start of today and yesterday for extracting today epoch features and yesterday epoch features
        start_of_today = datetime.datetime(timestamp.year, timestamp.month, timestamp.day, tzinfo=timestamp.tzinfo)
        start_of_today = pd.Timestamp(start_of_today.date(), tz=DEFAULT_TZ)
        start_of_yesterday = timestamp - pd.Timedelta(days=1)
        start_of_yesterday = pd.Timestamp(start_of_yesterday.date(), tz=DEFAULT_TZ)
        label_cur = label.at[timestamp]
        t = timestamp - td(milliseconds=1)

        #Yesterday and Today 3-hour epochs
        yesterday_time_windows_epoch = []
        for i in range(6):
            start = start_of_yesterday + pd.Timedelta(hours=i*3 + 6 )
            end = start_of_yesterday + pd.Timedelta(hours=(i+1)*3 +6)
            if start <= t:
                yesterday_time_windows_epoch.append((start, min(end, t)))
            else:
                break
        today_time_windows_epoch = []
        for i in range(6):
            start = start_of_today + pd.Timedelta(hours=i*3 +6)
            end = start_of_today + pd.Timedelta(hours=(i+1)*3 + 6)
            if start <= t:
                today_time_windows_epoch.append((start, min(end, t)))
            else:
                break
        #Yesterday and Today hourly windows
#         yesterday_time_windows_hour = []
#         for i in range(24):
#             start = start_of_yesterday + pd.Timedelta(hours=i*1 )
#             end = start_of_yesterday + pd.Timedelta(hours=(i+1)*1)
#             if start <= t:
#                 yesterday_time_windows_hour.append((start, min(end, t)))
#             else:
#                 break
#         today_time_windows_hour = []
#         for i in range(24):
#             start = start_of_today + pd.Timedelta(hours=i*1 )
#             end = start_of_today + pd.Timedelta(hours=(i+1)*1)
#             if start <= t:
#                 today_time_windows_hour.append((start, min(end, t)))
#             else:
#                 break
        
        # Features relevant to participants' info
        for d_key, d_val in constant_features.items():
            row[d_key] = d_val
        # Features from sensor data
        for d_key, d_val in data.items():
            is_numeric = d_key not in categories
            cats = categories.get(d_key) or list()
            d_val = d_val.sort_index()
            # Features relevant to latest value of a given data
            # These features are extracted only for bounded categorical data and numerical data.
            if is_numeric or cats:
                try:
                    v = d_val.loc[:t].iloc[-1]
                except (KeyError, IndexError):
                    v = 0
                if is_numeric:
                    row[f'{d_key}#VAL'] = v
                else:
                    for c in cats:
                        row[f'{d_key}#VAL={c}'] = v == c
            # Features relevant to duration since the latest state change.
            # These features are only for categorical data.
            # In addition, duration since a given state is set recently is considered,
            # that are available only at bounded categorical data.
            if not is_numeric:
                try:
                    v = d_val.loc[:t]
                    row[f'{d_key}#DSC'] = (t - v.index[-1]).total_seconds() if len(v) else -1.0
                    for c in cats:
                        v_sub = v.loc[lambda x: x == c].index
                        row[f'{d_key}#DSC={c}'] = (t - v_sub[-1]).total_seconds() if len(v_sub) else -1.0
                except (KeyError, IndexError):
                    row[f'{d_key}#DSC'] = 0
                    for c in cats:
                        row[f'{d_key}#DSC={c}'] = 0
            # Features extracted from time-windows
            # These features requires resampling and imputation on each data.
            sample_rate = resample_s.get(d_key) or 1
            d_val.drop_duplicates(inplace=True)
            d_val_res = d_val.resample(f'{sample_rate}S', origin='start')
            if is_numeric:
                try:
                    # Your resampling code here...
                    d_val_res = d_val_res.mean().interpolate(method='linear').dropna()
                except ValueError:
                    # Save input data to a file or external storage for debugging...
                    print(d_val_res)
                    print(d_val)
                    raise
            else:
                d_val_res = d_val_res.ffill().dropna()
            #No resampling
#             d_val_res =d_val
 
           ###############################################################    
           # Features extracted from 5-min immediate past time-windows
            w_val = 5 * 60
            try:
                v = d_val_res.loc[t - td(seconds=w_val):t]
            except (KeyError, IndexError):
                continue
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                new_row = {f'{k}#ImmediatePast_5': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                row.update(new_row)
            # Features extracted from 10-min immediate past time-windows
            w_val = 10 * 60
            try:
                v = d_val_res.loc[t - td(seconds=w_val):t]
            except (KeyError, IndexError):
                continue
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                new_row = {f'{k}#ImmediatePast_10': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                row.update(new_row)

           # Features extracted from 15-min immediate past time-windows
            w_val = 15 * 60
            try:
                v = d_val_res.loc[t - td(seconds=w_val):t]
            except (KeyError, IndexError):
                continue
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                new_row = {f'{k}#ImmediatePast_15': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                row.update(new_row)

            # Features extracted from 30-min immediate past time-windows
            w_val = 30 * 60
            try:
                v = d_val_res.loc[t - td(seconds=w_val):t]
            except (KeyError, IndexError):
                continue
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                new_row = {f'{k}#ImmediatePast_30': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                row.update(new_row)

            # Features extracted from 45-min immediate past time-windows
            w_val = 45 * 60
            try:
                v = d_val_res.loc[t - td(seconds=w_val):t]
            except (KeyError, IndexError):
                continue
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                new_row = {f'{k}#ImmediatePast_45': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                row.update(new_row)
                
            #############################################################    
            #Features extracted from yesterday epoch time windows
            for count, (start, end) in enumerate(yesterday_time_windows_epoch):
                # Get data for the current yesterday epoch time window
                try:
                    v = d_val_res.loc[start:end]
                except (KeyError, IndexError):
                    continue
                epoch_name = epoch_names.get(count)

                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    new_row = {f'{k}#Yesterday{epoch_name}': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                    row.update(new_row)
                    
            #Features extracted from today epoch time windows until current time
            for count, (start, end) in enumerate(today_time_windows_epoch):
                # Get data for the current time window
                try:
                    v = d_val_res.loc[start:end]
                except (KeyError, IndexError):
                    continue
                epoch_name = epoch_names.get(count)

                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    new_row = {f'{k}_Today{epoch_name}': v for k, v in _extract_timeWindow_feature(is_numeric, cats, d_key, v).items()}
                    row.update(new_row)

            
        # Features relevant to time
        day_of_week = ['MON', 'TUE', 'WED', 'THU', 'FRI', 'SAT', 'SUN'][t.isoweekday() - 1]
        is_weekend = 'Y' if t.isoweekday() > 5 else 'N'
        hour = t.hour

        if 6 <= hour < 9:
            hour_name = 'Dawn'
        elif 9 <= hour < 12:
            hour_name = 'MORNING'
        elif 12 <= hour < 15:
            hour_name = 'AFTERNOON'
        elif 15 <= hour < 18:
            hour_name = 'LATE_AFTERNOON'
        elif 18 <= hour < 21:
            hour_name = 'EVENING'
        elif 21 <= hour < 24:
            hour_name = 'NIGHT'
        else:
            hour_name = 'MIDNIGHT'
            
        for d in ['MON', 'TUE', 'WED', 'THU', 'FRI', 'SAT', 'SUN']:
            row[f'Time#DOW={d}'] = d == day_of_week
        for d in ['Y', 'N']:
            row[f'Time#WKD={d}'] = d == is_weekend
        for d in ['DAWN', 'MORNING', 'AFTERNOON', 'LATE_AFTERNOON', 'EVENING', 'NIGHT', 'MIDNIGHT']:
            row[f'Time#HRN={d}'] = d == hour_name
        

        try:
            last_label = label.loc[label[:t].index.max()]
        except (KeyError, IndexError):
            last_label = 0
        row[f'ESM#LastLabel'] = last_label

#############################################################################################
        #The following code is designed for fixed threshold
        # Label values extracted from yesterday epochs
        for count, (start, end) in enumerate(yesterday_time_windows_epoch):
            try:
                v = label.loc[start:end]
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2: # Binary classification
                    row[f'ESM#LIK#Yesterday{epoch_name}'] = np.sum(v == label_values[0]) / len(v) if len(v) > 0 else 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK#Yesterday{epoch_name}'] = np.sum(v == l) / len(v) if len(v) > 0 else 0
            except (KeyError, IndexError):
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2:
                    row[f'ESM#LIK#Yesterday{epoch_name}'] = 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK#Yesterday{epoch_name}'] = 0
        # Label values extracted from today epochs
        for count, (start, end) in enumerate(today_time_windows_epoch):
            try:
                v = label.loc[start:end]
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2: # Binary classification
                    row[f'ESM#LIK#Today{epoch_name}'] = np.sum(v == label_values[0]) / len(v) if len(v) > 0 else 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK#Today{epoch_name}'] = np.sum(v == l) / len(v) if len(v) > 0 else 0
            except (KeyError, IndexError):
                epoch_name = epoch_names.get(count)
                if len(label_values) <= 2:
                    row[f'ESM#LIK#Today{epoch_name}'] = 0
                else:
                    for l in label_values:
                        row[f'ESM#LIK#Today{epoch_name}'] = 0
###############################################################################3
          #The following code is designed for stress_dyn since we should not know previous binned label values

        # Label values extracted from yesterday epochs
#         for count, (start, end) in enumerate(yesterday_time_windows_epoch):
#             try:
#                 v = label.loc[start:end]
#                 epoch_name = epoch_names.get(count)
#                 row[f'ESM#Mean#Yesterday{epoch_name}'] = np.mean(v) if len(v) > 0 else 0
#                 row[f'ESM#Std#Yesterday{epoch_name}'] = np.std(v) if len(v) > 0 else 0

#             except (KeyError, IndexError):
#                 epoch_name = epoch_names.get(count)
#                 row[f'ESM#Mean#Yesterday{epoch_name}'] = 0 
#                 row[f'ESM#Std#Yesterday{epoch_name}'] = 0
                
# #         # Label values extracted from today epochs
#         for count, (start, end) in enumerate(today_time_windows_epoch):
#             try:
#                 v = label.loc[start:end]
#                 epoch_name = epoch_names.get(count)
#                 row[f'ESM#Mean#Today{epoch_name}'] = np.mean(v) if len(v) > 0 else 0
#                 row[f'ESM#Std#Today{epoch_name}'] = np.std(v) if len(v) > 0 else 0

#             except (KeyError, IndexError):
#                 epoch_name = epoch_names.get(count)
#                 row[f'ESM#Mean#Today{epoch_name}'] = 0  
#                 row[f'ESM#Std#Today{epoch_name}'] = 0 
############################################################################

#         # Label values extracted from immediate past
#         w_val = 15 * 60
#         try:
#             v = label.loc[t - td(seconds=w_val):t]
#             epoch_name = epoch_names.get(count)
#             if len(label_values) <= 2: # Binary classification
#                 row[f'ESM#LIK#ImmediatePast'] = np.sum(v == label_values[0]) / len(v) if len(v) > 0 else 0
#             else:
#                 for l in label_values:
#                     row[f'ESM#LIK={l}#ImmediatePast'] = np.sum(v == l) / len(v) if len(v) > 0 else 0
#         except (KeyError, IndexError):
#             epoch_name = epoch_names.get(count)
#             if len(label_values) <= 2:
#                 row[f'ESM#LIK#ImmediatePast'] = 0
#             else:
#                 for l in label_values:
#                     row[f'ESM#LIK={l}#ImmediatePast'] = 0

        row = {
            k: 0 if _safe_na_check(v) else v
            for k, v in row.items()
        }

        X.append(row)
        y.append(label_cur)
        date_times.append(timestamp)
    
    log(f"Complete feature extraction on {pid}'s data ({time.time() - _s:.2f} s).")
    
    #Without normalization for each user
    X = pd.DataFrame(X)
    y = np.asarray(y)
    group = np.repeat(pid, len(y))
    date_times =  np.asarray(date_times)

    #Normalization for each feature of user pid
    df_X = pd.DataFrame(X).fillna(0)
    df_X_sensor =  df_X.loc[:,[('PIF' not in str(x)) and ('ESM' not in str(x)) for x in df_X.keys()]]  
    df_X_no_sensor = df_X.loc[:,[('PIF' in str(x)) or ('ESM' in str(x)) for x in df_X.keys()]]
    # Z-score Standardization for each column
    X_normalized_sensor = df_X_sensor.apply(lambda col: (col - col.mean()) / col.std())
    X_normalized = pd.concat([X_normalized_sensor, df_X_no_sensor], axis=1 )
    X, y, group, date_times = X_normalized, np.asarray(y), np.repeat(pid, len(y)), np.asarray(date_times)
    return X, y, group, date_times

def extract(
        pids: Iterable[str],
        data: Dict[str, pd.Series],
        label: pd.Series,
        label_values: List[str],
#        window_data: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
#        window_label: Dict[str, Union[int, Callable[[pd.Timestamp], int]]],
        categories: Dict[str, Optional[List[any]]] = None,
        constat_features: Dict[str, Dict[str, any]] = None,
        resample_s: Dict[str, float] = None,
        with_ray: bool=False
):
    if with_ray and not ray.is_initialized():
        raise EnvironmentError('Ray should be initialized if "with_ray" is set as True.')
    func = ray.remote(_extract).remote if with_ray else _extract
    jobs = []
    for pid in pids:
        d = dict()
        for k, v in data.items():
            try:
                d[k] = v.loc[(pid, )]
                if k.startswith('LOC_'):
                    d[k].index= pd.to_datetime( d[k].index, unit='ms', utc=True).tz_convert(DEFAULT_TZ)
                d['SPEED'] = d.pop('LOC_SPEED')
            except (KeyError, IndexError):
                pass
        job = func(
            pid=pid, data=d, label=label.loc[(pid, )],
            label_values=label_values,
#            window_data=window_data,
#            window_label=window_label,
            categories=categories,
            constant_features=constat_features[pid],
            resample_s=resample_s
        )
        jobs.append(job)
    jobs = ray.get(jobs) if with_ray else jobs
    print([x.shape for _, x, _, _ in jobs])
    X = pd.concat([x for x, _, _, _ in jobs], axis=0, ignore_index=True)
    y = np.concatenate([x for _, x, _, _ in jobs], axis=0)
    group = np.concatenate([x for _, _, x, _ in jobs], axis=0)
    date_times = np.concatenate([x for _, _, _, x in jobs], axis=0)
    t_s = date_times.min().normalize().timestamp()
    t_norm = np.asarray(list(map(lambda x: x.timestamp() - t_s, date_times)))
    C, DTYPE = X.columns, X.dtypes
    X = X.fillna({
        **{c: False for c in C[(DTYPE == object) | (DTYPE == bool)]},
        **{c: 0.0 for c in C[(DTYPE != object) & (DTYPE != bool)]},
    }).astype({
        **{c: 'bool' for c in C[(DTYPE == object) | (DTYPE == bool)]},
        **{c: 'float32' for c in C[(DTYPE != object) & (DTYPE != bool)]},
    })
    return X, y, group, t_norm, date_times

In [7]:
import os
import cloudpickle

LABEL_VALUES = [1, 0]
RESAMPLE_S = {
    'ACC_AXX': 0.25,
    'ACC_AXY': 0.25,
    'ACC_AXZ': 0.25,
    'ACC_MAG': 0.25,
#     'AML': 1.0,
    'EDA': 0.5,
}


CATEGORIES = {
#    'DST_MOT': ['IDLE', 'WALKING', 'JOGGING', 'RUNNING'],
#    'ULV_INT': ['NONE', 'LOW', 'MEDIUM', 'HIGH'],
    'ACT': ['WALKING', 'STILL', 'IN_VEHICLE', 'ON_BICYCLE', 'RUNNING'],
#    'APP_PAC': None,
    'APP_CAT': ['SOCIAL','HEALTH','ENTER','WORK',"INFO"],
#    'BAT_STA': ['CHARGING', 'DISCHARGING', 'FULL', 'NOT_CHARGING'],
#    'CAE': ['CALL', 'IDLE'],
#    'CON': ['DISCONNECTED', 'WIFI', 'MOBILE'],
    'LOC_CLS': None,
    'LOC_LABEL': ['eating','home','work','social','others'] ,
    # 'SCR_EVENT':['ON', 'OFF', 'UNLOCK'],
    # 'RNG': ['VIBRATE', 'SILENT', 'NORMAL'],
    # 'CHG': ['DISCONNECTED', 'CONNECTED'],
    # 'PWS': ['ACTIVATE', 'DEACTIVATE'],
    # 'ONF': ['ON', 'OFF']
}
PARTICIPANTS = pd.read_csv(os.path.join(PATH_INTERMEDIATE, 'Preprocessed', 'PARTICIPANT_INFO.csv'),index_col = 'uid')
PARTICIPANTS = PARTICIPANTS.rename(lambda x: 'P{:02}'.format(int(x)), axis='index')
PINFO = PARTICIPANTS.assign(
    AGE=lambda x: x['age'],
    GEN=lambda x: x['gender'],
    PSS=lambda x: x['PSS'],
)[[
    'AGE', 'GEN','PSS'
]]

PINFO = pd.get_dummies(PINFO, prefix_sep='=', dtype=bool).to_dict('index')
PINFO = {k: {f'PIF#{x}': y for x, y in v.items()} for k, v in PINFO.items()}
DATA = load(os.path.join(PATH_INTERMEDIATE, 'proc.pkl'))
LABELS_PROC = pd.read_csv(os.path.join(PATH_INTERMEDIATE, 'Preprocessed', 'LABELS_PROC.csv'), index_col=['uid','timestamp'],parse_dates=True)

In [8]:
with on_ray():

    feat = extract(
        pids = LABELS_PROC.index.get_level_values('uid').unique(),
        data = DATA,
        categories=CATEGORIES,
        label = LABELS_PROC['stress_fixed'],
        resample_s=RESAMPLE_S, with_ray=True,
        constat_features=PINFO,
        label_values=LABEL_VALUES,
    )
    dump(feat, os.path.join(PATH_INTERMEDIATE, 'feat', f'stress-fixed-all-time-windows-normalized.pkl'))

2024-04-22 12:23:47,614	INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 143.248.57.77:6379...
2024-04-22 12:23:47,640	INFO worker.py:1612 -- Connected to Ray cluster. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  pid=pid, data=d, label=label.loc[(pid, )],
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  pid=pid, data=d, label=label.loc[(pid, )],
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  pid=pid, data=d, label=label.loc[(pid, )],
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  pid=pid, data=d, label=label.loc[(pid, )],
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  pid=pid, data=d, label=label.loc[(pid, )],
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  pid=pid, data=d, label=label.loc[(pid, )],
  d[k] = v.loc[(pid, )]
  d[k] = v.loc[(pid, )]
  d[

[2m[36m(_extract pid=460537, ip=143.248.57.67)[0m [24-04-22 12:23:49] Begin feature extraction on P13's data.
[2m[36m(_extract pid=460530, ip=143.248.57.67)[0m [24-04-22 13:03:34] Complete feature extraction on P22's data (2385.31 s).
[2m[36m(_extract pid=1070415)[0m [24-04-22 12:23:51] Begin feature extraction on P19's data.[32m [repeated 23x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
[2m[36m(_extract pid=460533, ip=143.248.57.67)[0m [24-04-22 13:04:04] Complete feature extraction on P21's data (2415.42 s).
[2m[36m(_extract pid=460536, ip=143.248.57.67)[0m [24-04-22 13:04:59] Complete feature extraction on P03's data (2469.38 s).
[2m[36m(_extract pid=460537, ip=143.248.57.67)[0m [24-04-22 13:09:12] Complete feature extraction on P13's data (2723.61 s).
[2m[36m(_extract pid=460539, ip=143.248.57.