## AVRO File to CSV (Code from Empatica)

In [6]:
from avro.datafile import DataFileReader
from avro.io import DatumReader
import json
import csv
import os
import pandas as pd
import numpy as np

In [None]:
# Start in the local Test_Data folder (machine-specific absolute path; adjust for your setup).
os.chdir("C:/Users/katgm/Rutgers University/Michelle Chen - Rutgers_Neuropsych_Lab/COVID_Fatigue/RC_award/Data/Test_Data/")

## Export sensor data to CSV files

In [7]:
def _sample_timestamps(stream, n_samples):
    """Return one rounded Unix-microsecond timestamp per sample of an evenly sampled stream.

    Sample i is stamped ``timestampStart + i * (1e6 / samplingFrequency)``.
    """
    if n_samples == 0:
        # Return before touching samplingFrequency so an empty stream never
        # performs the division (a per-sample computation would not run either).
        return []
    period_us = 1e6 / stream["samplingFrequency"]
    return [round(stream["timestampStart"] + i * period_us) for i in range(n_samples)]


def _imu_axes(stream):
    """Convert an IMU stream's x/y/z ADC counts to physical units; return [x, y, z] lists.

    Each count is scaled by (physicalMax - physicalMin) / (digitalMax - digitalMin)
    read from the stream's "imuParams".
    """
    params = stream["imuParams"]
    delta_physical = params["physicalMax"] - params["physicalMin"]
    delta_digital = params["digitalMax"] - params["digitalMin"]
    return [[val * delta_physical / delta_digital for val in stream[axis]]
            for axis in ("x", "y", "z")]


def _write_rows(path, header, rows):
    """Create (or overwrite) the CSV at *path* with *header* followed by *rows*."""
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


def _write_sampled(path, column, stream):
    """Write an evenly sampled single-channel stream as (unix_timestamp, <column>) rows."""
    values = stream["values"]
    _write_rows(path, ["unix_timestamp", column],
                zip(_sample_timestamps(stream, len(values)), values))


def create_csv(SubjID, record=None, out_dir=None):
    """Write one CSV per Embrace Plus sensor stream of a decoded Avro record.

    Files are named ``<SubjID>_<sensor>.csv``. Accelerometer and gyroscope counts
    are converted to physical units (g and degrees/second); evenly sampled
    streams get a computed Unix-microsecond timestamp per sample; tags and
    systolic peaks are written as the event timestamps stored in the record.

    Parameters
    ----------
    SubjID : str
        Prefix for the output file names.
    record : dict, optional
        Decoded Avro record containing "rawData". Defaults to the notebook-global
        ``data`` set by the conversion loop.
    out_dir : str, optional
        Destination folder. Defaults to the notebook-global ``output_dir``.
    """
    if record is None:
        record = data
    if out_dir is None:
        out_dir = output_dir
    raw = record["rawData"]

    def path_for(sensor):
        return os.path.join(out_dir, SubjID + '_' + sensor + '.csv')

    # Accelerometer: ADC counts -> g
    acc = raw["accelerometer"]
    _write_rows(path_for("accelerometer"), ["unix_timestamp", "x", "y", "z"],
                zip(_sample_timestamps(acc, len(acc["x"])), *_imu_axes(acc)))

    # Gyroscope: ADC counts -> dps (degrees per second)
    gyro = raw["gyroscope"]
    _write_rows(path_for("gyroscope"), ["unix_timestamp", "x", "y", "z"],
                zip(_sample_timestamps(gyro, len(gyro["x"])), *_imu_axes(gyro)))

    _write_sampled(path_for("eda"), "eda", raw["eda"])
    _write_sampled(path_for("temperature"), "temperature", raw["temperature"])

    # Tags: event times as stored in the record (tagsTimeMicros)
    _write_rows(path_for("tags"), ["tags_timestamp"],
                ([tag] for tag in raw["tags"]["tagsTimeMicros"]))

    _write_sampled(path_for("bvp"), "bvp", raw["bvp"])

    # Systolic peaks: event times as stored in the record (peaksTimeNanos)
    _write_rows(path_for("systolic_peaks"), ["systolic_peak_timestamp"],
                ([peak] for peak in raw["systolicPeaks"]["peaksTimeNanos"]))

    _write_sampled(path_for("steps"), "steps", raw["steps"])


