In [1]:
from cerebralcortex.util.helper_methods import get_study_names
# sn = get_study_names("/home/jupyter/cc3_conf/")
# print(sn)
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType,MapType, StringType,ArrayType, FloatType, TimestampType, IntegerType
from pyspark.sql.functions import minute, second, mean, window
from pyspark.sql import functions as F
import numpy as np
import pandas as pd
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, \
ModuleMetadata
from typing import List
import numpy as np
from scipy import signal
import pandas as pd
from cerebralcortex import Kernel
from pyspark.sql import functions as F
# Instantiate the CerebralCortex Kernel for the 'moods' study from the path below
# (presumably a configuration directory - confirm). Every later cell reads and
# saves streams through this CC object.
CC = Kernel("/home/jupyter/cc3_moods_conf/", study_name='moods')

Convert the ECG data collected from AutoSense into a CC DataStream for further processing

In [None]:
# Raw AutoSense export: a headerless, gzip-compressed CSV with three columns per row.
df = pd.read_csv('/home/jupyter/mullah/azim_ecg_test.csv.gz',
                 compression='gzip', names=['time', 'offset', 'ecg'])

# Retained from the original cell; the vectorized conversions below no longer use it.
from datetime import datetime

# 'time' and 'offset' are interpreted as milliseconds since the Unix epoch.
# pd.to_datetime(..., unit='ms') converts the whole column at once and yields the
# same naive (UTC-based) timestamps as calling datetime.utcfromtimestamp(ms / 1000)
# on every row, without the slow row-wise apply.
df['timestamp'] = pd.to_datetime(df['time'], unit='ms')
df['localtime'] = pd.to_datetime(df['time'] + df['offset'], unit='ms')

# Scalars broadcast to every row; no need to materialise a list of repeated ids.
df['user'] = 'afcfc1b5-365f-409b-918e-2f0ce8056ff9'
df['version'] = 1
df = df.sort_values('localtime').reset_index(drop=True)

# Describe every column of the frame so the stream can be saved with its metadata.
stream_name = "ecg--org.md2k.autosense--autosense_chest--chest"
stream_metadata = Metadata()
(
    stream_metadata.set_name(stream_name).set_description("ECG Autosense")
    .add_dataDescriptor(DataDescriptor().set_name("timestamp").set_type("datetime"))
    .add_dataDescriptor(DataDescriptor().set_name("localtime").set_type("datetime"))
    .add_dataDescriptor(DataDescriptor().set_name("version").set_type("int"))
    .add_dataDescriptor(DataDescriptor().set_name("user").set_type("string"))
    .add_dataDescriptor(DataDescriptor().set_name("time").set_type("double"))
    .add_dataDescriptor(DataDescriptor().set_name("ecg").set_type("double"))
    .add_dataDescriptor(DataDescriptor().set_name("offset").set_type("double"))
)
stream_metadata.add_module(ModuleMetadata().set_name("ECG")
        .set_attribute("url", "http://md2k.org/")
        .set_author("Md Azim Ullah", "mullah@memphis.edu"))

ds = DataStream(data=df, metadata=stream_metadata)

# overwrite=True is passed so a re-run replaces the stored stream - TODO confirm
# the exact overwrite semantics against the CerebralCortex documentation.
CC.save_stream(ds, overwrite=True)

Compute ECG quality for every 3-second window

