# Segment the time series into 1-minute sequences for each user

In [1]:
from Funcs.Utility import *
import numpy as np
import pandas as pd
from typing import Dict, Callable, Union, Tuple, List, Optional, Iterable
from datetime import timedelta as td
from scipy import stats
import ray
import warnings
import time

In [2]:
def _safe_na_check(_v):
    _is_nan_inf = False
    
    try:
        _is_nan_inf = np.isnan(_v) or np.isinf(_v)
    except:
        _is_nan_inf = False
    
    return _is_nan_inf or _v is None

In [3]:
import os
import cloudpickle

# Load the pickled sensor data; it is iterated with `.items()` further down,
# so it is presumably a mapping of sensor type -> data (confirm against `load`).
DATA = load(
    os.path.join(PATH_INTERMEDIATE, 'proc.pkl')
)

# Load the processed labels, indexed by (pcode, timestamp), asking pandas to
# parse the index as dates.
LABELS_PROC = pd.read_csv(
    os.path.join(PATH_INTERMEDIATE, 'proc', 'LABELS_PROC.csv'),
    index_col=['pcode', 'timestamp'],
    parse_dates=True,
)

In [5]:
# import pandas as pd
# import ray
# import logging
# from datetime import datetime

# logging.basicConfig(level=logging.DEBUG)

# # RESAMPLE_S = {
#     # 'ACC_AXX': 0.25,
#     # 'ACC_AXY': 0.25,
#     # 'ACC_AXZ': 0.25,
#     # 'ACC_MAG': 0.25,
#     # 'EDA': 0.5,
# # }

# @ray.remote
# def segment_sensor_data(pcode, sensor_type, sensor_data, label_data):
#     user_labels = label_data.loc[pcode]
#     if isinstance(user_labels, pd.Series):
#         user_labels = user_labels.to_frame().T

#     # Initialize empty DataFrames for labeled and unlabeled sequences
#     labeled_sequences_df = pd.DataFrame()
#     unlabeled_sequences_df = pd.DataFrame()

#     resampled_data = sensor_data.resample('T').asfreq()
#     if isinstance(resampled_data, pd.Series):
#         resampled_data = resampled_data.to_frame()

#     for time, row in resampled_data.iterrows():
#         sequence = {sensor_type: sensor_data.loc[time:time + pd.Timedelta(minutes=5)], 'pcode': pcode, 'timestamp': time  }

#         future_labels = user_labels[user_labels.index > time]
#         if not future_labels.empty:
#             time_differences = (future_labels.index - time).total_seconds()
#             abs_time_differences = abs(pd.Series(time_differences, index=future_labels.index))
#             nearest_future_time = abs_time_differences.idxmin()
#             label_row = future_labels.loc[nearest_future_time]

#             # Ensure duration is a valid number
#             duration = label_row['duration']
#             if pd.isna(duration) or not isinstance(duration, (int, float)):
#                 duration = 5  # Default value or use another appropriate handling

#             overlapping_labels = user_labels[(user_labels.index >= time) & (user_labels.index - pd.Timedelta(minutes=duration) < time)]

#             if not overlapping_labels.empty:
#                 label = overlapping_labels.iloc[-1]['stress_fixed']
#                 sequence['label'] = label
#                 labeled_sequence_df = pd.concat([pd.DataFrame(sequence)], ignore_index=True)
#                 labeled_sequences_df = pd.concat([labeled_sequences_df, labeled_sequence_df], ignore_index=True)
#             else:
#                 sequence['label'] = None
#                 unlabeled_sequence_df = pd.concat([pd.DataFrame(sequence)], ignore_index=True)
#                 unlabeled_sequences_df = pd.concat([unlabeled_sequences_df, unlabeled_sequence_df], ignore_index=True)
#         else:
#             sequence['label'] = None
#             unlabeled_sequence_df = pd.concat([pd.DataFrame(sequence)], ignore_index=True)
#             unlabeled_sequences_df = pd.concat([unlabeled_sequences_df, unlabeled_sequence_df], ignore_index=True)

#     return labeled_sequences_df, unlabeled_sequences_df

# with on_ray():
#     segmented_data = []
#     for pcode in LABELS_PROC.index.get_level_values('pcode').unique():
#         print(f"{datetime.now()} - Segmenting {pcode} data...")
#         for sensor_type, data in DATA.items():
#             if pcode in data.index.get_level_values('pcode'):
#                 # resample_interval = RESAMPLE_S.get(sensor_type, 1)
#                 resample_interval = 10
#                 user_data = data.loc[pcode]
#                 resampled_sensor_data = user_data.resample(f'{resample_interval}S').interpolate(method='linear').dropna()
#                 segmented_data.append(segment_sensor_data.remote(pcode, sensor_type, resampled_sensor_data, LABELS_PROC))
#         print(f"{datetime.now()} - Finished segmenting {pcode} data.")

#     results = ray.get(segmented_data)

#     print(f"{datetime.now()} - Finished segmenting data.")

