# ParaDigMa Pipeline Orchestrator Tutorial

This tutorial demonstrates how to use the **pipeline orchestrator** `run_paradigma()`, which serves as the main entry point for running ParaDigMa analysis pipelines. The orchestrator coordinates multiple analysis steps and can process different formats of sensor data.

## Overview

The `run_paradigma()` function is called an _orchestrator_ because it coordinates multiple analysis steps depending on the user input. It can process:

- **Gait analysis**: Arm swing quantification from IMU data
- **Tremor analysis**: Tremor detection and quantification from gyroscope data  
- **Pulse rate estimation**: Pulse rate analysis from PPG data

### Key Features

- **Multi-pipeline support**: Run multiple analyses in a single call
- **Flexible data input**: Works with both prepared and raw sensor data
- **Multiple data formats**: Supports Verily, Axivity, Empatica, and custom formats
- **Robust processing**: Automatic data preparation and error handling

### Data Requirements

The orchestrator accepts either:
1. **Prepared data**: Prepared according to the [Data preparation tutorial](https://biomarkersparkinson.github.io/paradigma/tutorials/data_preparation.html)
2. **Raw data**: Prepared automatically by the orchestrator (note: this feature has a limited scope)

Let's explore different usage scenarios with examples.

### Required imports

In [None]:
from pathlib import Path

from paradigma.constants import TimeUnit
from paradigma.load import load_data_files
from paradigma.orchestrator import run_paradigma


## 1. Single pipeline with prepared data

Let's start with a simple example using prepared PPG data for pulse rate analysis. 

The function `load_data_files` loads files in one or more of the following formats: `parquet`,
`csv`, `pkl`, `pickle`, `json`, `avro`, and `cwa`. You can also load the data in any other way you
prefer, as long as the result is of type `Dict[str, pd.DataFrame]`:
```python
{
    'file_1': df_1, 
    'file_2': df_2, 
    ..., 
    'file_n': df_n
}
```

Alternatively, you can provide:
- A **single DataFrame**: Will be processed with key `'segment_1'`
- A **list of DataFrames**: Each will get keys like `'segment_1'`, `'segment_2'`, etc.
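The three accepted input shapes can be pictured with the minimal sketch below. It assumes the orchestrator turns every input into a dictionary before processing, following the key naming described above. `normalize_inputs` is a hypothetical helper written for illustration, not part of the ParaDigMa API, and plain strings stand in for DataFrames so the example runs without pandas.

```python
from typing import Any, Dict, List, Union


# Hypothetical helper: mirrors the documented key naming ('segment_1', 'segment_2', ...),
# but it is NOT ParaDigMa's actual implementation.
def normalize_inputs(dfs: Union[Dict[str, Any], List[Any], Any]) -> Dict[str, Any]:
    """Return a dict that maps a key to each data object."""
    if isinstance(dfs, dict):
        # Already in the expected Dict[str, DataFrame] shape: keep the keys as-is
        return dict(dfs)
    if isinstance(dfs, list):
        # A list gets sequential keys starting at 1
        return {f"segment_{i}": df for i, df in enumerate(dfs, start=1)}
    # Anything else is treated as a single DataFrame
    return {"segment_1": dfs}


print(normalize_inputs("df_a"))             # {'segment_1': 'df_a'}
print(normalize_inputs(["df_a", "df_b"]))   # {'segment_1': 'df_a', 'segment_2': 'df_b'}
print(normalize_inputs({"file_1": "df_a"})) # {'file_1': 'df_a'}
```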

Because the input is a collection of files, ParaDigMa can process multiple files in sequence. This is
useful when, for example, a week of recordings is split across several files and you want the
aggregations to be computed across all of them.
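Why computing aggregations across all files matters: an aggregate over the pooled values of all files is generally not the same as combining per-file aggregates. The sketch below uses made-up numbers and the standard library only. It illustrates the statistical point and does not reproduce ParaDigMa's actual aggregation code.

```python
from statistics import median

# Made-up per-window values from three files recorded on different days
values_per_file = {
    "day_1": [10.0, 12.0, 14.0],
    "day_2": [30.0],
    "day_3": [11.0, 13.0, 15.0, 17.0, 19.0],
}

# Aggregate across all files at once (pooled values)
pooled = [v for values in values_per_file.values() for v in values]
pooled_median = median(pooled)

# Naive alternative: aggregate each file separately, then combine the results
median_of_medians = median(median(v) for v in values_per_file.values())

print(pooled_median, median_of_medians)  # 14.0 15.0
```

The short `day_2` file has as much influence as the longer files in the naive approach, which is what pooling avoids.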

In [None]:
path_to_ppg_data = Path('../../example_data/verily/ppg')

dfs_ppg = load_data_files(
    data_path=path_to_ppg_data,
    file_patterns='json',
    verbosity=0 
)

print(f"Loaded {len(dfs_ppg)} PPG files:")
for filename in dfs_ppg.keys():
    df = dfs_ppg[filename]
    print(f"  - {filename}: {len(df)} samples, {len(df.columns)} columns")

print(f"\nFirst 5 rows of {list(dfs_ppg.keys())[0]}:")
dfs_ppg[list(dfs_ppg.keys())[0]].head()

### Output Control

When running ParaDigMa, you can control where results are saved and what intermediate results to store:

**Output Directory:**
- Default: `output_dir="./output"`
- Custom: Specify your own path, e.g. `output_dir="./my_results"`
- No storage: Files are only written to `output_dir` if `store_intermediate` is not empty

**Store Intermediate Results:**

The `store_intermediate` parameter accepts a list of strings:
```python
store_intermediate=['preprocessing', 'quantification', 'aggregation']
```

Valid options are:
- `'preparation'`: Save prepared data
- `'preprocessing'`: Save preprocessed signals
- `'classification'`: Save classification results
- `'quantification'`: Save quantified measures
- `'aggregation'`: Save aggregated results

If `store_intermediate=[]` (empty list), **no files are saved** - results are only returned in memory.
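The rules above can be summarized in a small validation sketch. `check_store_intermediate` and `writes_files` are hypothetical helpers for illustration only; ParaDigMa may report invalid options differently.

```python
from typing import List

# Valid options, as listed above
VALID_STEPS = ("preparation", "preprocessing", "classification", "quantification", "aggregation")


def check_store_intermediate(steps: List[str]) -> List[str]:
    """Raise if a requested step is not one of the documented options (hypothetical helper)."""
    unknown = [step for step in steps if step not in VALID_STEPS]
    if unknown:
        raise ValueError(
            f"Unknown store_intermediate option(s): {unknown}. Valid options: {list(VALID_STEPS)}"
        )
    return list(steps)


def writes_files(steps: List[str]) -> bool:
    """Files are only saved when at least one step is requested."""
    return len(check_store_intermediate(steps)) > 0


print(writes_files(["quantification", "aggregation"]))  # True
print(writes_files([]))                                 # False
```

Note that the option names are singular (`'quantification'`), whereas the keys of the returned results are plural (`'quantifications'`).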

Also, set `time_input_unit` to the unit of your `time` column. For all available options, see [the API reference](https://biomarkersparkinson.github.io/paradigma/autoapi/paradigma/constants/index.html#paradigma.constants.TimeUnit).
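The examples below use `TimeUnit.RELATIVE_S`. Assuming, based on its name, that this means seconds elapsed since the first sample, the sketch shows how such values relate to absolute timestamps. `to_relative_seconds` is a hypothetical helper. Check the API reference for the exact meaning of each `TimeUnit` option before converting anything yourself.

```python
from datetime import datetime, timedelta


def to_relative_seconds(timestamps):
    """Convert absolute datetimes to seconds elapsed since the first sample."""
    t0 = timestamps[0]
    return [(t - t0).total_seconds() for t in timestamps]


start = datetime(2024, 1, 1, 9, 0, 0)
# Three samples at 100 Hz, i.e. 10 ms apart
stamps = [start + timedelta(milliseconds=10 * i) for i in range(3)]
print(to_relative_seconds(stamps))  # [0.0, 0.01, 0.02]
```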

In [None]:
pipeline = 'pulse_rate'

# Example 1: Using default output directory with storage
results_single_pipeline = run_paradigma(
    dfs=dfs_ppg,
    pipeline_names=pipeline,
    data_prepared=True,
    time_input_unit=TimeUnit.RELATIVE_S,
    store_intermediate=['quantification', 'aggregation'],  # Files saved to ./output
    verbosity=1 
)

print(results_single_pipeline['metadata'][pipeline])
print(results_single_pipeline['aggregations'][pipeline])
results_single_pipeline['quantifications'][pipeline].head()

In [None]:
# Example 2: No file storage - results only in memory
results_no_storage = run_paradigma(
    dfs=dfs_ppg,
    pipeline_names=pipeline,
    data_prepared=True,
    time_input_unit=TimeUnit.RELATIVE_S,
    store_intermediate=[],  # No files saved
    verbosity=1 
)

print("Results returned without file storage:")
print(f"  Quantifications: {len(results_no_storage['quantifications'][pipeline])} rows")
print(f"  Aggregations: {results_no_storage['aggregations'][pipeline]}")

### Note on accelerometer data

`run_paradigma` currently does not accept accelerometer data as a supplement to the pulse rate
pipeline for signal quality analysis. If you want to include this analysis, please see the
[Pulse rate analysis](https://biomarkersparkinson.github.io/paradigma/tutorials/_static/pulse_rate_analysis.html)
tutorial.

## 2. Multi-pipeline with prepared data

One of the key features of the orchestrator is the ability to run multiple analysis pipelines on the same data in a single call. This is more efficient than running them separately.
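The examples in this tutorial pass `pipeline_names` either as a single string (`'pulse_rate'`) or as a list (`['gait', 'tremor']`). The sketch below shows one way such an argument can be normalized. `as_pipeline_list` is hypothetical, and the set of supported names is limited to the pipelines used in this tutorial.

```python
from typing import List, Union

# Pipeline names used in this tutorial
SUPPORTED_PIPELINES = ("gait", "tremor", "pulse_rate")


def as_pipeline_list(pipeline_names: Union[str, List[str]]) -> List[str]:
    """Normalize a single name or a list of names to a list (hypothetical helper)."""
    names = [pipeline_names] if isinstance(pipeline_names, str) else list(pipeline_names)
    unsupported = [name for name in names if name not in SUPPORTED_PIPELINES]
    if unsupported:
        raise ValueError(f"Unsupported pipeline(s): {unsupported}")
    return names


print(as_pipeline_list("pulse_rate"))        # ['pulse_rate']
print(as_pipeline_list(["gait", "tremor"]))  # ['gait', 'tremor']
```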

### Results Structure

When running multiple pipelines, `run_paradigma` returns a nested dictionary: the top-level keys are the result types, and each result type is keyed by pipeline name:

```python
{
    'quantifications': {
        'gait': DataFrame,      # Gait segment-level quantifications
        'tremor': DataFrame     # Tremor window-level quantifications
    },
    'aggregations': {
        'gait': {...},         # Aggregated gait metrics
        'tremor': {...}        # Aggregated tremor metrics  
    },
    'metadata': {
        'gait': {...},         # Gait analysis metadata
        'tremor': {...}        # Tremor analysis metadata
    }
}
```

In [None]:
# Load prepared IMU data
path_to_imu_data = Path('../../example_data/verily/imu')

dfs_imu = load_data_files(
    data_path=path_to_imu_data,
    file_patterns='json',
    verbosity=0 
)

print(f"Loaded {len(dfs_imu)} IMU files:")
for filename in dfs_imu.keys():
    df = dfs_imu[filename]
    print(f"  - {filename}: {len(df)} samples, {len(df.columns)} columns")

print(f"\nFirst 5 rows of {list(dfs_imu.keys())[0]}:")
dfs_imu[list(dfs_imu.keys())[0]].head()

In [None]:
# Run gait and tremor analysis on the prepared data
# Using custom output directory
results_multi_pipeline = run_paradigma(
    output_dir=Path('./output_multi'),
    dfs=dfs_imu,                        # Pre-loaded data
    data_prepared=True,                 # Data is already prepared
    pipeline_names=['gait', 'tremor'],  # Multiple pipelines (list format)
    watch_side='left',                  # Required for gait analysis
    store_intermediate=['quantification'],  # Store quantifications only
    verbosity=1
)

In [None]:
results_multi_pipeline.keys()

In [None]:
# Explore the results structure
print("Detailed Results Analysis:")

# Gait results
arm_swing_quantified = results_multi_pipeline['quantifications']['gait']
arm_swing_aggregates = results_multi_pipeline['aggregations']['gait']
arm_swing_meta = results_multi_pipeline['metadata']['gait']
print(f"\nArm swing quantification ({len(arm_swing_quantified)} windows):")
print(f"   Columns: {list(arm_swing_quantified.columns[:5])}... ({len(arm_swing_quantified.columns)} total)")
print(f"   Files: {arm_swing_quantified['file_key'].unique()}")

print(f"\nArm swing aggregation ({len(arm_swing_aggregates)} gait segment categories):")
print(f"   Gait segment categories: {list(arm_swing_aggregates.keys())}")
print(f"   Aggregates: {list(arm_swing_aggregates['0_10'].keys())}")
print(f"   Metadata of first gait segment: {arm_swing_meta[1]}")

# Tremor results  
tremor_quantified = results_multi_pipeline['quantifications']['tremor']
tremor_aggregates = results_multi_pipeline['aggregations']['tremor']
tremor_meta = results_multi_pipeline['metadata']['tremor']
print(f"\nTremor quantification ({len(tremor_quantified)} windows):")
print(f"   Columns: {list(tremor_quantified.columns[:5])}... ({len(tremor_quantified.columns)} total)")
print(f"   Files: {tremor_quantified['file_key'].unique()}")

print(f"\nTremor aggregation ({len(tremor_aggregates)} aggregates):")
print(f"   Aggregates: {list(tremor_aggregates.keys())}")
print(f"   Tremor metadata: {tremor_meta}")

## 3. Raw data processing

The orchestrator can also process raw sensor data automatically. This includes data preparation steps such as format standardization, unit conversion, and orientation correction. Note that this feature was developed using a limited set of example data, and may therefore not work as expected on data from other sources.
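To give an intuition for orientation correction, the sketch below applies a `device_orientation`-style list such as `["-x", "-y", "z"]` to a single `(x, y, z)` sample. The interpretation used here, where each entry names the raw axis (optionally negated) that becomes the corresponding target axis, is an assumption for illustration. `apply_orientation` is a hypothetical helper, so consult the data preparation tutorial for the exact convention ParaDigMa uses.

```python
def apply_orientation(sample, orientation):
    """Map a raw (x, y, z) sample to the target frame (hypothetical helper).

    Assumed convention: orientation[i] names the raw axis, with an optional
    '-' sign, that becomes target axis i.
    """
    axis_index = {"x": 0, "y": 1, "z": 2}
    corrected = []
    for spec in orientation:
        sign = -1.0 if spec.startswith("-") else 1.0
        corrected.append(sign * sample[axis_index[spec.lstrip("+-")]])
    return tuple(corrected)


# Example accelerometer sample in g, flipped along x and y
print(apply_orientation((0.1, -0.2, 0.98), ["-x", "-y", "z"]))  # (-0.1, 0.2, 0.98)
```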

In [None]:
path_to_raw_data = Path('../../example_data/axivity')

device_orientation = ["-x", "-y", "z"]      # Sensor was worn upside-down
pipeline = 'gait'

# Working with raw data - this requires data preparation
# Using custom output directory
results_end_to_end = run_paradigma(
    output_dir=Path('./output_raw'),
    data_path=path_to_raw_data,             # Point to data folder
    data_prepared=False,                    # ParaDigMa will prepare the data
    pipeline_names=pipeline,
    watch_side="left",
    time_input_unit=TimeUnit.RELATIVE_S,    # Specify time unit for raw data
    accelerometer_units='g',
    gyroscope_units='deg/s',
    target_frequency=100.0,
    device_orientation=device_orientation,
    store_intermediate=['aggregation'],     # Only save aggregations
    verbosity=1,
)

print(results_end_to_end['metadata'][pipeline][1])
print(results_end_to_end['aggregations'][pipeline])
results_end_to_end['quantifications'][pipeline].head()