# prepare_data
> Code to transform raw data into features

In [None]:
#| default_exp run.prepare_data

In [None]:
#| export 
import shutil
# from pathlib import Path
from fastcore.all import Path

import hydra
import numpy as np
import polars as pl
from tqdm import tqdm

from sleep_state_detection.conf import PrepareDataConfig
from sleep_state_detection.utils.common import trace

## Functions

In [None]:
#| export 
# Polars dtypes for the columns of the series data (they match the dtypes
# polars reports for the raw parquet).
# NOTE(review): not referenced anywhere in the visible part of this file.
SERIES_SCHEMA = {
    "series_id": pl.Utf8,
    "step": pl.UInt32,
    "anglez": pl.Float32,
    "enmo": pl.Float32,
}


# Feature columns, in the order `add_feature` selects them. Each one is saved
# as its own `<name>.npy` file per series.
FEATURE_NAMES = [
    "anglez",
    "enmo",
    "step",
    "hour_sin",
    "hour_cos",
    "month_sin",
    "month_cos",
    "minute_sin",
    "minute_cos",
    "anglez_sin",
    "anglez_cos",
    'anglez_diff', 
    'enmo_diff', 
    'anglez_diff_rolling_median', 
    'enmo_diff_rolling_median', 
    'anglez_diff_rolling_median_reverse', 
    'enmo_diff_rolling_median_reverse', 
]

# Constants used to standardize `anglez` and `enmo` as (x - mean) / std when
# the series are loaded.
# NOTE(review): presumably computed over the training data - confirm.
ANGLEZ_MEAN = -8.810476
ANGLEZ_STD = 35.521877
ENMO_MEAN = 0.041315
ENMO_STD = 0.101829


def to_coord(x: pl.Expr, max_: int, name: str) -> list[pl.Expr]:
    """Encode a periodic quantity as a point on the unit circle.

    Args:
        x: Expression holding the periodic value (e.g. an hour of the day).
        max_: Length of one period; ``x`` is wrapped with ``x % max_``.
        name: Prefix for the output columns.

    Returns:
        Two expressions aliased ``<name>_sin`` and ``<name>_cos``, in that order.
    """
    # Map one full period onto [0, 2*pi) so both ends of the period are neighbours.
    angle = 2 * np.pi * (x % max_) / max_
    return [
        angle.sin().alias(f"{name}_sin"),
        angle.cos().alias(f"{name}_cos"),
    ]


def deg_to_rad(x: pl.Expr) -> pl.Expr:
    """Convert an angle given in degrees to radians."""
    radians_per_degree = np.pi / 180
    return radians_per_degree * x


