In [1]:
from cerebralcortex.util.helper_methods import get_study_names
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType,MapType, StringType,ArrayType, FloatType, TimestampType, IntegerType
from pyspark.sql.functions import minute, second, mean, window
from pyspark.sql import functions as F
import numpy as np
import pandas as pd
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, \
ModuleMetadata
from typing import List
import numpy as np
from scipy import signal
import pandas as pd
import pickle
from cerebralcortex import Kernel
from pyspark.sql import functions as F
CC = Kernel("/home/jupyter/cc3_conf/", study_name='mperf')

Calculate PPG data Quality - DC level

In [2]:
def dc_quality_motionsensehrv(data,wrist = 'left'):
    def isDatapointsWithinRange(red,infrared,green):
        red = np.asarray(red, dtype=np.float32)
        infrared = np.asarray(infrared, dtype=np.float32)
        green = np.asarray(green, dtype=np.float32)
        a =  len(np.where((red >= 30000)& (red<=200000))[0]) < .33*3*25
        b = len(np.where((infrared >= 110000)& (infrared<=230000))[0]) < .33*3*25
        c = len(np.where((green >= 500)& (green<=20000))[0]) < .33*3*25
        if a and b and c:
            return False
        return True
    def compute_quality(red,infrared,green):
        """
        :param window: a window containing list of DataPoints
        :return: an integer reptresenting the status of the window 0= attached, 1 = not attached
        """
        if len(red)==0:
            return 1 #not attached
        if not isDatapointsWithinRange(red,infrared,green):
            return 1
        red_mean = np.mean(red)
        ir_mean = np.mean(infrared)
        green_mean = np.mean(green)
        if red_mean < 5000 and ir_mean < 5000:
            return 1
        if not (red_mean>green_mean and ir_mean>red_mean):
            return 1
        diff = 30000
        if red_mean<130000:
            diff = 10000
        if not red_mean - green_mean > diff:
            return 1
        if not ir_mean - red_mean >diff:
            return 1
        return 0
    schema = StructType([
        StructField("version", IntegerType()),
        StructField("user", StringType()),
        StructField("localtime", TimestampType()),
        StructField("timestamp", TimestampType()),
        StructField("red", FloatType()),
        StructField("infrared", FloatType()),
        StructField("green", FloatType()),
        StructField("aclx", FloatType()),
        StructField("acly", FloatType()),
        StructField("aclz", FloatType()),
        StructField("gyrox", FloatType()),
        StructField("gyroy", FloatType()),
        StructField("gyroz", FloatType()),
        StructField("quality", IntegerType())
    ])
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ppg_quality(key,data):
        data = data.sort_values('time').reset_index(drop=True)
        data['quality'] = compute_quality(data['red'].values,
                                          data['infrared'].values,
                                          data['green'].values)
        data.drop(columns=['time'],inplace=True)
        return data

    data = data.withColumn('time',data.timestamp.cast('double'))
    win = F.window("timestamp", windowDuration='3 seconds', startTime='0 seconds')
    ppg_quality_stream = data._data.groupBy(['user','version']+[win]).apply(ppg_quality)
    
#     ppg_quality_stream.show(10,False)
    stream_name = 'org.md2k.feature.motionsensehrv.decoded.'+str(wrist)+'wrist.quality.all'
    stream_metadata = Metadata()
    stream_metadata.set_name(stream_name).set_description("Sequence Aligment,Timestamp Correction, Decoding & Quality of MotionsenseHRV") \
        .add_dataDescriptor(
        DataDescriptor().set_name("red").set_type("float").set_attribute("description", \
        "Value of Red LED - PPG")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("infrared").set_type("float").set_attribute("description", \
        "Value of Infrared LED - PPG")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("green").set_type("float").set_attribute("description", \
        "Value of Green LED - PPG")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("aclx").set_type("float").set_attribute("description", \
        "Wrist Accelerometer X-axis")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("acly").set_type("float").set_attribute("description", \
        "Wrist Accelerometer Y-axis")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("aclz").set_type("float").set_attribute("description", \
        "Wrist Accelerometer Z-axis")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("gyrox").set_type("float").set_attribute("description", \
        "Wrist Gyroscope X-axis")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("gyroy").set_type("float").set_attribute("description", \
        "Wrist Gyroscope Y-axis")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("gyroz").set_type("float").set_attribute("description", \
        "Wrist Gyroscope Z-axis")) \
        .add_dataDescriptor( \
        DataDescriptor().set_name("quality").set_type("integer").set_attribute("description", \
        "PPG quality")) \
        .add_module( \
        ModuleMetadata().set_name("fourtytwo/mullah/cc3/motionsenseHRVquality.ipynb").set_attribute("url", "https://md2k.org").set_author(
            "Md Azim Ullah", "mullah@memphis.edu"))
    ppg_quality_stream = DataStream(data=ppg_quality_stream,metadata=stream_metadata)
    return ppg_quality_stream