In [None]:
def get_quality(data):
    """Classify one window of ECG samples into a data-quality label.

    The checks run in this order and the first match wins:
      1. no samples at all               -> 'Battery down/Disconnected'
      2. at most 3*0.33*64 samples       -> 'Interittent Data Loss'
      3. value range (max - min) <= 50   -> 'Sensor off Body'
      4. value range <= 400              -> 'Loose/Improper Attachment'
      5. more than 34% outlier samples   -> 'Loose/Improper Attachment'
      6. otherwise                       -> 'Acceptable'

    :param data: indexable sequence of numeric ECG sample values for one window
    :return: one of the label strings listed above
    """
    # Labels handed back to the caller; spelling is reproduced exactly because
    # other cells match on these strings.
    label_loose = 'Loose/Improper Attachment'
    label_not_worn = 'Sensor off Body'
    label_off = 'Battery down/Disconnected'
    label_missing = 'Interittent Data Loss'
    label_good = 'Acceptable'

    # Thresholds, in raw sample units.
    high_limit = 4000
    low_limit = 20
    slope_limit = 100
    loose_range_limit = 400
    min_samples = 3*(0.33)*64
    max_outlier_percent = 34

    n = len(data)
    if n == 0:
        return label_off
    if n <= min_samples:
        return label_missing

    span = max(data) - min(data)
    if span <= 50:
        return label_not_worn
    if span <= loose_range_limit:
        return label_loose

    outliers = 0
    for idx, sample in enumerate(data):
        # Neighbours wrap around: the first sample's predecessor is the last
        # sample, and the last sample's successor is the first.
        before = data[(idx - 1) % n]
        after = data[(idx + 1) % n]
        jump_before = abs(sample - before)
        jump_after = abs(sample - after)

        # Steep jump relative to both neighbours.
        is_disc = jump_before > slope_limit and jump_after > slope_limit
        # Identical to both neighbours.
        is_stuck = sample == before and sample == after
        # Huge jump relative to either neighbour.
        is_flip = jump_before > high_limit or jump_after > high_limit

        # A sample counts once however many criteria it satisfies.
        if (is_disc or is_stuck or is_flip
                or sample >= high_limit or sample <= low_limit):
            outliers += 1

    if 100*outliers > max_outlier_percent*n:
        return label_loose
    return label_good

