In [1]:
import numpy as np
import pandas as pd
import pickle
from scipy import signal
import os
import devicely
import zipfile
import re
import urllib.request
import openpyxl
import dask
from dask.distributed import Client, progress

# 1. Download the nurses dataset
Download the dataset archive from Dryad and extract it; both steps are skipped when the files already exist locally. Dataset page: https://datadryad.org/stash/dataset/doi:10.5061/dryad.5hqbzkh6f

In [2]:
%%time

# download if not already available
# The archive is first written to "<filename>.part" and only renamed to
# `filename` after urlretrieve returns. An interrupted download therefore cannot
# leave a truncated archive behind that the isfile() guard would later treat as
# complete.
filename = 'doi_10.5061_dryad.5hqbzkh6f__v6.zip'
if not os.path.isfile(filename):
    print('downloading')
    partial_filename = filename + '.part'
    urllib.request.urlretrieve('https://datadryad.org/api/v2/datasets/doi%253A10.5061%252Fdryad.5hqbzkh6f/download',
                               partial_filename)
    os.replace(partial_filename, filename)

CPU times: total: 0 ns
Wall time: 0 ns


In [3]:
%%time

# extract if folder is not present
# Extraction goes into a sibling "<toFolder>.part" folder, which is renamed to
# `toFolder` only after extractall finishes. An interrupted extraction therefore
# cannot leave a half-filled `toFolder` that the isdir() guard would later
# mistake for a complete one.
toFolder = 'doi_10.5061_dryad.5hqbzkh6f__v6'
if not os.path.isdir(toFolder):
    print('extracting')
    partial_folder = toFolder + '.part'
    with zipfile.ZipFile(filename, 'r') as zfile:
        zfile.extractall(path=partial_folder)
    os.replace(partial_folder, toFolder)

CPU times: total: 0 ns
Wall time: 0 ns


# 2. Settings and data extraction

In [4]:
# suppress pandas' chained-assignment (SettingWithCopy) warning for this notebook
# NOTE(review): pandas options are per-process, so this presumably does not reach
# the dask worker processes that execute get_df_from_folder.
pd.set_option('mode.chained_assignment', None)

In [5]:
# time offset passed to EmpaticaReader.timeshift for every recording (-6 hours)
# NOTE(review): presumably aligns the sensor timestamps with the local times used
# in SurveyResults.xlsx — confirm the sign and whether DST needs handling.
cst = -pd.Timedelta(hours=6)

In [6]:
# function for extracting all data from the nurse dataset, including archives
# nested inside archives
def extract_nested_zip(zippedFile, toFolder):
    """Unpack ``zippedFile`` into ``toFolder``, then recursively unpack nested zips.

    Every file below ``toFolder`` whose name ends in ``.zip`` is extracted into a
    folder next to it with the same name minus the extension. Each archive is
    deleted once its contents have been extracted, so neither ``zippedFile`` nor
    any nested ``.zip`` remains afterwards.
    """
    zip_suffix = re.compile(r'\.zip$')

    with zipfile.ZipFile(zippedFile, 'r') as archive:
        archive.extractall(path=toFolder)
    os.remove(zippedFile)

    # os.walk lists a directory before descending, so a folder created by the
    # recursive call below is not part of this walk; the recursive call itself
    # handles whatever ends up inside it.
    for current_dir, _subdirs, names in os.walk(toFolder):
        for name in names:
            if zip_suffix.search(name) is None:
                continue
            nested_zip = f'{current_dir}/{name}'
            extract_nested_zip(nested_zip, nested_zip[:-4])

In [7]:
%%time

# unzip Stress_dataset.zip and every zip nested inside it into extract_folder.
# extract_nested_zip deletes each archive after extracting it, so on re-runs the
# isfile() check below finds nothing to do.
extract_folder = 'doi_10.5061_dryad.5hqbzkh6f__v6/Stress_dataset/'
file_to_extract = 'doi_10.5061_dryad.5hqbzkh6f__v6/Stress_dataset.zip'

os.makedirs(extract_folder, exist_ok=True)

if os.path.isfile(file_to_extract):
    extract_nested_zip(file_to_extract, extract_folder)

CPU times: total: 0 ns
Wall time: 0 ns


# 3. Extract and label all data from the nurses dataset