In [None]:
wrist = 'left'
ppg_stream = 'org.md2k.feature.motionsensehrv.decoded.'+str(wrist)+'wrist.all'
data = CC.get_stream(ppg_stream)
users = data.groupBy('user').count().sort('count')
all_users = users.toPandas()
import pickle
pickle.dump(all_users,open('../data/users_mperf.p','wb'))

In [None]:
users = pickle.load(open('../data/users_mperf.p','rb'))
wrist = 'left'
ppg_stream = 'org.md2k.feature.motionsensehrv.decoded.'+str(wrist)+'wrist.all'
check = False
for i,user in enumerate(users['user']):
    try:
        data = CC.get_stream(ppg_stream,user_id=user)
        data_quality = dc_quality_motionsensehrv(data,wrist='left')
        if not check:
            CC.save_stream(data_quality,overwrite=True)
            print(i)
            check = True
        else:
            CC.save_stream(data_quality,overwrite=False)
        print(user,'done',i)
    except Exception as e:
        print('error',e)


It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.



0
f2d36ddc-d20e-46c7-974c-fb7fc273db31 done 0
784af6b8-2218-4e01-a066-b66eb2832d9d done 1
351fbcd3-c1ec-416c-bed7-195fe5d1f41d done 2
24e24816-b56b-4c16-9e6d-0ae8afc8650f done 3
af8c78cf-7d82-44f0-a277-abe61896b015 done 4
26eeb04e-ab64-45b2-bb12-7aff7a779f9a done 5
58e04a3a-1f6d-453c-9250-5b218ba2e99c done 6
f16d63b1-c63a-42e0-80c9-aa18ec513da3 done 7
ec7bb904-196c-4040-997e-d5e23ad1a553 done 8
6b74bbc2-128a-4206-82e1-eaec79579c51 done 9
3b6fda64-bb3f-4a77-bec0-7ba034d4540e done 10
1b46a6a9-0e7d-4740-b101-b5bc28154f2c done 11
efc3b444-ae8b-4106-8cfe-689e0cb4dbb9 done 12
3b4966d8-f38a-46cc-ba12-4f059b753241 done 13
6cd16a05-3496-4c8e-a915-b44ac950e241 done 14
f1a772e9-bf5f-4bc9-96ea-7a45f38c8c41 done 15
c9b4e8d8-34f7-474b-9233-db1d945d8fa0 done 16
95a070a7-086b-4f3d-a5ba-0a877f7fabf7 done 17
e1af82ce-6892-4ecf-9f88-be2dd624c100 done 18
1b524925-07d8-42ea-8876-ada7298369ec done 19
b4b75916-a561-41e4-b3b9-5dfc2859028d done 20
8b73cf24-6579-4a9f-b7db-62317feb4d58 done 21
c81cafe4-2589-4207

In [None]:
wrist = 'left'
ppg_stream = 'org.md2k.feature.motionsensehrv.decoded.'+str(wrist)+'wrist.all'
data = CC.get_stream(ppg_stream)
data_quality = dc_quality_motionsensehrv(data,wrist='left')
CC.save_stream(data_quality,overwrite=True)

Bandpass Filtering

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType, IntegerType
import numpy as np
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, \
    ModuleMetadata
from scipy import signal
def filter_data(X,
                Fs=25,
                low_cutoff=.4,
                high_cutoff=3.0,
                filter_order=65):
    """
    Bandpass Filter of single channel

    :param X: input data
    :param Fs: sampling freq.
    :param low_cutoff: low passband
    :param high_cutoff: high passband
    :param filter_order: no of taps in FIR filter

    :return: filtered version of input data
    """
    X1 = X.reshape(-1,1)
    X1 = signal.detrend(X1,axis=0,type='constant')
    b = signal.firls(filter_order,np.array([0,low_cutoff-.1, low_cutoff, high_cutoff ,high_cutoff+.5,Fs/2]),np.array([0, 0 ,1 ,1 ,0, 0]),
                     np.array([100*0.02,0.02,0.02]),fs=Fs)
    X2 = signal.convolve(X1.reshape(-1),b,mode='same')
    return X2