# Output layout of the quality UDF: the input columns plus a 'quality' string.
schema = StructType([
    StructField(column, spark_type)
    for column, spark_type in (
        ("timestamp", TimestampType()),
        ("localtime", TimestampType()),
        ("version", IntegerType()),
        ("user", StringType()),
        ("quality", StringType()),
        ("ecg", DoubleType()),
    )
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ecg_quality(key, data):
    """Attach a single quality label to every row of one group of ECG samples.

    :param key: grouping key supplied by Spark (not used)
    :param data: pandas DataFrame for one group; needs 'timestamp' and 'ecg' columns
    :return: the group with an added 'quality' column; a non-empty group is sorted
             by 'timestamp' and every row carries the label that get_quality
             computes for the whole group, an empty group is returned as-is
    """
    # Create the column up front so an empty group still has it.
    data['quality'] = ''
    if data.shape[0] == 0:
        return data

    ordered = data.sort_values('timestamp')
    # get_quality returns one label for the group; it is broadcast to all rows.
    ordered['quality'] = get_quality(list(ordered['ecg']))
    return ordered

# Metadata for the ECG quality stream. The attribute keys set on the "quality"
# descriptor are the same label strings that get_quality returns, each paired
# with a short explanation.
stream_name = 'org.md2k.autosense.ecg.quality'
stream_metadata = Metadata()
stream_metadata.set_name(stream_name).set_description("Chest ECG quality 3 seconds") \
    .add_dataDescriptor(
    DataDescriptor().set_name("quality").set_type("string").set_attribute("description", \
    "ECG data quality").set_attribute('Loose/Improper Attachment','Electrode Displacement').set_attribute('Sensor off Body',\
    'Autosense not worn').set_attribute('Battery down/Disconnected', \
    'No data is present - Can be due to battery down or sensor disconnection').set_attribute('Interittent Data Loss', \
     'Not enough samples are present').set_attribute('Acceptable','Good Quality')) \
    .add_dataDescriptor(
    DataDescriptor().set_name("ecg").set_type("double").set_attribute("description", \
    "ecg sample value")) \
    .add_module(
    ModuleMetadata().set_name("autosense ecg quality").set_attribute("url", "http://md2k.org/").set_author(
        "Md Azim Ullah", "mullah@memphis.edu"))
# NOTE(review): the result of is_valid() is not checked or stored here.
stream_metadata.is_valid()

# Load the raw ECG stream saved by the first cell and keep only the columns the
# quality UDF works with.
ecg_stream = 'ecg--org.md2k.autosense--autosense_chest--chest'
ecg = CC.get_stream(ecg_stream).select('timestamp','localtime','user','version','ecg')
# Apply ecg_quality per window; windowDuration=3 is presumably in seconds, in line
# with the "3 seconds" description above - TODO confirm against DataStream.compute.
ecg_quality_stream = ecg.compute(ecg_quality,windowDuration=3,startTime='0 seconds')
ecg_quality_stream.printSchema()
# Re-wrap the computed data together with the quality metadata before saving.
data = ecg_quality_stream._data
ds = DataStream(data=data,metadata=stream_metadata)
CC.save_stream(ds,overwrite=True)

Compute RR intervals from the acceptable portions of the ECG

In [None]:
import numpy as np
from scipy.stats import iqr
from enum import Enum


class Quality(Enum):
    """Binary quality verdict attached to a single RR interval."""

    # Interval passed the outlier criteria.
    ACCEPTABLE = 1
    # Interval was flagged as an outlier.
    UNACCEPTABLE = 0

def outlier_computation(valid_rr_interval_time: list,
                        valid_rr_interval_sample: list,
                        criterion_beat_difference: float):
    """
    Walk the interior RR intervals and judge each one against the criterion beat
    difference, using the last accepted interval and both direct neighbours.

    The first and last intervals are never yielded; the caller adds verdicts for
    them itself.

    :param valid_rr_interval_time: times of the RR intervals, index-aligned with the samples
    :param valid_rr_interval_sample: RR interval values
    :param criterion_beat_difference: threshold on the absolute difference between intervals

    yields: (time, Quality) for every index from 1 to len - 2, in order
    """
    limit = criterion_beat_difference
    last_good_rr = valid_rr_interval_sample[0]
    state = Quality.ACCEPTABLE

    for pos in range(1, len(valid_rr_interval_sample) - 1):
        current = valid_rr_interval_sample[pos]
        gap_last_good = abs(last_good_rr - current)
        gap_prev = abs(valid_rr_interval_sample[pos - 1] - current)
        gap_next = abs(current - valid_rr_interval_sample[pos + 1])

        if state == Quality.UNACCEPTABLE:
            # After a rejected beat, recover when the beat is back near the last
            # accepted value, or when it is far from it but agrees with both of
            # its neighbours. Anything else stays rejected.
            recovered = gap_last_good < limit or (
                gap_last_good > limit and gap_prev <= limit and gap_next <= limit)
            verdict = Quality.ACCEPTABLE if recovered else Quality.UNACCEPTABLE
        elif gap_prev <= limit:
            verdict = Quality.ACCEPTABLE
        elif gap_prev > limit:
            verdict = Quality.UNACCEPTABLE
        else:
            # Neither comparison holds (e.g. a NaN difference): flag the beat but
            # leave the running state untouched.
            yield (valid_rr_interval_time[pos], Quality.UNACCEPTABLE)
            continue

        state = verdict
        if verdict == Quality.ACCEPTABLE:
            # Accepted beats become the new reference value.
            last_good_rr = current
        yield (valid_rr_interval_time[pos], verdict)


def compute_outlier_ecg(ecg_ts,ecg_rr):
    """
    Label every plausible RR interval as acceptable or unacceptable.

    Reference - Berntson, Gary G., et al. "An approach to artifact identification: Application to heart period data."
    Psychophysiology 27.5 (1990): 586-598.

    Only intervals strictly between 0.3 and 2 are considered (the caller passes
    seconds); all others are dropped, so the result can be shorter than the input.

    :param ecg_ts: times of the RR intervals, index-aligned with ecg_rr
    :param ecg_rr: RR interval values

    :return: list of (time, Quality) tuples, exactly one per retained interval and
             in input order. The first and last retained intervals are always
             marked ACCEPTABLE. Returns an empty list when no interval is retained.
    """
    valid_rr_interval_time = []
    valid_rr_interval_sample = []
    for ts, rr in zip(ecg_ts, ecg_rr):
        if .3 < rr < 2:
            valid_rr_interval_time.append(ts)
            valid_rr_interval_sample.append(rr)

    # Nothing plausible: there is nothing to label (previously an IndexError).
    if not valid_rr_interval_sample:
        return []
    # A single interval is both first and last; emit it once, not twice.
    if len(valid_rr_interval_sample) == 1:
        return [(valid_rr_interval_time[0], Quality.ACCEPTABLE)]

    valid_rr_interval_difference = np.abs(np.diff(valid_rr_interval_sample))

    # Maximum Expected Difference (MED) = multiplier * Quartile Deviation, with
    # Quartile Deviation = 0.5 * IQR of the successive differences.
    # NOTE(review): the original comment quoted a multiplier of 3.32 while the code
    # uses 4.5 - confirm which one is intended.
    maximum_expected_difference = 4.5 * 0.5 * iqr(valid_rr_interval_difference)

    # Shortest Expected Beat (SEB) = Median Beat - 2.9 * Quartile Deviation
    # Minimal Artifact Difference (MAD) = SEB / 3
    maximum_artifact_difference = (np.median(valid_rr_interval_sample) - 2.9 * .5 * iqr(
        valid_rr_interval_difference)) / 3

    # Midway between MED and MAD is used as the criterion, floored at 0.2.
    criterion_beat_difference = (maximum_expected_difference + maximum_artifact_difference) / 2
    if criterion_beat_difference < .2:
        criterion_beat_difference = .2

    # outlier_computation only judges interior intervals; the two end points are
    # added here as ACCEPTABLE.
    ecg_rr_quality_array = [(valid_rr_interval_time[0], Quality.ACCEPTABLE)]
    ecg_rr_quality_array.extend(
        outlier_computation(valid_rr_interval_time, valid_rr_interval_sample, criterion_beat_difference))
    ecg_rr_quality_array.append((valid_rr_interval_time[-1], Quality.ACCEPTABLE))
    return ecg_rr_quality_array

from typing import Tuple
from typing import List
import pandas as pd
import numpy as np

# Output layout of the R-peak UDF defined below: one row per retained RR interval.
# This rebinds the module-level name `schema`.
schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("localtime", TimestampType()),
    StructField("version", IntegerType()),
    StructField("user", StringType()),
    StructField("rr", FloatType())
])
from ecgdetectors import Detectors
# Shared detector instance used inside the UDF. The argument 64 is presumably the
# ECG sampling frequency in Hz - TODO confirm against the ecgdetectors docs.
detectors = Detectors(64)
def _empty_rr_frame():
    """Return an empty frame carrying exactly the columns of the RR output schema."""
    return pd.DataFrame([], columns=['timestamp', 'localtime', 'version', 'user', 'rr'])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def ecg_r_peak(key,data):
    """Detect R peaks in one group of ECG samples and return the clean RR intervals.

    Steps: sort by timestamp, run the Hamilton detector on the 'ecg' column, turn
    consecutive peak times into RR intervals (milliseconds, since 'time' is
    multiplied by 1000), keep intervals in the plausible range, drop the ones
    compute_outlier_ecg marks unacceptable, and return the rows at the remaining
    peaks with the interval in a new 'rr' column.

    :param key: grouping key supplied by Spark (not used)
    :param data: pandas DataFrame for one group with 'timestamp', 'localtime',
                 'version', 'user', 'ecg' and a numeric 'time' column
                 (presumably epoch seconds - confirm against the caller)
    :return: DataFrame with columns timestamp, localtime, version, user, rr; empty
             when the group has at most 1000 rows or fewer than 3 peaks/intervals
             survive any stage
    """
    if data.shape[0] <= 1000:
        return _empty_rr_frame()

    data = data.sort_values('timestamp').reset_index(drop=True)
    rpeaks = np.array(detectors.hamilton_detector(data['ecg'].values))
    if len(rpeaks) < 3:
        return _empty_rr_frame()

    rpeak_ts = 1000 * data['time'].values[rpeaks]
    # Each RR interval is attributed to the later of its two peaks.
    ecg_rr_ts = rpeak_ts[1:]
    ecg_rr_val = np.diff(rpeak_ts)
    peak_rows = rpeaks[1:]

    # Keep plausible intervals only. The last two conditions repeat, on the same
    # seconds values, the filter compute_outlier_ecg applies internally
    # (0.3 < rr < 2). Without them an interval of exactly 2 s would be dropped
    # inside that function, and the positions it returns would no longer line up
    # with the arrays indexed below.
    rr_seconds = ecg_rr_val / 1000
    plausible = ((ecg_rr_val >= 400) & (ecg_rr_val <= 2000)
                 & (rr_seconds > .3) & (rr_seconds < 2))
    keep = np.where(plausible)[0]
    if len(keep) < 3:
        return _empty_rr_frame()

    ecg_rr_ts = ecg_rr_ts[keep]
    ecg_rr_val = ecg_rr_val[keep]
    peak_rows = peak_rows[keep]

    # One (time, Quality) tuple per interval, in the same order as the arrays.
    outlier = compute_outlier_ecg(ecg_rr_ts / 1000, ecg_rr_val / 1000)
    acceptable = [ind for ind, tup in enumerate(outlier) if tup[1] == Quality.ACCEPTABLE]
    if len(acceptable) < 3:
        return _empty_rr_frame()

    acceptable = np.array(acceptable)
    ecg_rr_val = ecg_rr_val[acceptable]
    peak_rows = peak_rows[acceptable]

    # Copy before adding a column so the assignment does not target a slice of
    # the input frame (avoids pandas' SettingWithCopyWarning).
    result = data.iloc[peak_rows].copy()
    result['rr'] = ecg_rr_val.astype(np.float64)
    return result[['timestamp', 'localtime', 'version', 'user', 'rr']]