In [40]:
# Define the location of the Avro files and the output folder.
# NOTE: `dir` shadows the built-in dir(); later cells read this name, so it is kept.
dir = "C:\\Users\\katgm\\Rutgers University\\Michelle Chen - Rutgers_Neuropsych_Lab\\COVID_Fatigue\\RC_award\\Data\\Test_Data\\"
input_dir = dir + "YingfeiAvro\\"
output_dir = dir + "YingfeiEmbracePlus\\"
# Make sure the destination exists before create_csv writes into it.
os.makedirs(output_dir, exist_ok=True)

# Convert the files in sorted order so the numbering (Yingfei1, Yingfei2, ...) and
# therefore the order combine_csv concatenates the pieces in is reproducible.
counter = 0
for filename in sorted(os.listdir(input_dir)):
    avro_file_path = os.path.join(input_dir, filename)
    # Only regular files are Avro candidates; skip sub-folders.
    if not os.path.isfile(avro_file_path):
        continue
    counter += 1
    reader = DataFileReader(open(avro_file_path, "rb"), DatumReader())
    try:
        schema = json.loads(reader.meta.get('avro.schema').decode('utf-8'))
        data = next(reader)  # first record; create_csv reads it through this global
    finally:
        reader.close()  # also closes the underlying file handle
    create_csv("Yingfei" + str(counter))

In [38]:
# Combining csv files into one

_GRID_FREQ = "15.625ms"  # 1/64 s: every stream is snapped to this grid before joining


def _load_sensor_pieces(Subj, number, sensor, time_col, unit, directory):
    """Concatenate <Subj>1_<sensor>.csv .. <Subj><number>_<sensor>.csv and snap *time_col*.

    *unit* is the epoch unit of the raw integer timestamps ("us" or "ns"); the
    converted datetimes are rounded to the _GRID_FREQ grid.
    """
    pieces = [pd.read_csv(os.path.join(directory, Subj + str(k) + "_" + sensor + ".csv"))
              for k in range(1, number + 1)]
    frame = pd.concat(pieces, ignore_index=True)
    frame[time_col] = pd.to_datetime(frame[time_col], unit=unit).dt.round(_GRID_FREQ)
    return frame


def combine_csv(Subj, number, out_dir=None):
    """Merge one subject's per-file sensor CSVs into a single timestamp-aligned table.

    Reads the pieces ``<Subj>1_*.csv`` .. ``<Subj><number>_*.csv`` written by
    create_csv, concatenates them in index order, converts the Unix timestamps
    to datetimes and rounds them to a 15.625 ms grid so the streams can be
    joined on equal timestamps.

    Writes to *out_dir*:
      ``<Subj>.csv``        BVP rows left-joined with accelerometer, EDA and temperature
      ``<Subj>_tags.csv``   tag timestamps
      ``<Subj>_peaks.csv``  systolic peak timestamps
    and finally changes the working directory to *out_dir*.

    Parameters
    ----------
    Subj : str
        File-name prefix of the pieces.
    number : int
        Number of pieces to combine (1..number); must be at least 1.
    out_dir : str, optional
        Folder holding the pieces and receiving the outputs. Defaults to the
        notebook-global ``output_dir``.
    """
    if out_dir is None:
        out_dir = output_dir
    out_dir = os.path.abspath(out_dir)

    def load(sensor, time_col="unix_timestamp", unit="us"):
        return _load_sensor_pieces(Subj, number, sensor, time_col, unit, out_dir)

    bvp = load("bvp")
    accelerometer = load("accelerometer")
    eda = load("eda")
    systolic_peaks = load("systolic_peaks", "systolic_peak_timestamp", "ns")
    tags = load("tags", "tags_timestamp")
    temperature = load("temperature")
    # Gyroscope is skipped: its CSVs are empty for these recordings.
    # Steps are skipped: not really relevant for the comparison data.

    merged = (bvp.merge(accelerometer, on="unix_timestamp", how="left")
              .merge(eda, on="unix_timestamp", how="left")
              .merge(temperature, on="unix_timestamp", how="left"))
    merged = merged.rename(columns={"unix_timestamp": "timestamp", "x": "acc_x", "y": "acc_y", "z": "acc_z"})

    merged.to_csv(os.path.join(out_dir, Subj + '.csv'), index=False)
    tags.to_csv(os.path.join(out_dir, Subj + "_tags.csv"), index=False)
    systolic_peaks.to_csv(os.path.join(out_dir, Subj + "_peaks.csv"), index=False)

    # Later notebook cells (e.g. the *EmP2.csv exports) write relative to the
    # working directory, so the original chdir side effect is preserved.
    os.chdir(out_dir)