def add_feature(series_df: pl.DataFrame) -> pl.DataFrame:
    """Build the feature table for a single series.

    Args:
        series_df: Rows of one series. Must contain the columns ``series_id``,
            ``anglez``, ``enmo``, ``timestamp`` (a datetime column) and
            ``anglez_rad``. Rows are expected to be in time order, because the
            diff and rolling features depend on row order.

    Returns:
        A frame with ``series_id`` followed by the columns in ``FEATURE_NAMES``:
        a row counter ``step``, cyclic encodings of hour/month/minute, sin/cos
        of ``anglez_rad``, first differences of ``anglez``/``enmo`` and rolling
        medians of those differences over a trailing window and over a
        forward-looking window (the ``*_reverse`` columns).

    Rows without a full rolling window (the first ``window - 1`` rows of the
    trailing features, the last ``window - 1`` rows of the forward-looking
    ones) are filled with the mean of the respective column.
    NOTE(review): for a series shorter than the window every rolling value is
    null, so that mean is itself null/NaN - confirm such series cannot occur.
    """
    # Window length in rows (5 minutes if samples are 5 seconds apart, as in
    # the sample data - confirm for other inputs).
    window = 5 * 12

    # First differences; the first row has no predecessor, so it is set to 0.
    anglez_diff = pl.col("anglez").diff().fill_null(0)
    enmo_diff = pl.col("enmo").diff().fill_null(0)

    series_df = (
        series_df.with_row_count("step")
        .with_columns(
            *to_coord(pl.col("timestamp").dt.hour(), 24, "hour"),  # hour 23 ends up close to hour 0
            *to_coord(pl.col("timestamp").dt.month(), 12, "month"),  # December ends up close to January
            *to_coord(pl.col("timestamp").dt.minute(), 60, "minute"),  # minute 59 ends up close to minute 0
            pl.col("anglez_rad").sin().alias("anglez_sin"),
            pl.col("anglez_rad").cos().alias("anglez_cos"),
            anglez_diff.alias("anglez_diff"),
            enmo_diff.alias("enmo_diff"),
            # Trailing window: median of the current row and the rows before it.
            anglez_diff.rolling_median(window).alias("anglez_diff_rolling_median"),
            enmo_diff.rolling_median(window).alias("enmo_diff_rolling_median"),
            # Forward-looking window: roll over the reversed series, then
            # reverse again so each value lines up with its own row. Without
            # the second reverse, row i would hold the value of row n-1-i.
            anglez_diff.reverse()
            .rolling_median(window)
            .reverse()
            .alias("anglez_diff_rolling_median_reverse"),
            enmo_diff.reverse()
            .rolling_median(window)
            .reverse()
            .alias("enmo_diff_rolling_median_reverse"),
        )
        .select("series_id", *FEATURE_NAMES)
    )

    # Rolling medians are null wherever the window is incomplete; replace
    # those nulls with the column mean.
    fill_cols = [
        "anglez_diff_rolling_median",
        "enmo_diff_rolling_median",
        "anglez_diff_rolling_median_reverse",
        "enmo_diff_rolling_median_reverse",
    ]
    for col in fill_cols:
        mean_value = series_df.select(pl.col(col).mean()).to_numpy()[0, 0]
        series_df = series_df.with_columns(pl.col(col).fill_null(pl.lit(mean_value)))
    return series_df


