# Synchronization

## Setup and Helper Functions

In [None]:
from pathlib import Path
import pandas as pd

from fau_colors import cmaps
from biopsykit.io.biopac import BiopacDataset
from empkins_io.sync import SyncedDataset

import matplotlib.pyplot as plt
import seaborn as sns

%matplotlib widget
%load_ext autoreload
%autoreload 2

In [None]:
# Close figures left over from previous runs of the notebook.
plt.close("all")

# Use the "faculties" colormap from fau_colors as the default seaborn color palette.
palette = sns.color_palette(cmaps.faculties)
sns.set_theme(context="notebook", style="ticks", font="sans-serif", palette=palette)

# Global matplotlib defaults for all figures created below.
plt.rcParams["figure.figsize"] = (8, 5)
# Font type 42 embeds TrueType fonts in exported PDFs.
plt.rcParams["pdf.fonttype"] = 42
plt.rcParams["mathtext.default"] = "regular"

# Last expression of the cell: displays the palette in the notebook output.
palette

In [None]:
# Base folder of the demo recordings; the loader functions below append one sub-folder per sync type.
data_path = Path("data/sync_demo")
# Column key of the NilsPod whose "analog_1" channel is used as the synchronization signal.
nilspod_sync = "9e02"
# Column key of the NilsPod whose "ecg" channel is used as the ECG signal.
nilspod_ecg = "b0c2"

### Helper to load data

In [None]:
from typing import Optional, Tuple

from nilspodlib import Dataset, SyncedSession
from packaging.version import Version


class CustomSyncedSession(SyncedSession):
    """Custom SyncedSession to handle different firmware versions of the SyncPod.

    The SyncPod was using a custom dev version of the NilsPod firmware and needs to be loaded differently:
    every file whose name contains ``NilsPodX-9E02`` is loaded with the firmware version forced to ``0.17.0``,
    regardless of the ``force_version`` argument. All other files are loaded with the arguments as passed.
    """

    @classmethod
    def from_file_paths(cls, paths, legacy_support="resolve", force_version=None, tz="Europe/Berlin"):
        """Create a session from a list of NilsPod binary files.

        Parameters
        ----------
        paths
            iterable of paths (``str`` or ``pathlib.Path``) to the binary files
        legacy_support
            forwarded to ``Dataset.from_bin_file``
        force_version
            forwarded to ``Dataset.from_bin_file`` for all files except the SyncPod file,
            which is always loaded with version ``0.17.0``
        tz
            forwarded to ``Dataset.from_bin_file``

        Returns
        -------
        CustomSyncedSession
            session containing one dataset per file, in the order of ``paths``

        """
        datasets = []
        # coerce to Path so that plain string paths work as well (``.name`` is needed below)
        for path in map(Path, paths):
            is_sync_pod = "NilsPodX-9E02" in path.name
            datasets.append(
                Dataset.from_bin_file(
                    path,
                    legacy_support=legacy_support,
                    # the SyncPod ran a dev firmware and must be parsed with a fixed version
                    force_version=Version("0.17.0") if is_sync_pod else force_version,
                    tz=tz,
                )
            )

        return cls(datasets)

    @classmethod
    def from_folder_path(
        cls,
        base_path,
        filter_pattern: Optional[str] = "NilsPodX-*.bin",
        legacy_support: Optional[str] = "resolve",
        force_version=None,
        tz: Optional[str] = "Europe/Berlin",
    ):
        """Create a session from all files in a folder that match ``filter_pattern``.

        The matching files are sorted by path before loading, so the dataset order is deterministic.

        Parameters
        ----------
        base_path
            folder (``str`` or ``pathlib.Path``) that is searched for recordings
        filter_pattern
            glob pattern used to select the files within ``base_path``
        legacy_support
            forwarded to :meth:`from_file_paths`
        force_version
            forwarded to :meth:`from_file_paths`
        tz
            forwarded to :meth:`from_file_paths`

        Returns
        -------
        CustomSyncedSession
            session containing one dataset per matching file

        Raises
        ------
        ValueError
            if no file in ``base_path`` matches ``filter_pattern``

        """
        file_paths = sorted(Path(base_path).glob(filter_pattern))
        if not file_paths:
            # fail early with a clear message instead of building a session without datasets
            raise ValueError(f'No files matching "{filter_pattern}" were found in {base_path}')
        return cls.from_file_paths(file_paths, legacy_support=legacy_support, force_version=force_version, tz=tz)