In [41]:
# Combine every piece converted above: `counter` is the number of Avro files the
# conversion loop processed, so the count cannot drift from the folder contents.
combine_csv("Yingfei", counter)

In [77]:
# The timestamps don't match up between bvp, accelerometer, eda, etc. Check what the
# sampling frequency actually is: are the observations actually evenly spaced apart?
#
# The KatTest_* frames these checks were written against are not created anywhere in
# this notebook (combine_csv keeps its frames local), so referencing them fails with a
# NameError. Instead, re-read the raw per-sensor CSVs written by create_csv for the
# subject converted above; timestamps are parsed but NOT rounded, so the true spacing
# is visible.
def _raw_sensor_times(subj, number, sensor, column, unit, directory):
    """Return *column* of <subj>1_<sensor>.csv .. <subj><number>_<sensor>.csv as one datetime Series."""
    pieces = [pd.read_csv(os.path.join(directory, subj + str(k) + "_" + sensor + ".csv"))[column]
              for k in range(1, number + 1)]
    return pd.to_datetime(pd.concat(pieces, ignore_index=True), unit=unit)


_diff_subj, _diff_n = "Yingfei", counter

# The observations in the comments below were apparently made on the KatTest recording.
bvp_diff = _raw_sensor_times(_diff_subj, _diff_n, "bvp", "unix_timestamp", "us", output_dir).diff()
# They are evenly spaced: 0.015625 s, which is 1/64 of a second, as expected (64 Hz).
acc_diff = _raw_sensor_times(_diff_subj, _diff_n, "accelerometer", "unix_timestamp", "us", output_dir).diff()
# Also evenly spaced at 1/64 of a second, sometimes one or two seconds off (64 Hz).
# They just have different starting microseconds.
eda_diff = _raw_sensor_times(_diff_subj, _diff_n, "eda", "unix_timestamp", "us", output_dir).diff()
# About 0.25 s, which is 1/4 of a second as expected (4 Hz).
systolic_diff = _raw_sensor_times(_diff_subj, _diff_n, "systolic_peaks", "systolic_peak_timestamp", "ns", output_dir).diff()
# All different, since these are peaks.
tags_diff = _raw_sensor_times(_diff_subj, _diff_n, "tags", "tags_timestamp", "us", output_dir).diff()
# About 5 min apart, since that's when I tagged.
temp_diff = _raw_sensor_times(_diff_subj, _diff_n, "temperature", "unix_timestamp", "us", output_dir).diff()
# Very close to 1 second apart: 1 Hz, same as the E4.

# Organizing data to mirror the E4 dataframes

In [54]:
def _read_embraceplus(folder, name):
    """Load <name>.csv and <name>_tags.csv from dir/<folder>/ with their timestamp columns parsed.

    Returns (signals, tags) as DataFrames.
    """
    base = dir + folder + "/" + name
    signals = pd.read_csv(base + ".csv")
    tag_frame = pd.read_csv(base + "_tags.csv")
    signals['timestamp'] = pd.to_datetime(signals['timestamp'])
    tag_frame['tags_timestamp'] = pd.to_datetime(tag_frame['tags_timestamp'])
    return signals, tag_frame