# Metadata for the RR-interval stream produced by ecg_r_peak.
# NOTE(review): the 'algorithm' attribute below says 'pan-tomkins', but ecg_r_peak
# calls detectors.hamilton_detector and the stream name ends in 'hamiltonian' -
# confirm which is intended before relying on this attribute.
stream_name = 'org.md2k.autosense.ecg.rr.final.hamiltonian'
stream_metadata = Metadata()
stream_metadata.set_name(stream_name).set_description("ECG RR interval in milliseconds") \
    .add_dataDescriptor(
    DataDescriptor().set_name("rr").set_type("float").set_attribute("description", \
    "rr interval")) \
    .add_module(
    ModuleMetadata().set_name("fourtytwo/mullah/cc3/ecg_rr.ipynb").set_attribute("url", \
    "http://md2k.org/").set_attribute('algorithm','pan-tomkins').set_attribute('unit', \
    'ms').set_author("Md Azim Ullah", "mullah@memphis.edu"))

# Read back the quality-annotated ECG stream saved by the quality cell.
ecg_stream = 'org.md2k.autosense.ecg.quality'
ecg11 = CC.get_stream(ecg_stream)
# Numeric copy of the timestamp; ecg_r_peak reads it as the 'time' column.
ecg11 = ecg11.withColumn('time',F.col('timestamp').cast('double'))
# Only windows labelled 'Acceptable' by get_quality are passed to peak detection.
ecg12 = ecg11.filter(F.col('quality')=='Acceptable')
# windowDuration=600 is presumably in seconds (10-minute groups) - TODO confirm.
ecg_filtered11 = ecg12.compute(ecg_r_peak,windowDuration=600,startTime='0 seconds')
ecg_filtered11.metadata = stream_metadata
ecg_filtered11.show(5,False)
CC.save_stream(ecg_filtered11,overwrite=True)

