In [2]:
import time
import datetime
import os

import pandas as pd
import numpy as np
from tqdm import tqdm
import json

import plotly.express as px
import plotly.graph_objects as go

In [3]:
import Config
import Utils

# Source (extracted) and destination (resampled) roots, plus the per-file dtype map,
# all taken from the project Config module.
DATA_FILE_PATH, OUTPUT_FILE_PATH = (
    Config.DATA_FILE_PATH_CONTINUOUS_EXTRACTED,
    Config.DATA_FILE_PATH_CONTINUOUS_RESAMPLED,
)
DTYPES = Config.EXTRACTED_DTYPE

# Bounds of the processing window: each configured value is passed through Utils.timestr2time.
BEGIN_TIME, END_TIME = (
    Utils.timestr2time(bound) for bound in (Config.BEGIN_TIME, Config.END_TIME)
)

# Step sizes used when walking the window day by day.
HOUR = datetime.timedelta(hours=1)
DAY = 24 * HOUR

In [16]:
# Load one minute-resampled export and display the rows whose record_time repeats
# an earlier row (duplicated() flags every occurrence after the first).
minute_csv_path = f"WebApp/{Config.APP_DATA_FILE_PATH}/1683703843555/resampled/minute/temp.csv"
df = pd.read_csv(minute_csv_path, encoding="utf-8")
repeated_time_mask = df['record_time'].duplicated()
df.loc[repeated_time_mask]

Unnamed: 0,heart_rate,record_time
1081,0.0,1637013600000


In [19]:
# Resample every continuous signal except RRI and workout detail onto a one-minute
# grid covering 06:00-24:00 of each day in [BEGIN_TIME, END_TIME), and log the gaps
# between consecutive raw samples. For each ex_id two CSVs are written:
#   {OUTPUT_FILE_PATH}/{file_name}/{ex_id}.csv                                 resampled grid
#   {OUTPUT_FILE_PATH}/{file_name}/missing_period/missing_period_{ex_id}.csv   detected gaps
MINUTE_MS = 60 * 1000  # milliseconds per minute