#     # Aggregate DataFrames
#     labeled_df = pd.concat([item[0] for item in results], ignore_index=True)
#     unlabeled_df = pd.concat([item[1] for item in results], ignore_index=True)

#     print(f"{datetime.now()} - Finished aggregating data.")

#     labeled_df.to_csv(os.path.join(PATH_INTERMEDIATE, 'proc', 'labeled_sequences.csv'), index=False)
#     unlabeled_df.to_csv(os.path.join(PATH_INTERMEDIATE, 'proc', 'unlabeled_sequences.csv'), index=False)

#     print(f"{datetime.now()} - Finished saving data.")

In [11]:
import pandas as pd
import ray
import logging
from datetime import datetime
import os

# Configure root logging at DEBUG level (this is what surfaces the
# `DEBUG:filelock:` records in the cell output).
logging.basicConfig(level=logging.DEBUG)

# Length of each extracted window; also used as the fallback label duration
# inside segment_sensor_data when a label has no usable 'duration'.
sequence_length = 1 # minutes
# Spacing the raw sensor data is resampled to before segmentation.
resample_interval = 1 # seconds

@ray.remote
def segment_sensor_data(pcode, sensor_type, sensor_data, label_data):
    """Cut one participant's sensor stream into windows, label them, and save them.

    A window of ``sequence_length`` minutes is extracted at every minute
    boundary spanned by ``sensor_data``. A window is "labeled" when some label
    timestamp ``t`` satisfies ``t >= start`` and ``t - duration < start``,
    where ``duration`` (minutes) is taken from the nearest label strictly
    after ``start`` (falling back to ``sequence_length`` when that value is
    missing or non-numeric). The label value is the ``stress_fixed`` entry of
    the last matching row. All other windows are "unlabeled" (label ``None``).

    Labeled and unlabeled windows are written to
    ``<PATH_INTERMEDIATE>/proc/<pcode>_<sensor_type>_labeled.csv`` and
    ``..._unlabeled.csv`` respectively; a file is written only when it would
    be non-empty.

    Args:
        pcode: Participant code; used to select rows via ``label_data.loc[pcode]``.
        sensor_type: Name of the sensor; used as the value column name and in
            the output file names.
        sensor_data: Time-indexed sensor values for this participant
            (must support ``.resample`` and label slicing by timestamp).
        label_data: Label table whose first index level is the participant
            code, with ``duration`` and ``stress_fixed`` columns.

    Returns:
        ``(pcode, sensor_type)``, only so the caller can track progress.
    """
    print(f"{datetime.now()} - Started segmenting {pcode} {sensor_type} data.")

    user_labels = label_data.loc[pcode]
    if isinstance(user_labels, pd.Series):
        user_labels = user_labels.to_frame().T

    window_length = pd.Timedelta(minutes=sequence_length)

    # Collect per-window frames and concatenate once at the end; growing a
    # DataFrame with pd.concat inside the loop copies all prior rows each time.
    labeled_frames = []
    unlabeled_frames = []

    # Only the minute-aligned timestamps are needed from the resample.
    # 'min' is the non-deprecated spelling of the 'T' alias.
    window_starts = sensor_data.resample('min').asfreq().index

    for window_start in window_starts:
        # NOTE: .loc label slicing includes both endpoints, so a sample that
        # sits exactly on a minute boundary appears in two consecutive windows.
        sequence = {
            sensor_type: sensor_data.loc[window_start:window_start + window_length],
            'pcode': pcode,
            'timestamp': window_start,
        }

        label = None
        has_label = False

        future_labels = user_labels[user_labels.index > window_start]
        if not future_labels.empty:
            # Every candidate is strictly later than window_start, so the
            # nearest one is simply the earliest. Positional lookup always
            # yields a single row, even if that timestamp is duplicated.
            nearest_label = future_labels.iloc[future_labels.index.argmin()]

            # Ensure duration is a valid number. NumPy scalar types are
            # accepted explicitly because np.int64 is not a subclass of int.
            duration = nearest_label['duration']
            if isinstance(duration, (int, float, np.integer, np.floating)) and not pd.isna(duration):
                duration = float(duration)
            else:
                duration = sequence_length

            covers_window = (user_labels.index >= window_start) & (
                user_labels.index - pd.Timedelta(minutes=duration) < window_start
            )
            overlapping_labels = user_labels[covers_window]

            if not overlapping_labels.empty:
                label = overlapping_labels.iloc[-1]['stress_fixed']
                has_label = True

        sequence['label'] = label
        window_df = pd.DataFrame(sequence)
        if has_label:
            labeled_frames.append(window_df)
        else:
            unlabeled_frames.append(window_df)

    # pd.concat raises on an empty list, so fall back to an empty frame.
    labeled_sequences_df = (
        pd.concat(labeled_frames, ignore_index=True) if labeled_frames else pd.DataFrame()
    )
    unlabeled_sequences_df = (
        pd.concat(unlabeled_frames, ignore_index=True) if unlabeled_frames else pd.DataFrame()
    )

    print(f"{datetime.now()} - Finished segmenting {pcode} {sensor_type} data.")

    # Save each user's sequences as separate CSV files
    if not labeled_sequences_df.empty:
        labeled_sequences_df.to_csv(os.path.join(PATH_INTERMEDIATE, 'proc', f"{pcode}_{sensor_type}_labeled.csv"), index=False)
        print(f"{datetime.now()} - Finished saving {pcode} {sensor_type} labeled data.")
    if not unlabeled_sequences_df.empty:
        unlabeled_sequences_df.to_csv(os.path.join(PATH_INTERMEDIATE, 'proc', f"{pcode}_{sensor_type}_unlabeled.csv"), index=False)
        print(f"{datetime.now()} - Finished saving {pcode} {sensor_type} unlabeled data.")

    return pcode, sensor_type  # Just to track progress