def save_each_series(this_series_df: pl.DataFrame, columns: list[str], output_dir: Path):
    """Write the given columns of one series to disk, one ``.npy`` file each.

    Args:
        this_series_df: Frame holding the columns to save.
        columns: Names of the columns to write; each becomes ``<name>.npy``.
        output_dir: Target directory; created (with parents) if missing.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    for feature in columns:
        target = output_dir / f"{feature}.npy"
        # zero_copy_only=True makes polars raise rather than silently copy.
        values = this_series_df.get_column(feature).to_numpy(zero_copy_only=True)
        np.save(target, values)

## Walkthrough of main 

### Load in config

In [None]:
# Compose the `prepare_data` config in-process (rather than through
# `@hydra.main`), applying the override `dir=local-small`.
with hydra.initialize(config_path="../conf", version_base="1.2"):
    cfg = hydra.compose(config_name="prepare_data", overrides=["dir=local-small"])
# Display the composed config.
cfg

{'phase': 'train', 'dir': {'data_dir': '/home/work/sleep/sleep_state_detection/input_small/child-mind-institute-detect-sleep-states', 'processed_dir': '/home/work/sleep/sleep_state_detection/input_small/processed_data', 'output_dir': '/home/work/sleep/sleep_state_detection/input_small/output', 'model_dir': '/home/work/sleep/sleep_state_detection/input_small/output/train', 'sub_dir': './'}}

In [None]:
# Output root for the current phase: <cfg.dir.processed_dir>/<cfg.phase>.
processed_dir: Path = Path(cfg.dir.processed_dir) / cfg.phase
processed_dir

Path('/home/work/sleep/sleep_state_detection/input_small/processed_data/train')

In [None]:
# Start clean: drop whatever a previous run left for this phase.
if processed_dir.exists():
    shutil.rmtree(processed_dir)
    print(f"Removed {cfg.phase} dir: {processed_dir}")

Removed train dir: /home/work/sleep/sleep_state_detection/input_small/processed_data/train


In [None]:
# Show the directory settings the following cells read from.
cfg.dir

{'data_dir': '/home/work/sleep/sleep_state_detection/input_small/child-mind-institute-detect-sleep-states', 'processed_dir': '/home/work/sleep/sleep_state_detection/input_small/processed_data', 'output_dir': '/home/work/sleep/sleep_state_detection/input_small/output', 'model_dir': '/home/work/sleep/sleep_state_detection/input_small/output/train', 'sub_dir': './'}

### Preprocess all series

In [None]:
with trace("Load series"):
    # Pick the parquet for this phase: train/test are read from
    # cfg.dir.data_dir, dev from cfg.dir.processed_dir.
    if cfg.phase in ["train", "test"]:
        series_path = Path(cfg.dir.data_dir) / f"{cfg.phase}_series.parquet"
    elif cfg.phase == "dev":
        series_path = Path(cfg.dir.processed_dir) / f"{cfg.phase}_series.parquet"
    else:
        raise ValueError(f"Invalid phase: {cfg.phase}")
    series_lf = pl.scan_parquet(series_path, low_memory=True)

    # First look at the raw rows. Take `head()` on the lazy frame so only the
    # first rows are read, not the whole file.
    display('********** before preprocess********** ', series_lf.head().collect())

    # preprocess
    series_df = (
        series_lf.with_columns(
            pl.col("timestamp").str.to_datetime("%Y-%m-%dT%H:%M:%S%z"),
            # All expressions in one with_columns see the input frame, so the
            # radians are derived from the raw degrees, not the standardized ones.
            deg_to_rad(pl.col("anglez")).alias("anglez_rad"),
            # Standardize in place (the result keeps the source column name).
            (pl.col("anglez") - ANGLEZ_MEAN) / ANGLEZ_STD,
            (pl.col("enmo") - ENMO_MEAN) / ENMO_STD,
        )
        .select(
            [
                pl.col("series_id"),
                pl.col("anglez"),
                pl.col("enmo"),
                pl.col("timestamp"),
                pl.col("anglez_rad"),
            ]
        )
        .collect(streaming=True)
        # Time order within each series matters for the diff/rolling features.
        .sort(by=["series_id", "timestamp"])
    )
    # Second look: the preprocessed rows.
    display('********** after preprocess********** ', series_df.head())
    # Number of series, used as the progress-bar total when saving features.
    n_unique = series_df.get_column("series_id").n_unique()

'********** before preprocess********** '

series_id,step,timestamp,anglez,enmo,__index_level_0__
str,u32,str,f32,f32,i64
"""038441c925bb""",0,"""2018-08-14T15:…",2.6367,0.0217,0
"""038441c925bb""",1,"""2018-08-14T15:…",2.6368,0.0215,1
"""038441c925bb""",2,"""2018-08-14T15:…",2.637,0.0216,2
"""038441c925bb""",3,"""2018-08-14T15:…",2.6368,0.0213,3
"""038441c925bb""",4,"""2018-08-14T15:…",2.6368,0.0215,4


'********** after preprocess********** '

series_id,anglez,enmo,timestamp,anglez_rad
str,f32,f32,"datetime[μs, UTC]",f32
"""038441c925bb""",0.322257,-0.192627,2018-08-14 19:30:00 UTC,0.046019
"""038441c925bb""",0.32226,-0.194591,2018-08-14 19:30:05 UTC,0.046021
"""038441c925bb""",0.322266,-0.193609,2018-08-14 19:30:10 UTC,0.046024
"""038441c925bb""",0.32226,-0.196555,2018-08-14 19:30:15 UTC,0.046021
"""038441c925bb""",0.32226,-0.194591,2018-08-14 19:30:20 UTC,0.046021


[0.6GB(-0.0GB):0.2sec] Load series 


### Add features and save each series separately

In [None]:
with trace("Save features"):
    for series_id, this_series_df in tqdm(series_df.group_by("series_id"), total=n_unique):
        # Newer polars releases yield the group key as a 1-tuple even for a
        # single key column; unwrap it so it can be used as a directory name.
        if isinstance(series_id, tuple):
            series_id = series_id[0]

        # Add the features
        display('************** series before `add_feature`**************', this_series_df)
        this_series_df = add_feature(this_series_df)
        display('************** series after `add_feature`**************', this_series_df)
        # Save each feature as its own .npy file
        series_dir = processed_dir / series_id  # type: ignore
        save_each_series(this_series_df, FEATURE_NAMES, series_dir)

  0%|                                                                                             | 0/1 [00:00<?, ?it/s]