Compute the 5-second average RR interval (window slides every 2 seconds)

In [None]:
# Load the RR-interval stream saved by the previous cell.
data = CC.get_stream('org.md2k.autosense.ecg.rr.final.hamiltonian')
# 5-second windows that start every 2 seconds, so consecutive windows overlap.
win = F.window("timestamp", windowDuration='5 seconds',slideDuration='2 seconds',startTime='0 seconds')
# Grouping key is (user, version, window); the UDF reads the window as key[2].
groupbycols = ["user","version"] + [win]

# Output layout of compute_average: one row per (user, version, window).
schema2 = StructType([
    StructField("version", IntegerType()),
    StructField("user", StringType()),
    StructField("start", TimestampType()),
    StructField("end", TimestampType()),
    StructField("rr", DoubleType()),
    StructField("timestamp", TimestampType()),
    StructField("localtime", TimestampType())
])
@pandas_udf(schema2, PandasUDFType.GROUPED_MAP)
def compute_average(key,data1):
    """Collapse one (user, version, window) group into a single summary row.

    :param key: grouping key; its third entry is indexed with 'start' and 'end'
                to obtain the window bounds
    :param data1: pandas DataFrame with the RR rows that fall into the window
    :return: one-row DataFrame with version, user, window start/end, the mean of
             'rr', and the timestamp/localtime of the group's first row
    """
    window_bounds = key[2]
    output_columns = ['version','user','start','end','rr','timestamp','localtime']
    summary_row = [
        data1['version'].values[0],
        data1['user'].values[0],
        window_bounds['start'],
        window_bounds['end'],
        data1['rr'].mean(),
        # First row as delivered; the group is not sorted here.
        data1['timestamp'].values[0],
        data1['localtime'].values[0],
    ]
    return pd.DataFrame([summary_row], columns=output_columns)