def load_nilspod_data(type: str) -> Tuple[pd.DataFrame, float]:
    """Load and return synchronization channel and ECG data from NilsPod.

    The recordings are read from the sub-folder ``type`` of ``data_path`` and aligned to the NilsPod sync region.
    The ECG signal is shifted towards earlier timestamps to compensate the constant filter delay of the ECG sensor.

    Parameters
    ----------
    type
        name of the sync type, which is also the name of the sub-folder containing the recordings
        (e.g., ``"falling-trigger"``)

    Returns
    -------
    data
        dataframe with the ``analog_1`` channel of the sync sensor and the delay-corrected ``ecg`` channel
        of the ECG sensor as columns
    fs
        sampling rate in Hz, taken from the second dataset of the session

    """
    session = CustomSyncedSession.from_folder_path(data_path.joinpath(type))
    session = session.align_to_syncregion()
    fs = session.ecg[1].sampling_rate_hz
    data = session.data_as_df(index="local_datetime", concat_df=True)

    # artifact in recording, TODO: rerecord data
    if type == "falling-trigger":
        data = data.loc[:"2023-11-20 11:07:18.8+01:00"]

    # constant filter delay in ECG sensor (in seconds),
    # see https://www.analog.com/media/en/technical-documentation/data-sheets/max30003.pdf, p. 19 for details
    delay = 0.11261
    # convert the delay to a (negative) number of samples so that the ECG is moved to earlier timestamps
    # NOTE(review): int() truncates towards zero; round() would pick the nearest sample - confirm which is intended
    delay_periods = int(-delay / (1 / fs))

    ecg_data = data[nilspod_ecg].shift(periods=delay_periods)["ecg"]
    sync_data = data[nilspod_sync]["analog_1"]
    return pd.concat([sync_data, ecg_data], axis=1), fs


def load_biopac_data(type: str) -> Tuple[pd.DataFrame, float]:
    """Load and return synchronization channel and ECG data from Biopac.

    The recording is read from ``<data_path>/<type>/Sync_Board_Demo_<type>.acq``.

    Parameters
    ----------
    type
        name of the sync type, which is part of both the sub-folder and the file name
        (e.g., ``"falling-trigger"``)

    Returns
    -------
    data
        Biopac data indexed by local datetime
    fs
        sampling rate of the sync channel as reported by the dataset (``sampling_rate_hz_sync``)

    """
    acq_file = data_path / type / f"Sync_Board_Demo_{type}.acq"
    dataset = BiopacDataset.from_acq_file(acq_file)
    data = dataset.data_as_df(index="local_datetime")

    # the falling-trigger recording is cut at a fixed timestamp
    # NOTE(review): presumably for the same recording artifact that is cut in the NilsPod loader - confirm
    if type == "falling-trigger":
        data = data.loc[:"2023-11-20 11:07:29+01:00"]

    return data, dataset.sampling_rate_hz_sync

## Plotting functions

In [None]:
def plot_cut_data(synced_dataset, type="ecg"):
    """Plot the Biopac and NilsPod data after cutting to the sync region.

    Reads the ``biopac_cut_`` and ``nilspod_cut_`` attributes of ``synced_dataset`` and
    plots the signal selected by ``type`` (``"ecg"`` or ``"sync"``).
    """
    biopac_cut = synced_dataset.biopac_cut_
    nilspod_cut = synced_dataset.nilspod_cut_
    _plot_datastreams(biopac_cut, nilspod_cut, type)