'************** series before `add_feature`**************'

series_id,anglez,enmo,timestamp,anglez_rad
str,f32,f32,"datetime[μs, UTC]",f32
"""038441c925bb""",0.322257,-0.192627,2018-08-14 19:30:00 UTC,0.046019
"""038441c925bb""",0.32226,-0.194591,2018-08-14 19:30:05 UTC,0.046021
"""038441c925bb""",0.322266,-0.193609,2018-08-14 19:30:10 UTC,0.046024
"""038441c925bb""",0.32226,-0.196555,2018-08-14 19:30:15 UTC,0.046021
"""038441c925bb""",0.32226,-0.194591,2018-08-14 19:30:20 UTC,0.046021
"""038441c925bb""",0.322257,-0.192627,2018-08-14 19:30:25 UTC,0.046019
"""038441c925bb""",0.322257,-0.192627,2018-08-14 19:30:30 UTC,0.046019
"""038441c925bb""",0.322257,-0.191645,2018-08-14 19:30:35 UTC,0.046019
"""038441c925bb""",0.326798,-0.186735,2018-08-14 19:30:40 UTC,0.048834
"""038441c925bb""",0.334869,-0.192627,2018-08-14 19:30:45 UTC,0.053838


'************** series after `add_feature`**************'

series_id,anglez,enmo,step,hour_sin,hour_cos,month_sin,month_cos,minute_sin,minute_cos,anglez_sin,anglez_cos,anglez_diff,enmo_diff,anglez_diff_rolling_median,enmo_diff_rolling_median,anglez_diff_rolling_median_reverse,enmo_diff_rolling_median_reverse
str,f32,f32,u32,f64,f64,f64,f64,f64,f64,f32,f32,f32,f32,f32,f32,f32,f32
"""038441c925bb""",0.322257,-0.192627,0,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046003,0.998941,0.0,0.0,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.32226,-0.194591,1,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046005,0.998941,0.000003,-0.001964,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.322266,-0.193609,2,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046008,0.998941,0.000006,0.000982,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.32226,-0.196555,3,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046005,0.998941,-0.000006,-0.002946,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.32226,-0.194591,4,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046005,0.998941,0.0,0.001964,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.322257,-0.192627,5,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046003,0.998941,-0.000003,0.001964,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.322257,-0.192627,6,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046003,0.998941,0.0,0.0,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.322257,-0.191645,7,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.046003,0.998941,0.0,0.000982,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.326798,-0.186735,8,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.048815,0.998808,0.004541,0.00491,0.000999,-0.001618,0.000999,-0.001618
"""038441c925bb""",0.334869,-0.192627,9,-0.965926,0.258819,-0.866025,-0.5,5.6655e-16,-1.0,0.053812,0.998551,0.008071,-0.005892,0.000999,-0.001618,0.000999,-0.001618


100%|█████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  4.67it/s]
[0.6GB(+0.0GB):0.2sec] Save features 


### Processed data folder examination
 * One folder for each series_id
 * For each series_id folder, we have a separate numpy file for each feature

In [None]:
for series_dir in processed_dir.ls():
    print('series_id: ', series_dir.stem)
    display('Files for this series_id', [f.name for f in series_dir.ls()])
    # Load the file of the series being listed, not one hard-coded series id.
    display('*** numpy file for anglez ***', np.load(series_dir / 'anglez.npy'))

series_id:  038441c925bb


'Files for this series_id'

['anglez.npy',
 'anglez_cos.npy',
 'anglez_diff.npy',
 'anglez_diff_rolling_median.npy',
 'anglez_diff_rolling_median_reverse.npy',
 'anglez_sin.npy',
 'enmo.npy',
 'enmo_diff.npy',
 'enmo_diff_rolling_median.npy',
 'enmo_diff_rolling_median_reverse.npy',
 'hour_cos.npy',
 'hour_sin.npy',
 'minute_cos.npy',
 'minute_sin.npy',
 'month_cos.npy',
 'month_sin.npy',
 'step.npy']

'*** numpy file for anglez ***'

array([ 0.32225707,  0.32225987,  0.32226554, ..., -0.52708995,
       -0.54031837, -0.55870426], dtype=float32)