def get_metadata(data,
                 wrist='left',
                 sensor_name='motionsensehrv',
                 ppg_columns=('red','infrared','green'),
                 acl_columns=('aclx','acly','aclz')):
    """
    :param data: input stream
    :param wrist: which wrist the data was collected from
    :param sensor_name: name of sensor
    :param ppg_columns: columns in the input dataframe referring to multiple ppg channels
    :param acl_columns: columns in the input dataframe referring to accelerometer channels

    :return: metadata of output stream
    """
    stream_name = "org.md2k."+str(sensor_name)+"."+str(wrist)+".wrist.bandpass.filtered"
    stream_metadata = Metadata()
    stream_metadata.set_name(stream_name).set_description("Bandpass Filtered PPG data") \
        .add_dataDescriptor(DataDescriptor().set_name("timestamp").set_type("datetime")) \
        .add_dataDescriptor(DataDescriptor().set_name("localtime").set_type("datetime")) \
        .add_dataDescriptor(DataDescriptor().set_name("version").set_type("int")) \
        .add_dataDescriptor(DataDescriptor().set_name("user").set_type("string"))

    for c in ppg_columns:
        stream_metadata.add_dataDescriptor(DataDescriptor().set_name(c).set_type("double").set_attribute("description",
                                                                                                    "ppg channel "+c))
    for c in acl_columns:
        stream_metadata.add_dataDescriptor(DataDescriptor().set_name(c).set_type("double").set_attribute("description",
                                                                                            "accelerometer channel "+c))

    stream_metadata.add_module(
        ModuleMetadata().set_name("ecg data quality").set_attribute("url", "http://md2k.org/").set_author(
            "Md Azim Ullah", "mullah@memphis.edu"))
    return stream_metadata


def bandpass_filter(
                   data,
                   Fs = 25,
                   low_cutoff = 0.4,
                   high_cutoff = 3.0,
                   filter_order = 65,
                   ppg_columns=('red','infrared','green'),
                   acl_columns=('aclx','acly','aclz'),
                   wrist='left',
                   sensor_name='motionsensehrv'):

    """

    :param data: PPG & ACL data stream
    :param Fs: sampling frequency
    :param low_cutoff: minimum frequency of pass band
    :param high_cutoff: Maximum Frequency of pass band
    :param filter_order: no. of taps in FIR filter
    :param ppg_columns: columns in the input dataframe referring to multiple ppg channels
    :param acl_columns: columns in the input dataframe referring to accelerometer channels
    :param wrist: which wrist the data was collected from
    :param sensor_name: name of sensor

    :return: Bandpass filtered version of input PPG data
    """

    ## check if all columns exist

    default_columns = ['user','version','localtime','timestamp']
    required_columns = default_columns+acl_columns+ppg_columns
    if len(set(required_columns)-set(data.columns))>0:
        raise Exception("Columns missing in input dataframe! " + str(list(set(required_columns)-set(data.columns))))

    ## select the columns from input dataframe

    data = data.select(*required_columns)

    ## udf

    default_schema = [StructField("timestamp", TimestampType()),
                      StructField("localtime", TimestampType()),
                      StructField("version", IntegerType()),
                      StructField("user", StringType())]
    schema = StructType(default_schema+[StructField(c, DoubleType()) for c in list(ppg_columns)+list(acl_columns)])
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ppg_bandpass(data):
        if data.shape[0]<1000:
            return pd.DataFrame([],columns=data.columns)
        data = data.sort_values('timestamp').reset_index(drop=True)
        for c in ppg_columns:
            data[c] = filter_data(data[c].values,Fs=Fs,low_cutoff=low_cutoff,high_cutoff=high_cutoff,filter_order=filter_order)
        return data

    ## steps
    win = F.window("timestamp", windowDuration='3600 seconds', startTime='0 seconds')
    ppg_bandpass_filtered = data._data.groupBy(['user','version']+[win]).apply(ppg_bandpass)
#     ppg_bandpass_filtered = data.compute(ppg_bandpass,windowDuration=60*60*10,startTime='0 seconds')
    output_data = ppg_bandpass_filtered
    ds = DataStream(data=output_data,metadata=get_metadata(data,wrist=wrist,sensor_name=sensor_name,
                                                           ppg_columns=ppg_columns,acl_columns=acl_columns))
    return ds