def plot_aligned_data(synced_dataset, type="ecg"):
    """Plot the Biopac and NilsPod data after alignment.

    Reads the ``biopac_aligned_`` and ``nilspod_aligned_`` attributes of ``synced_dataset`` and
    plots the signal selected by ``type`` (``"ecg"`` or ``"sync"``).
    """
    biopac_aligned = synced_dataset.biopac_aligned_
    nilspod_aligned = synced_dataset.nilspod_aligned_
    _plot_datastreams(biopac_aligned, nilspod_aligned, type)


def _plot_datastreams(data_biopac, data_nilspod, type):
    """Plot one signal of the Biopac and the NilsPod recording into a shared figure.

    The Biopac signal is drawn on the left y-axis and the NilsPod signal (in green) on a twin right y-axis.

    Parameters
    ----------
    data_biopac
        Biopac data; the columns whose name contains ``type`` are plotted
    data_nilspod
        NilsPod data; for ``"ecg"`` the columns whose name contains ``"ecg"`` are plotted,
        for ``"sync"`` the columns whose name contains ``"analog_1"``
    type
        signal to plot, either ``"ecg"`` or ``"sync"``

    Raises
    ------
    ValueError
        if ``type`` is neither ``"ecg"`` nor ``"sync"``

    """
    # validate before creating the figure so that an invalid call does not leave an empty figure behind
    if type not in ("ecg", "sync"):
        raise ValueError("Unknown type!")

    # the NilsPod sync signal is stored in the "analog_1" column instead of a column named after the type
    nilspod_column = "ecg" if type == "ecg" else "analog_1"

    fig, ax = plt.subplots(figsize=(8, 3))
    ax2 = ax.twinx()
    ax2.plot(data_nilspod.filter(like=nilspod_column), color="g", label=f"nilspod {type}")
    ax.plot(data_biopac.filter(like=type), label=f"biopac {type}")
    if type == "ecg":
        # fixed y-range for the NilsPod ECG axis
        ax2.set_ylim(-5000, 3000)

    fig.legend()
    fig.tight_layout()

## Trigger Synchronization

### Load data
Use the custom function that you normally use to load your measurement data as a `pandas.DataFrame`. Note that the sampling frequency of the sync channels is also required for the alignment.

In [None]:
np_data, np_fs = load_nilspod_data("falling-trigger")
bp_data, bp_fs = load_biopac_data("falling-trigger")

In [None]:
np_data

### Perform alignment
Create an instance of the `SyncedDataset` class provided by us and specify the sync signal type, as in this case, `falling-trigger`.
Add all your previously loaded datasets to the class and specify the name of the device, the column name of the synchronization channel, as well as the sampling rate of the device.

In [None]:
synced_dataset = SyncedDataset(sync_type="falling-trigger")
synced_dataset.add_dataset("nilspod", np_data, sync_channel_name="analog_1", sampling_rate=np_fs)
synced_dataset.add_dataset("biopac", bp_data, sync_channel_name="sync", sampling_rate=bp_fs)

Using the `plot_sync_channels` function, the data can be visualized before the alignment. As we can see in this example, there is an offset of around 10s between the timestamps of the individual devices that has to be corrected.

In [None]:
fig, axs = synced_dataset.plot_sync_channels(figsize=(8, 3))
fig.tight_layout()

Afterwards, a call to `cut_to_sync_region` cuts the data to the region of interest in which a sync signal was present. In the trigger signal case, this is the region from the trigger time to the end of the recording. The result of this operation is stored as a separate property for each of your devices, named `<name_of_device>_cut_`.

In [None]:
synced_dataset.cut_to_sync_region()
synced_dataset.nilspod_cut_.head()

As the last step, the data is aligned to the timestamps of a reference device. To do so, the `align_datasets` function is called with the name of the device whose timestamps should be used as the reference for all other recordings. With `cut_to_shortest` set to `True`, both recordings are cut to the same length; otherwise, their length is not modified. The result of this operation, and thus the final outcome of the postprocessing, is stored as a separate property for each of your devices, named `<name_of_device>_aligned_`. This aligned data can then be used for the desired application.

In [None]:
synced_dataset.align_datasets("biopac", cut_to_shortest=True)
synced_dataset.nilspod_aligned_.head()

In [None]:
plot_cut_data(synced_dataset, type="ecg")

In [None]:
plot_aligned_data(synced_dataset, type="sync")

In [None]:
plot_aligned_data(synced_dataset, type="ecg")

## Edge Synchronization

### Load data
Use the custom function that you normally use to load your measurement data as a `pandas.DataFrame`. Note that the sampling frequency of the sync channels is also required for the alignment.

In [None]:
np_data, np_fs = load_nilspod_data("falling-edge")
bp_data, bp_fs = load_biopac_data("falling-edge")

### Perform alignment
Create an instance of the `SyncedDataset` class provided by us and specify the sync signal type, as in this case, `falling-edge`.
Add all your previously loaded datasets to the class and specify the name of the device, the column name of the synchronization channel, as well as the sampling rate of the device.

In [None]:
synced_dataset = SyncedDataset(sync_type="falling-edge")
synced_dataset.add_dataset("nilspod", np_data, sync_channel_name="analog_1", sampling_rate=np_fs)
synced_dataset.add_dataset("biopac", bp_data, sync_channel_name="sync", sampling_rate=bp_fs)

Using the `plot_sync_channels` function, the data can be visualized before the alignment. As we can see in this example, there is an offset of around 10s between the timestamps of the individual devices that has to be corrected.

In [None]:
fig, axs = synced_dataset.plot_sync_channels(figsize=(8, 3))
fig.tight_layout()

Afterwards, a call to `cut_to_sync_region` cuts the data to the region of interest in which a sync signal was present. The result of this operation is stored as a separate property for each of your devices, named `<name_of_device>_cut_`.

In [None]:
synced_dataset.cut_to_sync_region()
synced_dataset.nilspod_cut_.head()

As the last step, the data is aligned to the timestamps of a reference device. To do so, the `align_datasets` function is called with the name of the device whose timestamps should be used as the reference for all other recordings. The result of this operation, and thus the final outcome of the postprocessing, is stored as a separate property for each of your devices, named `<name_of_device>_aligned_`. This aligned data can then be used for the desired application.

In [None]:
synced_dataset.align_datasets("biopac")
synced_dataset.nilspod_aligned_.head()

In [None]:
plot_cut_data(synced_dataset, type="ecg")

In [None]:
plot_aligned_data(synced_dataset, type="sync")

In [None]:
plot_aligned_data(synced_dataset, type="ecg")

## Clock Synchronization

### Load data
Use the custom function that you normally use to load your measurement data as a `pandas.DataFrame`. Note that the sampling frequency of the sync channels is also required for the alignment.

In [None]:
np_data, np_fs = load_nilspod_data("falling-clock")
bp_data, bp_fs = load_biopac_data("falling-clock")

### Perform alignment
Create an instance of the `SyncedDataset` class provided by us and specify the sync signal type, as in this case, `falling-clock`.
Add all your previously loaded datasets to the class and specify the name of the device, the column name of the synchronization channel, as well as the sampling rate of the device.

In [None]:
synced_dataset = SyncedDataset(sync_type="falling-clock")
synced_dataset.add_dataset("nilspod", np_data, sync_channel_name="analog_1", sampling_rate=np_fs)
synced_dataset.add_dataset("biopac", bp_data, sync_channel_name="sync", sampling_rate=bp_fs)

Using the `plot_sync_channels` function, the data can be visualized before the alignment. As we can see in this example, there is an offset of around 10s between the timestamps of the individual devices that has to be corrected.

In [None]:
fig, axs = synced_dataset.plot_sync_channels(figsize=(8, 3))
fig.tight_layout()