for file_name, dtype in DTYPES.items():
    if file_name in ['continuousrri', 'dailyworkoutdetail']:
        continue
    ex_id_list = [int(_.split('.')[0]) for _ in os.listdir(f"{DATA_FILE_PATH}/{file_name}")]
    ex_id_list.sort()
    # makedirs creates every missing level and tolerates existing ones. The previous
    # chain of os.mkdir calls inside one try/except stopped at the first directory that
    # already existed, so the per-file folders were never created once the root existed.
    os.makedirs(f"{OUTPUT_FILE_PATH}/{file_name}/missing_period", exist_ok=True)
    print(f'file_name: {file_name}')
    for ex_id in tqdm(ex_id_list):
        df = pd.read_csv(f"{DATA_FILE_PATH}/{file_name}/{ex_id}.csv", encoding="utf-8", dtype=str)
        df = df[list(dtype.keys())].astype(dtype).sort_values(by=['timestamp'])
        # The first configured column is the measured value; keep an untouched copy of it.
        data_col = list(df)[0]
        df[f'{data_col}_raw'] = df[data_col]
        df = Utils.abnormal_processing(df, data_col, Utils.mask_over_3std, Utils.fill_nearest)

        # Collected across all days and turned into DataFrames once, instead of
        # re-concatenating a growing frame on every iteration.
        missing_period_list = []
        day_frames = []
        for day in range((END_TIME - BEGIN_TIME).days):
            # Daily window: 06:00 up to (not including) 00:00 of the next day.
            begin_time_day = BEGIN_TIME + day * DAY + 6 * HOUR
            end_time_day = BEGIN_TIME + (day + 1) * DAY
            begin_timestamp_day = Utils.time2timestamp(begin_time_day)
            end_timestamp_day = Utils.time2timestamp(end_time_day)
            df_day = df[(begin_timestamp_day <= df['timestamp']) & (df['timestamp'] < end_timestamp_day)]
            # Zero-valued sentinel rows at both window edges so that gaps touching the
            # start or the end of the window are measured as well.
            df_head = pd.DataFrame({data_col: [0], "timestamp": [begin_timestamp_day], f'{data_col}_raw': [0]})
            df_tail = pd.DataFrame({data_col: [0], "timestamp": [end_timestamp_day], f'{data_col}_raw': [0]})
            df_day = pd.concat([df_head, df_day, df_tail], axis=0, ignore_index=True)

            # One placeholder row per minute of the window, all initialised as missing.
            minutes_in_window = (end_time_day - begin_time_day).seconds // 60
            df_day_resampled = pd.DataFrame([
                {data_col: 0.0, 'timestamp': int(begin_timestamp_day + minute * MINUTE_MS), 'missing': 1}
                for minute in range(minutes_in_window)
            ])

            resampled_data = []
            for i in range(len(df_day) - 1):
                line = df_day.iloc[i]
                next_line = df_day.iloc[i + 1]
                period_length = next_line['timestamp'] - line['timestamp']
                # Consecutive samples strictly more than 5 minutes apart form a missing period.
                # NOTE(review): the original comment said ">= 5 minutes" while the code
                # used ">"; the code's behaviour is kept - confirm which one is intended.
                if period_length > 5 * MINUTE_MS:
                    missing_period_list.append({
                        'timestamp_start': int(line['timestamp']),
                        'timestamp_end': int(next_line['timestamp']),
                        'period_center': int((next_line['timestamp'] + line['timestamp']) / 2),
                        'period_length': int(period_length),
                    })
                    continue

                # Minute index (minutes since the epoch) that each sample falls into.
                line_minute = int(line['timestamp'] / 60 / 1000)
                next_line_minute = int(next_line['timestamp'] / 60 / 1000)
                # A second sample exactly on a minute boundary is attributed to the previous
                # minute: that grid slot is produced when the sample becomes `line`.
                # (Previously tested with % 1000, i.e. whole *seconds*, which skipped
                # minute boundaries and produced off-grid rows.)
                if next_line['timestamp'] % MINUTE_MS == 0:
                    next_line_minute -= 1
                # A first sample exactly on a minute boundary fills that grid slot directly.
                # This is done even when the next sample lies in the same minute, otherwise
                # an exactly-aligned sample would leave its own slot flagged as missing.
                if line['timestamp'] % MINUTE_MS == 0:
                    resampled_data.append({
                        data_col: line[data_col],
                        'timestamp': int(line['timestamp']),
                        'missing': 0
                    })
                # Linearly interpolate every minute boundary lying between the two samples
                # (the range is empty when both samples fall into the same minute).
                for j in range(next_line_minute - line_minute):
                    resample_timestamp = (line_minute + j + 1) * MINUTE_MS
                    resampled_data.append({
                        data_col: Utils.linear_interpolation(
                            resample_timestamp,
                            line['timestamp'], line[data_col],
                            next_line['timestamp'], next_line[data_col]),
                        'timestamp': int(resample_timestamp),
                        'missing': 0
                    })

            # Replace the placeholders that received a resampled value.
            if resampled_data:
                # Keep a single row per timestamp. Raw rows sharing a minute-aligned
                # timestamp (e.g. the 06:00 sentinel plus a real 06:00 sample) would
                # otherwise yield two rows with different values; the later row wins.
                df_interpolated = pd.DataFrame(resampled_data).drop_duplicates(subset=['timestamp'], keep='last')
                still_missing = ~df_day_resampled['timestamp'].isin(df_interpolated['timestamp'])
                df_day_resampled = pd.concat(
                    [df_day_resampled[still_missing], df_interpolated], axis=0, ignore_index=True
                ).sort_values(by=['timestamp'])
            day_frames.append(df_day_resampled)

        df_missing_period = pd.DataFrame(missing_period_list)
        df_resampled = pd.concat(day_frames, axis=0, ignore_index=True) if day_frames else pd.DataFrame()
        df_missing_period.to_csv(f"{OUTPUT_FILE_PATH}/{file_name}/missing_period/missing_period_{ex_id}.csv", encoding="utf-8", index=False)
        df_resampled.to_csv(f"{OUTPUT_FILE_PATH}/{file_name}/{ex_id}.csv", encoding="utf-8", index=False)

file_name: continuousbloodoxygensaturation


100%|██████████| 58/58 [00:29<00:00,  1.99it/s]