In [None]:
wrist = 'left'
stream_name = 'org.md2k.feature.motionsensehrv.decoded.'+str(wrist)+'wrist.quality.all'
data = CC.get_stream(stream_name)
data = data.filter(F.col('quality')==0)
data  = data.withColumn('day',F.date_format('localtime',"YYYYMMdd"))
filtered_data = bandpass_filter(
                   data,
                   Fs = 25,
                   low_cutoff = 0.4,
                   high_cutoff = 4.0,
                   filter_order = 65,
                   ppg_columns=['red','green','infrared'],
                   acl_columns=['aclx','acly','aclz'],
                   wrist=wrist,
                   sensor_name='motionsensehrvmperf')
CC.save_stream(filtered_data)

Filter Minute Length Data by Variance

In [None]:
def get_metadata1(data,
                 wrist,
                 sensor_name='motionsensehrvmperf',
                 ppg_columns=('red','infrared','green'),
                 acl_columns=('aclx','acly','aclz'),
                 gyro_columns=('gyrox','gyroy','gyroz')):
    """
    :param data: input stream
    :param wrist: which wrist the data was collected from
    :param sensor_name: name of sensor
    :param ppg_columns: columns in the input dataframe referring to multiple ppg channels
    :param acl_columns: columns in the input dataframe referring to accelerometer channels

    :return: metadata of output stream
    """
    stream_name = "org.md2k."+str(sensor_name)+"."+str(wrist)+".wrist.acl.filtered"
    stream_metadata = Metadata()
    stream_metadata.set_name(stream_name).set_description("Bandpass Filtered PPG data") \
        .add_dataDescriptor(DataDescriptor().set_name("timestamp").set_type("datetime")) \
        .add_dataDescriptor(DataDescriptor().set_name("localtime").set_type("datetime")) \
        .add_dataDescriptor(DataDescriptor().set_name("version").set_type("int")) \
        .add_dataDescriptor(DataDescriptor().set_name("user").set_type("string"))

    for c in ppg_columns:
        stream_metadata.add_dataDescriptor(DataDescriptor().set_name(c).set_type("double").set_attribute("description",
                                                                                                    "ppg channel "+c))
    for c in acl_columns:
        stream_metadata.add_dataDescriptor(DataDescriptor().set_name(c).set_type("double").set_attribute("description",
                                                                                            "accelerometer channel "+c))
    for c in gyro_columns:
        stream_metadata.add_dataDescriptor(DataDescriptor().set_name(c).set_type("double").set_attribute("description",
                                                                                            "gyroscope channel "+c))

    stream_metadata.add_module(
        ModuleMetadata().set_name("ppg").set_attribute("url", "http://md2k.org/").set_author(
            "Md Azim Ullah", "mullah@memphis.edu"))
    return stream_metadata


from copy import deepcopy
def filter_60sec_data(data,
                      wrist,
                      Fs = 25,
                      acceptable = .85,
                      window_duration = 60):
    def get_std(dt):
        return np.sqrt(dt['aclx'].std()**2+dt['acly'].std()**2+dt['aclz'].std()**2)
    
    schema = deepcopy(data.schema)
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def acl_variance_filter(df):
        if df.shape[0]<Fs*acceptable*window_duration:
            return pd.DataFrame([],columns=df.columns)
        df = df.sort_values('timestamp').reset_index(drop=True)
#         df['mag'] = df.apply(lambda a:np.sqrt(a['aclx']**2+a['acly']**2+a['aclz']**2),axis=1)
        acl_stds = np.array([get_std(d) for i,d in df.groupby(pd.Grouper(key='timestamp',freq='5S'))])
#         variance = np.sqrt(np.square(df['aclx'].std())+np.square(df['acly'].std())+np.square(df['aclz'].std()))
#         variance = len(acl_stds[acl_stds>=.21])<=len(acl_stds)*.5 
#         df.drop(columns=['mag'],inplace=True) 
        if np.median(acl_stds)<.21:
            return pd.DataFrame([],columns=df.columns)
        else:
            print('-'*40,'This works')
            return df
    win = F.window("timestamp", windowDuration='60 seconds', startTime='0 seconds')
    ppg_filtered = data._data.groupBy(['user','version']+[win]).apply(acl_variance_filter)
    metadata1 = get_metadata1(data,wrist=wrist)
    ds = DataStream(data=ppg_filtered,metadata=metadata1)
    return ds

In [None]:
import numpy as np
wrist = 'left'
stream_name = 'org.md2k.feature.motionsensehrv.decoded.'+str(wrist)+'wrist.quality.all'
data = CC.get_stream(stream_name)
data = data.filter(F.col('quality')==0).drop('quality')
ds = filter_60sec_data(data,wrist=wrist)
CC.save_stream(ds,overwrite=True)

Checking Data Proportions by participant

In [None]:
wrist = 'left'
sensor_name='motionsensehrvmperf'
stream_name = "org.md2k."+str(sensor_name)+"."+str(wrist)+".wrist.acl.filtered"
data = CC.get_stream(stream_name)
data  = data.withColumn('day',F.date_format('localtime',"yyyyMMdd"))

count_data = data._data.groupBy(['user','day']).count().toPandas()
count_data['count'] = count_data['count']/(25*3600)
data_users = count_data.groupby('user',as_index=False).count()
data_users = data_users[data_users['count']>10]
users_wanted = data_users['user'].values
count_data_final = count_data[count_data.user.isin(users_wanted)&count_data['count']>.5]

total_user_data = count_data_final.groupby(['user']).sum().to_dict()['count']

daywise_user_data = count_data_final.groupby(['user','day']).sum().to_dict()['count']

pickle.dump([total_user_data,daywise_user_data],open('../data/data_participants.p','wb'))

Saving Participant Data

In [None]:
total_user_data,daywise_user_data = pickle.load(open('../data/data_participants.p','rb'))
wrist = 'left'
sensor_name='motionsensehrvmperf'
stream_name = "org.md2k."+str(sensor_name)+"."+str(wrist)+".wrist.acl.filtered"
data = CC.get_stream(stream_name)
users = list(total_user_data.keys())
data = data.filter(F.col('user').isin(users))
data  = data.withColumn('day',F.date_format('localtime',"yyyyMMdd"))
days = list(list(zip(*list(daywise_user_data.keys())))[1])
data = data.filter(F.col('day').isin(days))

data = data.withColumn('time',F.col('timestamp').cast('double'))

data = data.withColumn('data',F.array('time',
                                      'red','infrared','green',
                                      'aclx','acly','aclz',
                                      'gyrox','gyroy','gyroz')).drop(*['time','red','infrared','green','aclx',
                                                       'acly','aclz','gyrox','gyroy','gyroz'])

win = F.window("timestamp", windowDuration='60 seconds', startTime='0 seconds')
groupbycols = ["user",'version'] + [win]
data_grouped = data._data.groupBy(groupbycols).agg(F.collect_list('data'),
                                                   F.collect_list('timestamp'),
                                                  F.collect_list('localtime')).withColumnRenamed('collect_list(data)',
                                                                                                 'data').withColumnRenamed('collect_list(timestamp)',
                                                                                                                            'timestamp').withColumnRenamed('collect_list(localtime)',
                                                                                                                            'localtime')

data_grouped = data_grouped.withColumn('localtime',F.col('localtime').getItem(0)).withColumn('timestamp',F.col('timestamp').getItem(0))

stream_metadata = Metadata()
stream_name = "org.md2k."+str(sensor_name)+"."+str(wrist)+".wrist.acl.filtered.data.grouped.minute"
stream_metadata.set_name(stream_name).set_description("Data Parsed as minute length array") \
    .add_dataDescriptor(
    DataDescriptor().set_name("window").set_type("object").set_attribute("description", \
    "start and end times")) \
    .add_dataDescriptor( \
    DataDescriptor().set_name("data").set_type("array").set_attribute("description", \
    "input data")) \
    .add_dataDescriptor( \
    DataDescriptor().set_name("user").set_type("string").set_attribute("description", \
    "user id")) \
    .add_dataDescriptor( \
    DataDescriptor().set_name("version").set_type("integer").set_attribute("description", \
    "version id")) \
    .add_dataDescriptor( \
    DataDescriptor().set_name("timestamp").set_type("timestamp").set_attribute("description", \
    "timestamp")) \
    .add_dataDescriptor( \
    DataDescriptor().set_name("localtime").set_type("timestamp").set_attribute("description", \
    "localtime")) \
    .add_module( \
    ModuleMetadata().set_name("fourtytwo/mullah/cc3/motionsenseHRVquality.ipynb").set_attribute("url", "https://md2k.org").set_author(
        "Md Azim Ullah", "mullah@memphis.edu"))
data_grouped = DataStream(data=data_grouped,metadata=stream_metadata)
CC.save_stream(data_grouped,overwrite=True)

In [None]:
stream_name = "org.md2k."+str(sensor_name)+"."+str(wrist)+".wrist.acl.filtered.data.grouped.minute"
data = CC.get_stream(stream_name)

In [None]:
data.count()

In [None]:
323953