In [8]:
# read survey excel file with labels, keeping one row per rated stress window:
# 'na' and level '1' ratings are dropped and level '2' is relabelled as 1
# (so labels are 0/1, assuming the sheet only uses levels 0/1/2 — confirm).
# NOTE(review): combining columns via nested lists in `parse_dates` is deprecated
# in recent pandas releases; check the installed pandas version before upgrading.
survey = (
    pd.read_excel('doi_10.5061_dryad.5hqbzkh6f__v6/SurveyResults.xlsx',
                  parse_dates=[['date', 'Start time'], ['date', 'End time']],
                  dtype={'ID': str, 'Stress level': str})
    .loc[:, ['ID', 'date_Start time', 'date_End time', 'Stress level']]
    .loc[lambda frame: ~frame['Stress level'].isin(['na', '1'])]
    .assign(**{'Stress level': lambda frame: frame['Stress level'].replace({'2': '1'}).astype(int)})
    .rename(columns={'ID': 'subject',
                     'date_Start time': 'dateStart',
                     'date_End time': 'dateEnd',
                     'Stress level': 'label'})
)

In [9]:
# create list of all recording-folder paths, one per <participant>/<recording>
# pair below Stress_dataset/.
# NOTE: keep the exact 'doi_.../Stress_dataset/<p>/<sf>' string form, because
# get_df_from_folder derives the subject id from these paths.

participants = [
    entry
    for entry in os.listdir('doi_10.5061_dryad.5hqbzkh6f__v6/Stress_dataset/')
    if os.path.isdir(f'doi_10.5061_dryad.5hqbzkh6f__v6/Stress_dataset/{entry}')
]

all_folders = [
    f'doi_10.5061_dryad.5hqbzkh6f__v6/Stress_dataset/{p}/{sf}'
    for p in participants
    for sf in os.listdir(f'doi_10.5061_dryad.5hqbzkh6f__v6/Stress_dataset/{p}')
    if os.path.isdir(f'doi_10.5061_dryad.5hqbzkh6f__v6/Stress_dataset/{p}/{sf}')
]

In [10]:
# function for getting a labeled dataframe from a subject's folder
# uses the module-level survey df for the label windows

@dask.delayed
def get_df_from_folder(folder):
    """Load one Empatica recording folder and keep only the survey-labelled samples.

    Parameters
    ----------
    folder : str
        Path of the form ``<Stress_dataset>/<subject>/<recording>``, as built in
        ``all_folders``. The subject id is the name of the parent directory.

    Returns
    -------
    pandas.DataFrame
        Columns x, y, z, BVP, EDA, HR, TEMP, label, subject, indexed like the
        devicely reader's data. It holds one slice per survey window of this
        subject, tagged with that window's label. The frame is empty (same
        columns) when the subject has no rows in ``survey``.

    Notes
    -----
    Reads the module-level ``survey`` DataFrame and the ``cst`` time shift.
    Because of ``dask.delayed``, a call only builds a task; the work runs on
    ``.compute()``.
    """
    # Subject id = participant directory name. It is derived from the path
    # structure rather than a fixed character offset, so it does not depend on
    # the length of the base directory.
    subject = os.path.basename(os.path.dirname(os.path.normpath(folder)))

    empatica_reader = devicely.EmpaticaReader(folder)
    # shift all sensor timestamps by `cst` so they line up with the survey times
    empatica_reader.timeshift(cst)
    df = empatica_reader.data

    df = df[['ACC_X', 'ACC_Y', 'ACC_Z', 'BVP', 'EDA', 'HR', 'TEMP']]
    df = df.rename(columns={'ACC_X': 'x', 'ACC_Y': 'y', 'ACC_Z': 'z'})

    # Rows with NaN in some channels are kept on purpose: the channels are sampled
    # at different rates, so most timestamps carry only a subset of them.
    # 'label' is created here (before 'subject') to fix the column order.
    df['label'] = np.nan
    df['subject'] = subject

    # survey windows (dateStart, dateEnd, label) reported by this subject
    subject_labels_df = survey[survey['subject'] == subject]

    if subject_labels_df.empty:
        # pd.concat([]) would raise "No objects to concatenate" and abort the
        # whole dask computation; return an empty frame with the same columns
        # (and the same integer label dtype as the labelled slices) instead.
        return df.iloc[:0].astype({'label': 'int64'})

    # Label-based .loc slicing on the timestamp index (both ends inclusive).
    # .assign returns a new frame, so no value is set on a view of `df`.
    subject_df_list = [
        df.loc[row.dateStart:row.dateEnd].assign(label=row.label)
        for row in subject_labels_df.itertuples(index=False)
    ]
    return pd.concat(subject_df_list)