KTest, KTags = _read_embraceplus("KatEmbracePlus2", "Kat")
DTest, DTags = _read_embraceplus("DarsheeEmbracePlus2", "Darshee")
YTest, YTags = _read_embraceplus("YingfeiEmbracePlus", "Yingfei")

In [64]:
# Generating tags: take one recorded tag as the start and step forward in 5-minute
# intervals, giving 8 boundaries (7 five-minute blocks).
def _five_minute_tags(start):
    """Return a one-column 'tags_timestamp' frame: start, start+5min, ..., start+35min."""
    return pd.DataFrame({"tags_timestamp": [start + pd.Timedelta(minutes=5 * step) for step in range(8)]})


# Darshee's: start at the tag in row 2 (0-based) of DTags
DInit = DTags.iloc[2, 0]
DTags2 = _five_minute_tags(DInit)

# Yingfei's: start at her first tag
YInit = YTags.iloc[0, 0]
YTags2 = _five_minute_tags(YInit)

# Kat's: start at the tag in row 1 (0-based) of KTags
KInit = KTags.iloc[1, 0]
KTags2 = _five_minute_tags(KInit)

In [74]:
def add_blocks(tags, data):
    """Trim *data* to the tagged window and label each row with its block number.

    Parameters
    ----------
    tags : DataFrame
        Exactly 8 block-boundary timestamps in its first column, assumed to be in
        ascending order (TODO confirm for hand-recorded tags).
    data : DataFrame
        Must have a 'timestamp' column comparable with the tag values.

    Returns
    -------
    DataFrame or None
        A new frame holding the rows with ``tags[0] <= timestamp < tags[7]``
        (original index preserved) plus a 'Block' column: k (0..6) when the row
        falls between boundary k and boundary k+1. Rows with a missing timestamp
        are dropped. Returns None, after printing a message, when *tags* does not
        have exactly 8 rows; *data* is never modified.
    """
    if len(tags) != 8:
        print("Incorrect number of tags. There are " + str(len(tags)) + " when there should be 8.")
        return None

    bounds = tags.iloc[:, 0].tolist()
    ts = data['timestamp']
    # Keep the half-open window [first tag, last tag) so the trimmed range matches
    # the half-open blocks below; otherwise a sample exactly on the last tag would be
    # kept but fall in no block.
    df = data[(ts >= bounds[0]) & (ts < bounds[-1])].copy()

    # Block k covers [bounds[k], bounds[k+1])
    conditions = [(df['timestamp'] >= lo) & (df['timestamp'] < hi)
                  for lo, hi in zip(bounds[:-1], bounds[1:])]
    blocks = list(range(len(conditions)))

    df['Block'] = np.select(conditions, blocks, default=pd.NA)
    return df

In [75]:
# Trim each participant's Embrace Plus data to the generated tag window and label the blocks.
KatFinal2 = add_blocks(tags=KTags2, data=KTest)
YingfeiFinal2 = add_blocks(tags=YTags2, data=YTest)
DarsheeFinal2 = add_blocks(tags=DTags2, data=DTest)

In [77]:
# Save the block-labelled Embrace Plus frames.
# NOTE(review): these relative paths resolve against the current working directory,
# which at this point is wherever the last os.chdir left it (combine_csv changes into
# output_dir) -- confirm that is the intended destination.
for _emp_frame, _emp_name in ((KatFinal2, "KatEmP2.csv"),
                              (YingfeiFinal2, "YingfeiEmP2.csv"),
                              (DarsheeFinal2, "DarsheeEmP2.csv")):
    _emp_frame.to_csv(_emp_name, index=False)

# Reading in E4 files (predominantly using code from lab_data_concat)

In [87]:
# concatenate_lab_data resolves the <Name>E42 folders relative to the current working
# directory, and earlier cells (combine_csv) change it, so reset it to the Test_Data root.
os.chdir(dir)

In [88]:
from datetime import datetime, timedelta