In [24]:
# continuousrri
# Resample the RRI signal onto a one-second grid covering 06:00-24:00 of each day in
# [BEGIN_TIME, END_TIME), and log the gaps between consecutive raw samples.
# For each ex_id two CSVs are written:
#   {OUTPUT_FILE_PATH}/continuousrri/{ex_id}.csv                                 resampled grid
#   {OUTPUT_FILE_PATH}/continuousrri/missing_period/missing_period_{ex_id}.csv   detected gaps
file_name = 'continuousrri'
dtype = DTYPES[file_name]
ex_id_list = [int(_.split('.')[0]) for _ in os.listdir(f"{DATA_FILE_PATH}/{file_name}")]
ex_id_list.sort()
# makedirs creates every missing level and tolerates existing ones. The previous chain
# of os.mkdir calls inside one try/except stopped at the first directory that already
# existed, so the deeper folders were never created once the root existed.
os.makedirs(f"{OUTPUT_FILE_PATH}/{file_name}/missing_period", exist_ok=True)
for ex_id in ex_id_list:
    df = pd.read_csv(f"{DATA_FILE_PATH}/{file_name}/{ex_id}.csv", encoding="utf-8", dtype=str)
    df = df[list(dtype.keys())].astype(dtype).sort_values(by=['timestamp'])
    # Keep an untouched copy of the RRI values; the gap test below uses it.
    df['rri_raw'] = df['rri']
    df = Utils.abnormal_processing(df, 'rri', Utils.mask_over_3std, Utils.fill_nearest)

    # Collected across all days and turned into DataFrames once, instead of
    # re-concatenating a growing frame on every iteration.
    missing_period_list = []
    day_frames = []
    print(f'ex_id: {ex_id}')
    for day in tqdm(range((END_TIME - BEGIN_TIME).days)):
        # Daily window: 06:00 up to (not including) 00:00 of the next day.
        begin_time_day = BEGIN_TIME + day * DAY + 6 * HOUR
        end_time_day = BEGIN_TIME + (day + 1) * DAY
        begin_timestamp_day = Utils.time2timestamp(begin_time_day)
        end_timestamp_day = Utils.time2timestamp(end_time_day)
        df_day = df[(begin_timestamp_day <= df['timestamp']) & (df['timestamp'] < end_timestamp_day)]
        # Zero-valued sentinel rows at both window edges so that gaps touching the
        # start or the end of the window are measured as well.
        df_head = pd.DataFrame({"rri": [0], "sqi": [0], "timestamp": [begin_timestamp_day], "rri_raw": [0]})
        df_tail = pd.DataFrame({"rri": [0], "sqi": [0], "timestamp": [end_timestamp_day], "rri_raw": [0]})
        df_day = pd.concat([df_head, df_day, df_tail], axis=0, ignore_index=True)

        # One placeholder row per second of the window, all initialised as missing.
        seconds_in_window = (end_time_day - begin_time_day).seconds
        df_day_resampled = pd.DataFrame([
            {'rri': 0, 'sqi': 0, 'timestamp': int(begin_timestamp_day + second * 1000), 'missing': 1}
            for second in range(seconds_in_window)
        ])

        resampled_data = []
        for i in range(len(df_day) - 1):
            line = df_day.iloc[i]
            next_line = df_day.iloc[i + 1]
            period_length = next_line['timestamp'] - line['timestamp']
            rri_raw_sum = next_line['rri_raw'] + line['rri_raw']
            # A gap counts as a missing period when it is at least 3000 ms long and at
            # least as long as the two surrounding raw RRI values added together.
            if period_length >= rri_raw_sum and period_length >= 3000:
                missing_period_list.append({
                    'timestamp_start': int(line['timestamp']),
                    'timestamp_end': int(next_line['timestamp']),
                    'period_center': int((next_line['timestamp'] + line['timestamp']) / 2),
                    'period_length': int(period_length),
                })
                continue

            # Second index (seconds since the epoch) that each sample falls into.
            line_second = int(line['timestamp'] / 1000)
            next_line_second = int(next_line['timestamp'] / 1000)
            # A second sample exactly on a second boundary is attributed to the previous
            # second: that grid slot is produced when the sample becomes `line`.
            if next_line['timestamp'] % 1000 == 0:
                next_line_second -= 1
            # Both samples inside the same second: nothing to resample for this pair.
            # NOTE(review): this also skips a first sample lying exactly on a second
            # boundary, leaving that slot flagged as missing - confirm this is intended.
            if period_length < 999 and line_second == next_line_second:
                continue

            # A first sample exactly on a second boundary fills that grid slot directly.
            if line['timestamp'] % 1000 == 0:
                resampled_data.append({
                    'rri': int(line['rri']),
                    'sqi': int(line['sqi']),
                    'timestamp': int(line['timestamp']),
                    'missing': 0
                })
            # Linearly interpolate every second boundary lying between the two samples.
            for j in range(next_line_second - line_second):
                resample_timestamp = (line_second + j + 1) * 1000
                resampled_data.append({
                    'rri': int(Utils.linear_interpolation(
                        resample_timestamp,
                        line['timestamp'], line['rri'],
                        next_line['timestamp'], next_line['rri'])),
                    'sqi': int(Utils.linear_interpolation(
                        resample_timestamp,
                        line['timestamp'], line['sqi'],
                        next_line['timestamp'], next_line['sqi'])),
                    'timestamp': int(resample_timestamp),
                    'missing': 0
                })

        # Replace the placeholders that received a resampled value.
        if resampled_data:
            # Keep a single row per timestamp. Raw rows sharing a second-aligned timestamp
            # (e.g. the 06:00 sentinel plus a real 06:00:00.000 sample) would otherwise
            # yield two rows with different values; the later row wins.
            df_interpolated = pd.DataFrame(resampled_data).drop_duplicates(subset=['timestamp'], keep='last')
            still_missing = ~df_day_resampled['timestamp'].isin(df_interpolated['timestamp'])
            df_day_resampled = pd.concat(
                [df_day_resampled[still_missing], df_interpolated], axis=0, ignore_index=True
            ).sort_values(by=['timestamp'])
        day_frames.append(df_day_resampled)

    df_missing_period = pd.DataFrame(missing_period_list)
    df_resampled = pd.concat(day_frames, axis=0, ignore_index=True) if day_frames else pd.DataFrame()
    df_missing_period.to_csv(f"{OUTPUT_FILE_PATH}/{file_name}/missing_period/missing_period_{ex_id}.csv", encoding="utf-8", index=False)
    df_resampled.to_csv(f"{OUTPUT_FILE_PATH}/{file_name}/{ex_id}.csv", encoding="utf-8", index=False)