Afterwards, a call to `cut_to_sync_region` cuts the data to the region of interest in which a sync signal was present. For clock signals, the frequency of the clock signal (in Hz) is specified as the additional parameter `"wave_frequency"`. The result of this operation is stored as a separate property for each of your devices, named `<name_of_device>_cut_`.

In [None]:
synced_dataset.cut_to_sync_region(sync_params={"wave_frequency": 50})
synced_dataset.nilspod_cut_

As the last step, the data is aligned to the timestamps of a reference device. To do so, the `align_datasets` function is called with the name of the device whose timestamps should be used as the reference for all other recordings. The result of this operation, and thus the final outcome of the postprocessing, is stored as a separate property for each of your devices, named `<name_of_device>_aligned_`. This aligned data can then be used for the desired application.

In [None]:
synced_dataset.align_datasets("biopac", cut_to_shortest=False)
synced_dataset.nilspod_aligned_

In [None]:
plot_cut_data(synced_dataset, type="ecg")

In [None]:
plot_aligned_data(synced_dataset, type="sync")

In [None]:
plot_aligned_data(synced_dataset, type="ecg")

## M-Sequence Synchronization

### Load data
Use the custom function that you normally use to load your measurement data as a `pandas.DataFrame`. Note that the sampling frequency of the sync channels is also required for the alignment.

In [None]:
np_data, np_fs = load_nilspod_data("m-sequence")
bp_data, bp_fs = load_biopac_data("m-sequence")

### Perform alignment
Create an instance of the `SyncedDataset` class provided by us and specify the sync signal type, as in this case, `m-sequence`.
Add all your previously loaded datasets to the class and specify the name of the device, the column name of the synchronization channel, as well as the sampling rate of the device.

In [None]:
synced_dataset = SyncedDataset(sync_type="m-sequence")
synced_dataset.add_dataset("nilspod", np_data, sync_channel_name="analog_1", sampling_rate=np_fs)
synced_dataset.add_dataset("biopac", bp_data, sync_channel_name="sync", sampling_rate=bp_fs)

Using the `plot_sync_channels` function, the data can be visualized before the alignment. As we can see in this example, there is an offset of around 10s between the timestamps of the individual devices that has to be corrected.

In [None]:
fig, axs = synced_dataset.plot_sync_channels(figsize=(8, 3))
fig.tight_layout()

For M-Sequence synchronization, the alignment is based on the cross-correlation of the synchronization channels, so the procedure differs slightly from that of the other signal types. If you are dealing with another signal, have a look at the trigger, edge, or clock example instead.

In the first step, all recordings need to be resampled to the same sampling frequency to make them comparable. For this purpose, the `resample_datasets` function is applied. To dynamically calculate the sampling frequency of a device (`method="dynamic"`), the frequency of the M-Sequence (in Hz) needs to be specified as the additional parameter `wave_frequency`. Otherwise, the sampling frequency specified for each device can be used; in this case, `method` is set to `"static"`. The result of this operation is stored as a separate property for each of your devices, named `<name_of_device>_resampled_`.

In [None]:
synced_dataset.resample_datasets(np_fs, method="static", wave_frequency=50)
synced_dataset.nilspod_resampled_

Afterwards, a call to `align_and_cut_m_sequence` aligns the data and cuts it to the region of interest in which the sync signal was present. The device specified as `primary` is used as the reference for all other recordings. The result of this operation, and thus the final outcome of the postprocessing, is stored as a separate property for each of your devices, named `<name_of_device>_aligned_`. This aligned data can then be used for the desired application.

In [None]:
synced_dataset.align_and_cut_m_sequence(primary="biopac", cut_to_shortest=True, reset_time_axis=True)
synced_dataset.nilspod_aligned_

In [None]:
plot_aligned_data(synced_dataset, type="sync")

In [None]:
plot_aligned_data(synced_dataset, type="ecg")