## Main 

In [None]:
#|export 
#| notest
@hydra.main(config_path="conf", config_name="prepare_data", version_base="1.2")
def main(cfg: PrepareDataConfig):
    """Turn the series parquet of one phase into per-series feature files.

    Steps:
      1. Remove ``<cfg.dir.processed_dir>/<cfg.phase>`` if it already exists.
      2. Load ``<phase>_series.parquet`` (from ``cfg.dir.data_dir`` for
         train/test, from ``cfg.dir.processed_dir`` for dev), parse the
         timestamps, derive ``anglez_rad`` and standardize ``anglez``/``enmo``.
      3. For every series, build the features with ``add_feature`` and save
         each one as ``<processed_dir>/<phase>/<series_id>/<feature>.npy``.

    Raises:
        ValueError: if ``cfg.phase`` is not "train", "test" or "dev".
    """
    processed_dir: Path = Path(cfg.dir.processed_dir) / cfg.phase

    # Remove the directory if it already exists
    if processed_dir.exists():
        shutil.rmtree(processed_dir)
        print(f"Removed {cfg.phase} dir: {processed_dir}")

    with trace("Load series"):
        # Pick the parquet for this phase
        if cfg.phase in ["train", "test"]:
            series_path = Path(cfg.dir.data_dir) / f"{cfg.phase}_series.parquet"
        elif cfg.phase == "dev":
            series_path = Path(cfg.dir.processed_dir) / f"{cfg.phase}_series.parquet"
        else:
            raise ValueError(f"Invalid phase: {cfg.phase}")
        series_lf = pl.scan_parquet(series_path, low_memory=True)

        # preprocess
        series_df = (
            series_lf.with_columns(
                pl.col("timestamp").str.to_datetime("%Y-%m-%dT%H:%M:%S%z"),
                # All expressions in one with_columns see the input frame, so
                # the radians come from the raw degrees, not the standardized ones.
                deg_to_rad(pl.col("anglez")).alias("anglez_rad"),
                # Standardize in place (the result keeps the source column name).
                (pl.col("anglez") - ANGLEZ_MEAN) / ANGLEZ_STD,
                (pl.col("enmo") - ENMO_MEAN) / ENMO_STD,
            )
            .select(
                [
                    pl.col("series_id"),
                    pl.col("anglez"),
                    pl.col("enmo"),
                    pl.col("timestamp"),
                    pl.col("anglez_rad"),
                ]
            )
            .collect(streaming=True)
            # Time order within each series matters for the diff/rolling features.
            .sort(by=["series_id", "timestamp"])
        )
        # Number of series, used as the progress-bar total below.
        n_unique = series_df.get_column("series_id").n_unique()
    with trace("Save features"):
        for series_id, this_series_df in tqdm(series_df.group_by("series_id"), total=n_unique):
            # Newer polars releases yield the group key as a 1-tuple even for
            # a single key column; unwrap it so it can be used as a directory name.
            if isinstance(series_id, tuple):
                series_id = series_id[0]

            # Add the features
            this_series_df = add_feature(this_series_df)

            # Save each feature as its own .npy file
            series_dir = processed_dir / series_id  # type: ignore
            save_each_series(this_series_df, FEATURE_NAMES, series_dir)

# Script entry point. Hydra parses its overrides from the command line, so run
# this as a script: inside a notebook kernel the kernel's own `-f` argument is
# rejected as an unrecognized argument.
if __name__ == "__main__":
    main()

usage: ipykernel_launcher.py [--help] [--hydra-help] [--version]
                             [--cfg {job,hydra,all}] [--resolve]
                             [--package PACKAGE] [--run] [--multirun]
                             [--shell-completion] [--config-path CONFIG_PATH]
                             [--config-name CONFIG_NAME]
                             [--config-dir CONFIG_DIR]
                             [--experimental-rerun EXPERIMENTAL_RERUN]
                             [--info [{all,config,defaults,defaults-tree,plugins,searchpath}]]
                             [overrides ...]
ipykernel_launcher.py: error: unrecognized arguments: -f


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
#| hide
import nbdev

# Export the `#| export` cells of this notebook to the package module.
nbdev.nbdev_export()