ex_id: 6


100%|██████████| 31/31 [04:52<00:00,  9.44s/it]


ex_id: 11


100%|██████████| 31/31 [01:48<00:00,  3.49s/it]


ex_id: 27


100%|██████████| 31/31 [05:04<00:00,  9.82s/it]


ex_id: 42


100%|██████████| 31/31 [03:18<00:00,  6.42s/it]


ex_id: 50


100%|██████████| 31/31 [01:19<00:00,  2.56s/it]


ex_id: 52


100%|██████████| 31/31 [03:35<00:00,  6.96s/it]


In [35]:
# Does any of the first 64800 rows carry the missing flag (missing == 1)?
# NOTE(review): temp_df is not assigned in any visible cell - confirm where it comes from.
bool((temp_df.iloc[:64800]['missing'] == 1).any())

True

In [10]:
# Copy of the resampled frame with an extra 'time' column obtained by passing every
# timestamp through Utils.timestamp2time; df_resampled itself is left unchanged.
time_column = df_resampled['timestamp'].apply(Utils.timestamp2time)
df_resampled_trans = df_resampled.assign(time=time_column)

KeyError: "['timestamp'] not found in axis"

In [12]:
# Drop the raw 'timestamp' column now that 'time' carries the same information.
# errors='ignore' keeps the cell re-runnable: because the result is assigned back to
# the same name, a second run used to raise KeyError "['timestamp'] not found in axis".
df_resampled_trans = df_resampled_trans.drop(columns=['timestamp'], errors='ignore')
df_resampled_trans

Unnamed: 0,rri,sqi,missing,time
0,0.0,0.0,1,2021-11-15 06:00:00.000000
1,0.0,0.0,1,2021-11-15 06:00:01.000000
2,0.0,0.0,1,2021-11-15 06:00:02.000000
3,0.0,0.0,1,2021-11-15 06:00:03.000000
4,0.0,0.0,1,2021-11-15 06:00:04.000000
...,...,...,...,...
64795,0.0,0.0,1,2021-11-15 23:59:55.000000
64796,0.0,0.0,1,2021-11-15 23:59:56.000000
64797,0.0,0.0,1,2021-11-15 23:59:57.000000
64798,0.0,0.0,1,2021-11-15 23:59:58.000000


In [13]:
# Print the column dtypes, non-null counts and memory usage of the transformed frame.
df_resampled_trans.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 64800 entries, 0 to 64799
Data columns (total 4 columns):
 #   Column   Non-Null Count  Dtype  
---  ------   --------------  -----  
 0   rri      64800 non-null  float64
 1   sqi      64800 non-null  float64
 2   missing  64800 non-null  int64  
 3   time     64800 non-null  object 
dtypes: float64(2), int64(1), object(1)
memory usage: 2.0+ MB


In [19]:
# Scatter plot of rri against time for the first 32400 rows, coloured by sqi.
scatter_rows = df_resampled_trans[:32400]
fig = px.scatter(
    scatter_rows,
    x='time',
    y='rri',
    color='sqi',
    color_continuous_scale=px.colors.sequential.Agsunset,
)
fig.show()

In [21]:
# Line plot of rri against time for rows 10000-19999 of the transformed frame.
line_rows = df_resampled_trans[10000:20000]
fig = px.line(line_rows, x='time', y='rri')
fig.show()

In [25]:
# Overlay the rri values of `df` (markers) with one bar per detected missing period:
# each bar is centred on the period, as wide as the period, and has a fixed height of 4000.
rri_markers = go.Scattergl(
    x=df['timestamp'],
    y=df['rri'],
    mode='markers',
    name='rri',
    opacity=0.5,
)
missing_bars = go.Bar(
    x=df_missing_period['period_center'],
    y=[4000] * len(df_missing_period),
    width=df_missing_period['period_length'],
    opacity=0.5,
    name='missing',
)

fig = go.Figure(data=rri_markers)
fig.add_trace(missing_bars)
fig.show()