In [2]:
from typing import Dict, Optional, List
from preprocessor import Preprocessor
from collector import Collector
import pandas as pd

preprocessor = Preprocessor()
collector = Collector()

async def reset_collector():
    """Сбрасывает состояние коллектора, собирающего данные в онлайн режиме"""
    collector = Collector()

async def extract_features_online(new_data:Dict[str, Optional[float]])->Dict:
    """
    Извлекает фичи в онлайн режиме

    Parameters
    ----------
    new_data : Dict[str, Optional[float]]
        Словарь с новыми данными. Ожидаемые ключи:
            - 'bpm_s' (Optional[float]): Новое значение ЧСС (в ударах/минуту). Если None, данные не добавляются.
            - 'uc_s' (Optional[float]): Новое значение маточных сокращений (амплитуда). Если None, данные не добавляются.
            - 'time_sec' (float): Временная метка в секундах, для которой предоставляются данные.
    
    Returns
    -------
    Dict
        Словарь с вычисленными признаками. Содержит:
            - 'stv' (float): Среднее значение Short-Term Variability.
            - 'ltv' (float): Значение Long-Term Variability.
            - 'baseline_heart_rate' (float): Базовая ЧСС.

            - 'accelerations' (List[Dict[str, float]]): Список акселераций.
            - 'decelerations' (List[Dict[str, float]]): Список децелераций.
            - 'contractions' (List[Dict[str, float]]): Список маточных сокращений.

            - 'stvs' (List[float]): Массив значений STV для каждого временного окна.
            - 'stvs_window_duration' (float): Длительность окна для STV в секундах.
            - 'ltvs' (np.ndarray): Массив значений LTV для каждого временного окна.
            - 'ltvs_window_duration' (float): Длительность окна для LTV в секундах.
            
            - 'total_decelerations' (int): Общее количество децелераций.
            - 'late_decelerations' (int): Количество поздних децелераций.
            - 'late_deceleration_ratio' (float): Доля поздних децелераций (от общего числа).
            - 'total_accelerations' (int): Общее количество акселераций.
            - 'accel_decel_ratio' (float): Отношение количества акселераций к децелерациям.
            - 'total_contractions' (int): Общее количество маточных сокращений.

            - 'stv_trend' (float): Тренд STV за последнюю минуту (наклон линии тренда).
            - 'bpm_trend' (float): Тренд ЧСС за последние 5 минут (наклон линии тренда).

            - 'data_points' (int): Количество точек данных в ЧСС.
            - 'time_span_sec' (float): Общая длительность наблюдений в секундах (разница между последней и первой меткой времени).

            - 'filtered_bpm_batch' (Dict[str:List[float]]): Батч с отфильтрованной ЧСС
            - 'filtered_uterus_batch' (Dict[str:List[float]]): Батч с отфильтрованной активностью матки
    """

    collector.update_collector(new_data)
    bpm_raw, uterus_raw = collector.get_data()
    (bpm_filtered, uc_filtered) = preprocessor.filter_physiological_signals(bpm_raw, uterus_raw, fs_estimated=4.0)
    metrics = preprocessor.compute_metrics(bpm_filtered, uc_filtered)

    filtered_bpm_batch = bpm_filtered[- min(len(bpm_filtered), 51):]
    filtered_uterus_batch = uc_filtered[- min(len(uc_filtered), 51):]

    metrics = {**metrics,
               'filtered_bpm_batch': {
                   'time_sec': filtered_bpm_batch['time_sec'].to_list(),
                   'value': filtered_bpm_batch['value'].to_list(),
               },
               'filtered_uterus_batch': {
                   'time_sec': filtered_uterus_batch['time_sec'].to_list(),
                   'value': filtered_uterus_batch['value'].to_list(),
               },
               'stvs': list(metrics['stvs']),
               'ltvs': list(metrics['ltvs'])
               }
    return metrics

async def extract_features_offline(data:Dict[str, Dict[str, List[float]]])->Dict:
    """
    Извлекает фичи в онлайн режиме

    Parameters
    ----------
    data : Dict[str, Dict[str, List[float]]]
        Словарь с данными. Ожидаемые ключи:
            - 'bpm' (Dict[str, List[float]]): ЧСС в виде 'time_sec', 'value'
            - 'uterus' (Dict[str, List[float]]): Активность матки в виде 'time_sec', 'value'
    
    Returns
    -------
    Dict
        Словарь с вычисленными признаками. Содержит:
            - 'stv' (float): Среднее значение Short-Term Variability.
            - 'ltv' (float): Значение Long-Term Variability.
            - 'baseline_heart_rate' (float): Базовая ЧСС.

            - 'accelerations' (List[Dict[str, float]]): Список акселераций.
            - 'decelerations' (List[Dict[str, float]]): Список децелераций.
            - 'contractions' (List[Dict[str, float]]): Список маточных сокращений.

            - 'stvs' (List[float]): Массив значений STV для каждого временного окна.
            - 'stvs_window_duration' (float): Длительность окна для STV в секундах.
            - 'ltvs' (np.ndarray): Массив значений LTV для каждого временного окна.
            - 'ltvs_window_duration' (float): Длительность окна для LTV в секундах.
            
            - 'total_decelerations' (int): Общее количество децелераций.
            - 'late_decelerations' (int): Количество поздних децелераций.
            - 'late_deceleration_ratio' (float): Доля поздних децелераций (от общего числа).
            - 'total_accelerations' (int): Общее количество акселераций.
            - 'accel_decel_ratio' (float): Отношение количества акселераций к децелерациям.
            - 'total_contractions' (int): Общее количество маточных сокращений.

            - 'stv_trend' (float): Тренд STV за последнюю минуту (наклон линии тренда).
            - 'bpm_trend' (float): Тренд ЧСС за последние 5 минут (наклон линии тренда).

            - 'data_points' (int): Количество точек данных в ЧСС.
            - 'time_span_sec' (float): Общая длительность наблюдений в секундах (разница между последней и первой меткой времени).

            - 'filtered_bpm' (Dict[str:List[float]]): Отфильтрованная ЧСС
            - 'filtered_uterus' (Dict[str:List[float]]): Отфильтрованная активность матки
    """

    bpm_raw, uterus_raw = pd.DataFrame(data['bpm']), pd.DataFrame(data['uterus'])
    (bpm_filtered, uc_filtered) = preprocessor.filter_physiological_signals(bpm_raw, uterus_raw, fs_estimated=4.0)
    metrics = preprocessor.compute_metrics(bpm_filtered, uc_filtered, summarize=True)

    metrics = {**metrics,
               'filtered_bpm_batch': {
                   'time_sec': bpm_filtered['time_sec'].to_list(),
                   'value': bpm_filtered['value'].to_list(),
               },
               'filtered_uterus_batch': {
                   'time_sec': uc_filtered['time_sec'].to_list(),
                   'value': uc_filtered['value'].to_list(),
               },
               'stvs': list(metrics['stvs']),
               'ltvs': list(metrics['ltvs'])
               }
    return metrics

In [3]:
from random import uniform

time_sec = 0
def gen_data():
    global time_sec
    time_sec += uniform(0, 0.5)
    return {
        'bpm_s': 120.0 + uniform(-30, 30) if uniform(0,1)>0.5 else None,
        'uc_s': (50 + uniform(-30, 40)) if uniform(0,1)>0.5 else None,
        'time_sec': time_sec
    }

In [None]:
await reset_collector()

for i in range(100):
    new_data = gen_data()
    print(await extract_features_online(new_data))

In [None]:
collector.get_data()