# Fan out one segmentation task per (participant, sensor) pair and wait for
# all of them to finish. Each task writes its own CSV files.
with on_ray():

    # Store the label table in the object store once and hand every task the
    # same reference, instead of re-serializing it for each .remote() call.
    labels_ref = ray.put(LABELS_PROC)

    # Participant codes present for each sensor, computed once rather than
    # rebuilding the index level for every (pcode, sensor) combination.
    pcodes_by_sensor = {
        sensor_type: set(data.index.get_level_values('pcode').unique())
        for sensor_type, data in DATA.items()
    }

    segmented_data = []
    for pcode in LABELS_PROC.index.get_level_values('pcode').unique():
        print(f"{datetime.now()} - Segmenting {pcode} data...")
        for sensor_type, data in DATA.items():
            if pcode not in pcodes_by_sensor[sensor_type]:
                continue
            user_data = data.loc[pcode]
            # Put the stream on a regular grid of `resample_interval` seconds,
            # linearly filling gaps; 's' is the non-deprecated spelling of 'S'.
            resampled_sensor_data = (
                user_data.resample(f'{resample_interval}s')
                .interpolate(method='linear')
                .dropna()
            )
            segmented_data.append(
                segment_sensor_data.remote(pcode, sensor_type, resampled_sensor_data, labels_ref)
            )

    # Block until every task has completed; results are (pcode, sensor_type) pairs.
    results = ray.get(segmented_data)
    print(f"{datetime.now()} - Finished segmenting and saving data for all users.")

DEBUG:filelock:Attempting to acquire lock 140523755614800 on /tmp/ray/session_2023-12-06_11-40-45_093514_228271/ports_by_node.json.lock
DEBUG:filelock:Lock 140523755614800 acquired on /tmp/ray/session_2023-12-06_11-40-45_093514_228271/ports_by_node.json.lock
DEBUG:filelock:Attempting to release lock 140523755614800 on /tmp/ray/session_2023-12-06_11-40-45_093514_228271/ports_by_node.json.lock
DEBUG:filelock:Lock 140523755614800 released on /tmp/ray/session_2023-12-06_11-40-45_093514_228271/ports_by_node.json.lock
DEBUG:filelock:Attempting to acquire lock 140523755614800 on /tmp/ray/session_2023-12-06_11-40-45_093514_228271/ports_by_node.json.lock
DEBUG:filelock:Lock 140523755614800 acquired on /tmp/ray/session_2023-12-06_11-40-45_093514_228271/ports_by_node.json.lock
DEBUG:filelock:Attempting to release lock 140523755614800 on /tmp/ray/session_2023-12-06_11-40-45_093514_228271/ports_by_node.json.lock
DEBUG:filelock:Lock 140523755614800 released on /tmp/ray/session_2023-12-06_11-40-45_09

2023-12-06 11:40:46,980	INFO worker.py:1612 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


2023-12-06 11:40:47.530756 - Segmenting P01 data...
[2m[36m(segment_sensor_data pid=238271)[0m 2023-12-06 11:40:49.752864 - Started segmenting P01 ACC_AXX data.
[2m[36m(segment_sensor_data pid=238285)[0m 2023-12-06 11:40:51.859259 - Started segmenting P01 ACC_AXY data.
[2m[36m(segment_sensor_data pid=238274)[0m 2023-12-06 11:40:53.978138 - Started segmenting P01 ACC_AXZ data.
[2m[36m(segment_sensor_data pid=238283)[0m 2023-12-06 11:40:56.110148 - Started segmenting P01 ACC_MAG data.
[2m[36m(segment_sensor_data pid=238276)[0m 2023-12-06 11:40:56.673172 - Started segmenting P01 AML data.
[2m[36m(segment_sensor_data pid=238257)[0m 2023-12-06 11:40:56.981934 - Started segmenting P01 CAL data.
[2m[36m(segment_sensor_data pid=238265)[0m 2023-12-06 11:40:57.304646 - Started segmenting P01 DST_DST data.
[2m[36m(segment_sensor_data pid=238266)[0m 2023-12-06 11:40:57.639303 - Started segmenting P01 DST_PAC data.
[2m[36m(segment_sensor_data pid=238284)[0m 2023-12-06 11: