In [0]:
storage_account_name = "****"
storage_account_key = "****"
container = "****"

In [0]:

import numpy as np
import pandas as pd
import datetime as datetime
from scipy.signal import find_peaks, peak_prominences
from scipy.interpolate import interp1d
from scipy import signal
from scipy.integrate import trapz
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.window import Window
import  pyspark.sql.functions as F
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType,StructField
from pyspark.sql.types import *



In [0]:
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

In [0]:
from pyspark.sql.functions import col, window,expr,mean,stddev,min,max,skewness,median,lag,abs,sum,count

def dataimport(filepath, sensortype, Starttime = 'NaN',Endtime = 'NaN',windows='5min'): 

    if sensortype == 'Glucose':
        data = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", ",").csv(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/"+filepath).toDF('Index','Time','Event Type','Event Subtype',	'Patient Info','Device Info','Source Device ID','Var','Insulin Value (u)','Carb Value (grams)','Duration (hh:mm:ss)','Glucose Rate of Change (mg/dL/min)',	'Transmitter Time (Long Integer)')
        columns_to_drop = ['Index','Event Type','Event Subtype',	'Patient Info','Device Info','Source Device ID','Insulin Value (u)','Carb Value (grams)','Duration (hh:mm:ss)','Glucose Rate of Change (mg/dL/min)',	'Transmitter Time (Long Integer)']
        data = data.select([col for col in data.columns if col not in columns_to_drop])

    elif sensortype == 'Food':
        data = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", ",").csv(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/"+filepath).toDF('date','times','Time','time_end','logged_food','amount',	'unit',	'searched_food','calorie','total_carb','dietary_fiber','sugar','protein','total_fat')
        columns_to_drop = ['date','times','time_end','logged_food','amount',	'unit','searched_food','dietary_fiber','total_fat']
        data = data.select([col for col in data.columns if col not in columns_to_drop])

    elif sensortype == 'ACC':
        data = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", ",").csv(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/"+filepath).toDF('Time','x','y','z')
        data = data.withColumn("Var", (data["x"]**2 + data["y"]**2 + data["z"]**2)**(-2))
        columns_to_drop = ['x', 'y', 'z']
        data = data.select([col for col in data.columns if col not in columns_to_drop])
 
    else:
        data = spark.read.option("header", "true").option("inferSchema", "true").option("delimiter", ",").csv(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/"+filepath).toDF('Time','Var')
        
    data = data.withColumn("Time", to_timestamp(data["Time"], "yyyy-MM-dd HH:mm:ss.SSS"))
    
    if Starttime != 'NaN':
        VarData = data.filter(data["Time"] >= Starttime)
        if Endttime != 'NaN':
            VarData = VarData.filter((VarData["Time"] <= Endtime))
    else:
        VarData = data

    return VarData


def ACC_feature(ACC_data,sensortype = 'ACC'):

    Data = pd.DataFrame()

    Data_1= ACC_data.withColumn("Sensor_Mean", ACC_data["Var"].cast("double")) \
    .groupBy(window("Time", "5 minutes").alias('Time')) \
        .agg(
        expr("percentile_approx(Sensor_Mean, 0.25)").alias(sensortype+'_Q1G'),
        expr("percentile_approx(Sensor_Mean, 0.75)").alias(sensortype+'_Q3G'),
        mean("Sensor_Mean").alias(sensortype+"mean"),
        stddev("Sensor_Mean").alias(sensortype+"dev"),
        min("Sensor_Mean").alias(sensortype+"min"),
        max("Sensor_Mean").alias(sensortype+"max"),
        skewness("Sensor_Mean").alias(sensortype+"_skewness")
    )
        
    ACC_data = ACC_data.withColumn('Time', F.to_timestamp('Time'))
    window_ = Window.orderBy("Time")
    ACC_data = ACC_data.withColumn("lag", F.lag("Time").over(window_))
    ACC_data = ACC_data.withColumn("time_diff", F.col("Time").cast("long") - F.col("lag").cast("long"))

    window_spec = Window.orderBy('time_diff').rangeBetween(-7200, 0)
    ACC_data = ACC_data.withColumn('ACC_mean_2hr', mean('Var').over(window_spec))
    ACC_data = ACC_data.withColumn('ACC_max_2hr', max('Var').over(window_spec))
    
    Data_2 = ACC_data.select(['Time','ACC_mean_2hr','ACC_max_2hr'])

    Data_2 = Data_2.withColumnRenamed('Time','Time_')
    
    Data = Data_1.join(Data_2,Data_1['Time']['start'] == Data_2['Time_'], how = 'inner')

    #Data = Data.select([''])

    print((sensortype + ' Import and Resample Complete'))
    
    return(Data)


In [0]:
# filepath = "ACC_001.csv"
# sensortype = 'ACC'
# ACC_data = dataimport(filepath, sensortype)
# ACC = ACC_feature(ACC_data)
# ACC.limit(1).display()

ACC Import and Resample Complete


Time,ACC_Q1G,ACC_Q3G,ACCmean,ACCdev,ACCmin,ACCmax,ACC_skewness,Time_,ACC_mean_2hr,ACC_max_2hr
"List(2020-02-13T15:30:00.000+0000, 2020-02-13T15:35:00.000+0000)",6.120788621992583e-08,6.608468091011823e-08,6.580386701626236e-08,3.884151720395423e-08,2.543304586505712e-09,1.6394490795313143e-06,21.894884265345883,2020-02-13T15:30:00.000+0000,1.7233871955669065e-07,1.0


In [0]:
from pyspark.sql.functions import col, avg, lit

def exercisepts_spark(df):
    # Define a window specification to calculate running averages
    windowSpec = Window.orderBy("Time").rowsBetween(Window.unboundedPreceding, -1)

    # Calculate running averages for acc and hr
    df = df.withColumn("avg_acc", avg("ACC").over(windowSpec))
    df = df.withColumn("avg_hr", avg("HR").over(windowSpec))

    # Define a condition for activity bouts
    activity_condition = (col("ACC") > col("avg_acc")) & (col("HR") > col("avg_hr"))

    # Apply the condition and create a new column for activity bouts
    df = df.withColumn("Activity Bouts", F.when(activity_condition, lit(1)).otherwise(lit(0)))

    # Count the number of activity bouts
    countbouts = df.filter(col("Activity Bouts") == 1).count()

    # Select relevant columns to return
    returndf = df.select("Time", "Activity Bouts")

    return returndf


In [0]:

def HRV_feature(IBI_data, ibimultiplier = 1000,sensortype='HRV'):
    """
        computes Heart Rate Variability metrics
        Args:
            time (pandas.DataFrame column or pandas series): time column
            IBI (pandas.DataFrame column or pandas series): column with inter beat intervals
            ibimultiplier (IntegerType): defualt = 1000; transforms IBI to milliseconds. If data is already in ms, set as 1
        Returns:
            maxHRV (FloatType): maximum HRV
            minHRV (FloatType): minimum HRV
            meanHRV (FloatType): mean HRV
            medianHRV(FloatType): median HRV
    """

    Data = pd.DataFrame()

    #IBI_data['Var'] = IBI_data['Var']*ibimultiplier

    Data = IBI_data.withColumn("Sensor_Mean", IBI_data["Var"].cast("double")) \
    .groupBy(window("Time", "5 minutes").alias('Time')) \
        .agg(
        (mean("Sensor_Mean")*ibimultiplier).alias(sensortype+"mean"),
        (median("Sensor_Mean")*ibimultiplier).alias(sensortype+"dev"),
        (min("Sensor_Mean")*ibimultiplier).alias(sensortype+"min"),
        (max("Sensor_Mean")*ibimultiplier).alias(sensortype+"max")
    )
    
    return Data

In [0]:
def TEMP_feature(TEMP_data,sensortype = 'TEMP'):

    Data = pd.DataFrame()

    Data = TEMP_data.withColumn("Sensor_Mean", col("Var").cast("double")) \
    .groupBy(window("Time", "5 minutes").alias('Time')) \
        .agg(
        expr("percentile_approx(Sensor_Mean, 0.25)").alias(sensortype+'_Q1G'),
        expr("percentile_approx(Sensor_Mean, 0.75)").alias(sensortype+'_Q3G'),
        mean("Sensor_Mean").alias(sensortype+"mean"),
        stddev("Sensor_Mean").alias(sensortype+"dev"),
        min("Sensor_Mean").alias(sensortype+"min"),
        max("Sensor_Mean").alias(sensortype+"max"),
        skewness("Sensor_Mean").alias(sensortype+"_skewness")
    )
    
    
    return(Data)

In [0]:
def HR_feature(HR_data,sensortype='HR'):

    Data = pd.DataFrame()

    HR_data = HR_data.withColumn('Time', F.to_timestamp(col('Time'), 'yyyy/M/d h:mm:ss a'))

    Data = HR_data.withColumn("Sensor_Mean", col("Var").cast("double")) \
    .groupBy(window("Time", "5 minutes").alias('Time')) \
        .agg(
        expr("percentile_approx(Sensor_Mean, 0.25)").alias(sensortype+'_Q1G'),
        expr("percentile_approx(Sensor_Mean, 0.75)").alias(sensortype+'_Q3G'),
        mean("Sensor_Mean").alias(sensortype+"mean"),
        stddev("Sensor_Mean").alias(sensortype+"dev"),
        min("Sensor_Mean").alias(sensortype+"min"),
        max("Sensor_Mean").alias(sensortype+"max"),
        skewness("Sensor_Mean").alias(sensortype+"_skewness")
    )
    
    
    return(Data)


In [0]:
# filepath = 'HR_001.csv'
# sensortype = 'HR'
# HR_data = dataimport(filepath, sensortype)
# HR = HR_feature(HR_data)
# HR.limit(1).display()

In [0]:
def EDA_feature(EDA_data,sensortype='EDA'):

    Data = pd.DataFrame()

    Data = EDA_data.withColumn("Sensor_Mean", col("Var").cast("double")) \
    .groupBy(window("Time", "5 minutes").alias('Time')) \
        .agg(
        expr("percentile_approx(Sensor_Mean, 0.25)").alias(sensortype+'_Q1G'),
        expr("percentile_approx(Sensor_Mean, 0.75)").alias(sensortype+'_Q3G'),
        mean("Sensor_Mean").alias(sensortype+"mean"),
        stddev("Sensor_Mean").alias(sensortype+"dev"),
        min("Sensor_Mean").alias(sensortype+"min"),
        max("Sensor_Mean").alias(sensortype+"max"),
        skewness("Sensor_Mean").alias(sensortype+"_skewness")
    )
    
    return(Data)


In [0]:
# filepath = 'EDA_001.csv'
# sensortype = 'EDA'
# EDA_data = dataimport(filepath, sensortype)
# EDA = EDA_feature(EDA_data)

In [0]:
def NNx_feature(IBI_data, ibimultiplier=1000, x=50):
    """
        computes Heart Rate Variability metrics NNx and pNNx
        Args:
            time (pandas.DataFrame column or pandas series): time column
            IBI (pandas.DataFrame column or pandas series): column with inter beat intervals
            ibimultiplier (IntegerType): defualt = 1000; transforms IBI to milliseconds. If data is already in ms, set as 1
            x (IntegerType): default = 50; set the number of times successive heartbeat intervals exceed 'x' ms
        Returns:
            NNx (FloatType): the number of times successive heartbeat intervals exceed x ms
            pNNx (FloatType): the proportion of NNx divided by the total number of NN (R-R) intervals. 
    """
    Data = pd.DataFrame()

    window_spec = Window().orderBy("Time")
    IBI_data = IBI_data.withColumn("diff_IBI_mean", abs(col("Var") - lag("Var").over(window_spec)))

    Data = IBI_data.withColumn("Sensor_Mean", col("Var").cast("double")) \
     .groupBy(window("Time", "5 minutes").alias('Time')) \
         .agg(
         (mean("Sensor_Mean")*1000).alias("IBI_mean"),
         sum((col("diff_IBI_mean") > 0.05).cast("int")).alias('NN50'),
         (sum((col("diff_IBI_mean") > 0.05).cast("int"))/count(col('diff_IBI_mean')) * 100).alias('pNN50')

     )
    
    
    return Data

In [0]:

def Food_feature(Food):
    Data = pd.DataFrame()

    Food = Food.groupby('Time').agg(sum("calorie").alias("calorie"),
        sum("protein").alias("protein"),
        sum("sugar").alias("sugar"),
        sum("total_carb").alias("total_carb"))

    Food = Food.withColumn('Time', F.to_timestamp('Time'))
    window = Window.orderBy("Time")
    Food = Food.withColumn("lag", F.lag("Time").over(window))
    Food = Food.withColumn("time_diff", F.col("Time").cast("long") - F.col("lag").cast("long"))

    for window in ['2H','8H','24H']:
        window_spec = Window.orderBy('time_diff').rangeBetween(-3600*int(window[0]), 0)
        # Calculate rolling sum for each column
        Food = Food.withColumn(f'Calories_{window}', sum('calorie').over(window_spec))
        Food = Food.withColumn(f'Protein_{window}', sum('protein').over(window_spec))
        Food = Food.withColumn(f'Sugar_{window}', sum('sugar').over(window_spec))
        Food = Food.withColumn(f'Carbs_{window}', sum('total_carb').over(window_spec))
    Food = Food.select(['Time','Calories_2H','Protein_2H','Sugar_2H','Carbs_2H','Calories_8H','Protein_8H','Sugar_8H','Carbs_8H','Calories_24H','Protein_24H','Sugar_24H','Carbs_24H'])
    
    return Food


In [0]:
def PeaksEDA(eda):

    eda_pd = eda.toPandas()
    
    EDAy = eda_pd['Var'].to_numpy()
    EDAx = eda_pd['Time'].to_numpy()
    peaks, _ = find_peaks(EDAy, height=0, distance=4, prominence=0.3)
    
    peaks_x = [eda_pd['Time'][i] for i in peaks]
    
    peakdf_pd = pd.DataFrame({"Time": peaks_x, "Peak": [1] * len(peaks_x)})

    schema = StructType([
        StructField("Time", TimestampType(), True),
        StructField("Peak", DoubleType(), True),
    ])
    peakdf = spark.createDataFrame(peakdf_pd, schema=schema)

    return peakdf

In [0]:
def Calculate_PeakEda(data):

    data = data.withColumn('Time', F.to_timestamp('Time'))
    window = Window.orderBy("Time")
    data = data.withColumn("lag", F.lag("Time").over(window))
    data = data.withColumn("time_diff", F.col("Time").cast("long") - F.col("lag").cast("long"))

    windowSpec = Window.orderBy("time_diff").rangeBetween(-3600 * 2, 0)

    data = data.withColumn("PeakEDA2hr_sum", sum("Peak").over(windowSpec))
    data = data.withColumn("PeakEDA2hr_mean", F.avg("Peak").over(windowSpec))

    return data


In [0]:
def Glucose_feature(Dexcom):

    Data = pd.DataFrame()

    Data = Dexcom.withColumn("Sensor_Mean", col("Var").cast("double")) \
     .groupBy(window("Time", "5 minutes").alias('Time')) \
         .agg(
         (mean("Sensor_Mean")).alias("Glucose_mean")

     )
         
    # Convert the timestamp to time and extract hour and minute
    Data = Data.withColumn('Timefrommidnight', F.date_format(F.col('Time')['start'], 'HH:mm:ss'))
    Data = Data.withColumn('minfrommid', (F.hour(col('Time')['start']) * 60 + F.minute(col('Time')['start']) + F.second(col('Time')['start']) / 60).cast('int'))
    Data = Data.withColumn('hourfrommid', (F.col('minfrommid') / 60).cast('int'))
    
    Data = Data.select(['Time','Glucose_mean','minfrommid','hourfrommid'])
    return Data

In [0]:
def merge_patient(label,id):

    postfix = '00'+str(id) if id < 10 else '0'+str(id)

    ## ACC
    filepath = "ACC_"+postfix+".csv"
    sensortype = 'ACC'
    ACC_data = dataimport(filepath, sensortype)
    ACC = ACC_feature(ACC_data)

    ## IBI
    filepath = 'IBI_'+postfix+".csv"
    sensortype = 'IBI'
    IBI_data = dataimport(filepath, sensortype)
    HRV = HRV_feature(IBI_data)

    ## TEMP
    filepath = 'TEMP_'+postfix+".csv"
    sensortype = 'TEMP'
    TEMP_data = dataimport(filepath, sensortype)
    TEMP = TEMP_feature(TEMP_data)

    ## HR
    filepath = 'HR_'+postfix+".csv"
    sensortype = 'HR'
    HR_data = dataimport(filepath, sensortype)
    HR = HR_feature(HR_data)

    ## EDA
    filepath = 'EDA_'+postfix+".csv"
    sensortype = 'EDA'
    EDA_data = dataimport(filepath, sensortype)
    EDA = EDA_feature(EDA_data)
    
    ## NN
    NNx = NNx_feature(IBI_data)

    ## Food
    filepath = 'Food_Log_'+postfix+".csv"
    sensortype = 'Food'
    Food_data = dataimport(filepath, sensortype)
    Food = Food_feature(Food_data)

    ## PeakEDA
    Peak_EDA = Calculate_PeakEda(PeaksEDA(EDA_data))
    Peak_EDA = Peak_EDA.select(['Time','Peak','PeakEDA2hr_sum','PeakEDA2hr_mean'])

    # ## Activity
    # ACC_data = ACC_data.withColumnRenamed('Var','ACC')
    # HR_data = HR_data.withColumnRenamed('Var','HR')
    # data_test = ACC_data.join(HR_data,on = 'Time',how = 'inner')
    # Activity = exercisepts_spark(data_test)

    ## Glucose
    filepath = 'Dexcom_'+postfix+".csv"
    sensortype = 'Glucose'
    glucose_data = dataimport(filepath, sensortype)
    Glucose = Glucose_feature(glucose_data)

    ## Join the features

    result_df_label = ACC.join(HR, on="Time", how="full_outer")\
    .join(NNx,on="Time", how="full_outer")\
        .join(EDA,on="Time", how="full_outer")\
            .join(HRV,on="Time", how="full_outer")\
                .join(Glucose,on == 'Time', how="inner")\
                    .join(TEMP,on="Time", how="full_outer")
    result_df_label = result_df_label.withColumn('Time', col('Time')['start'])
    result_df_label = result_df_label.withColumn('PatientID', lit(id))
    final_data_label = result_df_label.join(Food,on = 'Time',how = 'left').join(Peak_EDA,on = 'Time',how = 'left')


    result_df_unlabel = ACC.join(HR, on="Time", how="full_outer")\
    .join(NNx,on="Time", how="full_outer")\
        .join(EDA,on="Time", how="full_outer")\
            .join(HRV,on="Time", how="full_outer")\
                    .join(TEMP,on="Time", how="full_outer")
    result_df_label = result_df_label.withColumn('Time', col('Time')['start'])
    result_df_label = result_df_label.withColumn('PatientID', lit(id))
    final_data_label = result_df_label.join(Food,on = 'Time',how = 'left').join(Peak_EDA,on = 'Time',how = 'left')
    
    ## Add a column - patientid
    if label == 1:
        return result_data_label
    else:
        return result_data_unlabel
    


In [None]:
## Write data into different files as per the patient id
patient_id = list(range(2,17))

from pyspark.sql.functions import when
demo_df = spark.read.option('header','true').csv(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/Demographics.csv")
demo_df = demo_df.withColumn("Gender",when(demo_df['Gender']=='Male',1).otherwise(0))
demo_df = demo_df.withColumn("HbA1c",demo_df['HbA1c'])
demo_df = demo_df.withColumn("ID",demo_df['ID'].cast("string")) 

final_data = merge_patient(1,1)
final_data.limit(1).display()
for id in patient_id:
    final_data_ = merge_patient(1,id)
    final_data = final_data.union(final_data_)
    final_data.limit(1).display()

final_data_ = final_data.join(demo_df,data.PatientID == demo_df.ID,how = 'left')

final_data.coalesce(1).write.mode("overwrite").option("header", "true").option("delimiter", ",").csv(f"wasbs://{container}@{storage_account_name}.#blob.core.windows.net/feature_eng/final_merged.csv")