In [89]:
def _read_e4_csv(path):
    """Read one E4 export CSV and return (samples, timestamps).

    Row 0 is taken as the session start (Unix seconds) and row 1 as the sample rate
    in Hz (E4 export format); the remaining rows are the samples, returned as a 2-D
    array (one column per channel). Timestamps are evenly spaced from that file's own
    start time at that file's own rate.
    """
    raw = pd.read_csv(path, header=None).to_numpy()
    start = pd.to_datetime(raw[0, 0], unit="s")
    rate_hz = float(raw[1, 0])
    samples = raw[2:]
    timestamps = pd.date_range(start, periods=len(samples), freq=pd.Timedelta(seconds=1 / rate_hz))
    return samples, timestamps


def concatenate_lab_data(SubjID, base_dir=None):
    """Load a participant's E4 EDA/BVP/ACC/TEMP exports and align them on the BVP clock.

    Parameters
    ----------
    SubjID : str
        Participant name; files are read from ``<base_dir>/<SubjID>E42/``.
    base_dir : str, optional
        Parent folder. Defaults to the current working directory.

    Returns
    -------
    DataFrame
        Columns timestamp, bvp, acc_x, acc_y, acc_z, eda, temperature: one row per BVP
        sample, with the other streams left-joined on exactly equal timestamps. NA
        values are left in place on purpose (the signals should be filtered first).

    NOTE(review): E4 ACC values are kept as exported (not converted to g, unlike the
    Embrace Plus accelerometer CSVs) -- confirm before comparing the two devices.
    """
    if base_dir is None:
        base_dir = os.getcwd()
    folder = os.path.join(base_dir, SubjID + 'E42')

    eda, eda_ts = _read_e4_csv(os.path.join(folder, 'EDA.csv'))
    bvp, bvp_ts = _read_e4_csv(os.path.join(folder, 'BVP.csv'))
    acc, acc_ts = _read_e4_csv(os.path.join(folder, 'ACC.csv'))
    temp, temp_ts = _read_e4_csv(os.path.join(folder, 'TEMP.csv'))

    bvp_df = pd.DataFrame({'timestamp': bvp_ts, 'bvp': bvp[:, 0]})
    eda_df = pd.DataFrame({'timestamp': eda_ts, 'eda': eda[:, 0]})
    acc_df = pd.DataFrame(acc, columns=['acc_x', 'acc_y', 'acc_z'])
    acc_df.insert(0, 'timestamp', acc_ts)
    temp_df = pd.DataFrame({'timestamp': temp_ts, 'temperature': temp[:, 0]})

    # Left-join onto BVP (the fastest stream); slower streams only match on shared instants.
    merged_df = (bvp_df.merge(eda_df, on='timestamp', how='left')
                 .merge(acc_df, on='timestamp', how='left')
                 .merge(temp_df, on='timestamp', how='left'))

    new_cols = ['timestamp', 'bvp', 'acc_x', 'acc_y', 'acc_z', 'eda', 'temperature']
    return merged_df[new_cols]

In [90]:
# Build each participant's merged E4 frame from the <Name>E42 folder under the working directory.
YingfeiE42 = concatenate_lab_data(SubjID="Yingfei")
KatE42 = concatenate_lab_data(SubjID="Kat")
DarE42 = concatenate_lab_data(SubjID="Darshee")

In [92]:
# Label the E4 frames with the same generated tag boundaries used for the Embrace Plus data.
DarE42Final = add_blocks(tags=DTags2, data=DarE42)
YingfeiE42Final = add_blocks(tags=YTags2, data=YingfeiE42)
KatE42Final = add_blocks(tags=KTags2, data=KatE42)

In [96]:
# Write the block-labelled E4 frames into <Test_Data>/FinalData (this also leaves the
# working directory there).
os.chdir(dir + "FinalData")
for _e4_frame, _e4_name in ((KatE42Final, "KatE42.csv"),
                            (YingfeiE42Final, "YingfeiE42.csv"),
                            (DarE42Final, "DarE42.csv")):
    _e4_frame.to_csv(_e4_name, index=False)