In [12]:
# Start a local dask.distributed cluster (the output below shows 8 worker
# processes). dask makes a newly created Client the default scheduler, so the
# later graph.compute() presumably runs on these workers.
# NOTE(review): the client is not closed anywhere in this view — consider
# client.close() once the computation has finished.
client = Client()
# display the cluster summary (dashboard link, workers, memory)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 32,Total memory: 127.85 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:50445,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 32
Started: Just now,Total memory: 127.85 GiB

0,1
Comm: tcp://127.0.0.1:50481,Total threads: 4
Dashboard: http://127.0.0.1:50484/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50449,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-ulqi3xmf,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-ulqi3xmf

0,1
Comm: tcp://127.0.0.1:50492,Total threads: 4
Dashboard: http://127.0.0.1:50493/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50454,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-dwcri1ib,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-dwcri1ib

0,1
Comm: tcp://127.0.0.1:50501,Total threads: 4
Dashboard: http://127.0.0.1:50502/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50452,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-0fx2dyt1,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-0fx2dyt1

0,1
Comm: tcp://127.0.0.1:50498,Total threads: 4
Dashboard: http://127.0.0.1:50499/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50448,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-4_h23899,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-4_h23899

0,1
Comm: tcp://127.0.0.1:50486,Total threads: 4
Dashboard: http://127.0.0.1:50488/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50455,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-2zta751j,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-2zta751j

0,1
Comm: tcp://127.0.0.1:50487,Total threads: 4
Dashboard: http://127.0.0.1:50489/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50453,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-4daeagnk,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-4daeagnk

0,1
Comm: tcp://127.0.0.1:50480,Total threads: 4
Dashboard: http://127.0.0.1:50482/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50450,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-4f7iy63y,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-4f7iy63y

0,1
Comm: tcp://127.0.0.1:50495,Total threads: 4
Dashboard: http://127.0.0.1:50496/status,Memory: 15.98 GiB
Nanny: tcp://127.0.0.1:50451,
Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-69ooad6p,Local directory: C:\Users\felix\AppData\Local\Temp\dask-worker-space\worker-69ooad6p


In [35]:
%%time

# Loop over all participants and all their folders and create the labeled dfs;
# the results are concatenated in the next cell.
# Each get_df_from_folder(...) call only builds a lazy dask task. Wrapping the
# list in dask.delayed and calling .compute() runs all tasks on the active
# scheduler and returns the DataFrames in the same order as all_folders.

# https://stackoverflow.com/questions/61215533/how-to-parallelize-a-loop-with-dask

df_results_list = [get_df_from_folder(folder) for folder in all_folders]
graph = dask.delayed(df_results_list)
df_results_list_computed = graph.compute()

CPU times: total: 6.36 s
Wall time: 4min 25s


In [37]:
# stack the per-folder labelled frames row-wise, keeping their timestamp index
df_results = pd.concat(objs=df_results_list_computed, axis='index')

In [38]:
# Preview the combined frame. The index holds the per-sample timestamps (see
# the output below) that the label windows were sliced on, so it is kept rather
# than reset.
df_results

Unnamed: 0,x,y,z,BVP,EDA,HR,TEMP,label,subject
2020-07-08 08:30:00.000000,-52.0,15.0,-35.0,18.47,5.773421,99.05,31.63,1,15
2020-07-08 08:30:00.015625,,,,10.63,,,,1,15
2020-07-08 08:30:00.031250,-52.0,15.0,-35.0,-0.95,,,,1,15
2020-07-08 08:30:00.046875,,,,-16.12,,,,1,15
2020-07-08 08:30:00.062500,-52.0,15.0,-35.0,-33.68,,,,1,15
...,...,...,...,...,...,...,...,...,...
2020-07-23 12:25:57.250000,,,,,0.129352,,,1,F5
2020-07-23 12:25:57.500000,,,,,0.125510,,,1,F5
2020-07-23 12:25:57.750000,,,,,0.119106,,,1,F5
2020-07-23 12:25:58.000000,,,,,0.116545,,,1,F5


# 4. Store as a Parquet file

In [39]:
# store as parquet; the output folder is created on the first run
os.makedirs('data-input', exist_ok=True)

df_results.to_parquet('data-input/dataset_nurse_acc_bvp_eda_hr_temp.parquet')