# Run compute_average once per (user, version, window) group.
final_data = data._data.groupBy(groupbycols).apply(compute_average)
# Rebinds the module-level name `schema` to the result's schema; the descriptors
# below are generated from it, one per output column.
schema = final_data.schema
stream_metadata = Metadata()
# NOTE(review): the description and module name say "Bandpass Filtered PPG, ECG Rpeak",
# which does not describe an RR-interval average - looks copied from another
# notebook; confirm before relying on it.
stream_metadata.set_name("org.md2k.autosense.ecg.rr.final.hamiltonian.5secs.average").set_description("Bandpass Filtered PPG, ECG Rpeak")
for field in schema.fields:
    stream_metadata.add_dataDescriptor(
        DataDescriptor().set_name(str(field.name)).set_type(str(field.dataType))
    )
stream_metadata.add_module(
    ModuleMetadata().set_name("Bandpass Filtered PPG, ECG Rpeak") \
    .set_attribute("url", "https://md2k.org").set_author(
        "Md Azim Ullah", "mullah@memphis.edu"))
# NOTE(review): the result of is_valid() is not checked or stored here.
stream_metadata.is_valid()
ds = DataStream(data=final_data,metadata=stream_metadata)
CC.save_stream(ds,overwrite=True)


Compute the 60-second average RR interval

In [2]:
# Load the RR-interval stream saved by the RR cell.
data = CC.get_stream('org.md2k.autosense.ecg.rr.final.hamiltonian')
# 60-second windows that start every 60 seconds, i.e. non-overlapping.
win = F.window("timestamp", windowDuration='60 seconds',slideDuration='60 seconds',startTime='0 seconds')
# Grouping key is (user, version, window); the UDF reads the window as key[2].
groupbycols = ["user","version"] + [win]

# Output layout of compute_average for this cell; unlike the 5-second cell the
# averaged column is named 'ecg_rr'. This rebinds the name `schema2`.
schema2 = StructType([
    StructField("version", IntegerType()),
    StructField("user", StringType()),
    StructField("start", TimestampType()),
    StructField("end", TimestampType()),
    StructField("ecg_rr", DoubleType()),
    StructField("timestamp", TimestampType()),
    StructField("localtime", TimestampType())
])
@pandas_udf(schema2, PandasUDFType.GROUPED_MAP)
def compute_average(key,data1):
    """Collapse one (user, version, window) group into a single summary row.

    This definition replaces the earlier compute_average of the 5-second cell; it
    differs only in naming the averaged column 'ecg_rr'.

    :param key: grouping key; its third entry is indexed with 'start' and 'end'
                to obtain the window bounds
    :param data1: pandas DataFrame with the RR rows that fall into the window
    :return: one-row DataFrame with version, user, window start/end, the mean of
             'rr' (stored as 'ecg_rr'), and the timestamp/localtime of the
             group's first row
    """
    window_bounds = key[2]
    output_columns = ['version','user','start','end','ecg_rr','timestamp','localtime']
    summary_row = [
        data1['version'].values[0],
        data1['user'].values[0],
        window_bounds['start'],
        window_bounds['end'],
        data1['rr'].mean(),
        # First row as delivered; the group is not sorted here.
        data1['timestamp'].values[0],
        data1['localtime'].values[0],
    ]
    return pd.DataFrame([summary_row], columns=output_columns)
# Run compute_average once per (user, version, window) group.
final_data = data._data.groupBy(groupbycols).apply(compute_average)
# Rebinds the module-level name `schema` to the result's schema; the descriptors
# below are generated from it, one per output column.
schema = final_data.schema
stream_metadata = Metadata()
# NOTE(review): the description and module name say "Bandpass Filtered PPG, ECG Rpeak",
# which does not describe an RR-interval average - looks copied from another
# notebook; confirm before relying on it.
stream_metadata.set_name("org.md2k.autosense.ecg.rr.final.hamiltonian.60secs.average").set_description("Bandpass Filtered PPG, ECG Rpeak")
for field in schema.fields:
    stream_metadata.add_dataDescriptor(
        DataDescriptor().set_name(str(field.name)).set_type(str(field.dataType))
    )
stream_metadata.add_module(
    ModuleMetadata().set_name("Bandpass Filtered PPG, ECG Rpeak") \
    .set_attribute("url", "https://md2k.org").set_author(
        "Md Azim Ullah", "mullah@memphis.edu"))
# NOTE(review): the result of is_valid() is not checked or stored here.
stream_metadata.is_valid()
ds = DataStream(data=final_data,metadata=stream_metadata)
CC.save_stream(ds,overwrite=True)

True