## unzip and parse fit file

In [1]:
import zipfile  #zipfile.is_zipfile(filename)
import os
import fitparse
import csv
import pytz
import re

import pandas as pd
import numpy as np
import datetime

In [16]:
def parse_file(path):
    """Convert every ``*WELLNESS.fit`` file directly inside ``path`` to CSV.

    For each matching file a CSV with the same base name is written next to
    it via ``write_fitfile_to_csv``. Sub-directories are not searched.

    Args:
        path: Directory that holds the FIT files.
    """
    for file in os.listdir(path):
        stem, ext = file[:-4], file[-4:]
        # Check the name before opening anything: only WELLNESS files are
        # converted, so other FIT files never need to be opened.
        if ext.lower() != '.fit' or not stem.endswith('WELLNESS'):
            continue
        fitfile = fitparse.FitFile(
            os.path.join(path, file),
            data_processor=fitparse.StandardUnitsDataProcessor())
        write_fitfile_to_csv(fitfile, os.path.join(path, stem + '.csv'))

In [38]:
# allowed_fields = ['serial_number','time_created','manufacturer',
#                   'garmin_product','number','type','timestamp[s]',
#                   'serial_number','manufacturer','garmin_product',
#                   'software_version','version','timestamp[s]',
#                   'local_timestamp[s]','cycles_to_distance[m/cycle]',
#                   'cycles_to_calories[kcal/cycle]','unknown',
#                   'resting_metabolic_rate[kcal / day]','activity_type',
#                   'timestamp[s]','unknown','timestamp[s]','enabled',
#                   'unknown','stress_level_time','stress_level_value',
#                   'unknown'
#                  ]

#final fields recorded
# write_fitfile_to_csv copies only these field names out of each message and
# uses this list, in this order, as the CSV header.
allowed_fields = ['timestamp_16','heart_rate','stress_level_time','stress_level_value']

#required_fields = ['serial_number', 'time_created']

# Timezone objects used to convert stress_level_time values.
UTC = pytz.UTC
# NOTE(review): despite its name, CST holds the US/Pacific zone - confirm which zone is intended.
CST = pytz.timezone('US/Pacific')
def write_fitfile_to_csv(fitfile, output_file='test_output.csv'):
    """Write the allowed fields of every message in ``fitfile`` to a CSV file.

    One row is written per message that has a ``fields`` attribute, with one
    column per entry of ``allowed_fields``; fields a message does not carry
    are left blank. ``stress_level_time`` values are localized as UTC and
    converted to the ``CST`` timezone object before being written.

    Args:
        fitfile: Object exposing a ``messages`` iterable whose items carry
            ``fields`` with ``name`` and ``value`` attributes.
        output_file: Destination CSV path; overwritten if it exists.
    """
    rows = []
    for message in fitfile.messages:
        if not hasattr(message, 'fields'):
            continue
        row = {}
        for field in message.fields:
            if field.name not in allowed_fields:
                continue
            value = field.value
            if field.name == 'stress_level_time':
                if value is None:
                    # Nothing to localize; leave the column blank.
                    continue
                value = UTC.localize(value).astimezone(CST)
            row[field.name] = value
        rows.append(row)

    # newline='' is what the csv module expects; without it extra blank
    # lines appear on platforms that translate '\n' to '\r\n'.
    with open(output_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(allowed_fields)
        writer.writerows(
            [str(row.get(k, '')) for k in allowed_fields] for row in rows)

In [53]:
def read_file_studio(path):
    """Unzip and parse every archive found three directory levels below ``path``.

    Each zip file is extracted into a sibling folder named after the archive
    path minus its last four characters, and that folder is then handed to
    ``parse_file``.
    """
    def child_dirs(parent):
        # Lazily yield the full path of every sub-directory of ``parent``.
        for entry in os.listdir(parent):
            candidate = os.path.join(parent, entry)
            if os.path.isdir(candidate):
                yield candidate

    for level_one in child_dirs(path):
        for level_two in child_dirs(level_one):
            for session_dir in child_dirs(level_two):
                for entry in os.listdir(session_dir):
                    archive = os.path.join(session_dir, entry)
                    if not zipfile.is_zipfile(archive):
                        continue
                    target_dir = archive[:-4]
                    with zipfile.ZipFile(archive, 'r') as zip_ref:
                        zip_ref.extractall(target_dir)
                    # parse the fit files to csv files
                    parse_file(target_dir)
        

# Run the unzip-and-parse pass over the Studio data tree (absolute, machine-specific path).
read_file_studio("/Users/luerlyu/Desktop/Experimental Data/Physiological data/Studio")

In [54]:
def read_file_wsp(path):
    """Unzip and parse every archive found two directory levels below ``path``.

    Each zip file is extracted into a folder named after the archive path
    minus its last four characters; that folder is then passed to
    ``parse_file``.
    """
    for top_name in os.listdir(path):
        top_dir = os.path.join(path, top_name)
        if os.path.isdir(top_dir):
            for session_name in os.listdir(top_dir):
                session_dir = os.path.join(top_dir, session_name)
                if os.path.isdir(session_dir):
                    for entry in os.listdir(session_dir):
                        archive = os.path.join(session_dir, entry)
                        if zipfile.is_zipfile(archive):
                            extract_dir = archive[:-4]
                            with zipfile.ZipFile(archive, 'r') as zip_ref:
                                zip_ref.extractall(extract_dir)
                            # parse the fit files to csv files
                            parse_file(extract_dir)
# Run the unzip-and-parse pass over the WSP data tree (absolute, machine-specific path).
read_file_wsp("/Users/luerlyu/Desktop/Experimental Data/Physiological data/WSP")

There is no zip file for the following folder, so `parse_file` is called on it directly:

In [55]:
# Parse the FIT files of this one folder directly (absolute, machine-specific path).
parse_file('/Users/luerlyu/Desktop/Experimental Data/Physiological data/Studio/F/Afro/1111/2022-11-11')

## parse eda by specific time

In [2]:
def to_date(unix):
    """Turn a millisecond Unix timestamp into a datetime truncated to whole seconds.

    The conversion goes through ``datetime.fromtimestamp`` without a ``tz``
    argument, so the result is a naive datetime in the machine's local time.
    """
    seconds = unix / 1e3
    return datetime.datetime.fromtimestamp(seconds).replace(microsecond=0)

In [4]:
def select_time_eda(path):
    """Sample an EDA CSV on a 10-minute grid and save it as ``*_processed.csv``.

    The "Unix Timestamp (UTC)" column is renamed to "time" and converted with
    ``to_date``. Only rows whose time falls exactly on a 10-minute grid
    anchored at the first row are kept; rows sharing a grid time are averaged.
    The result is written next to the input, with "time" as the first column.

    Args:
        path: Path to the EDA CSV; the output name is ``path`` minus its last
            four characters plus ``_processed.csv``.
    """
    eda_df = pd.read_csv(path)
    eda_df = eda_df.rename(columns={"Unix Timestamp (UTC)": "time"})
    eda_df["time"] = eda_df["time"].map(to_date)
    st_time = eda_df["time"].iloc[0]
    end_time = eda_df["time"].iloc[-1]
    # make a 10 min interval ("10min" is the non-deprecated spelling of "10T")
    interval_df = pd.DataFrame(
        {"time": pd.date_range(st_time, end_time, freq="10min")})
    res_df = eda_df.merge(interval_df, on="time").groupby(["time"]).mean()
    path_to_new_file = path[:-4] + "_processed.csv"
    # After the groupby "time" lives in the index, so the index must be
    # written; say so explicitly rather than via a truthy string.
    res_df.to_csv(path_to_new_file, index=True)

In [5]:
def select_time_temp(path):
    """Average a temperature CSV into 10-minute bins and save ``*_processed.csv``.

    The "Unix Timestamp (UTC)" column is renamed to "time" and converted with
    ``to_date``. If that conversion fails, the path is printed and the file
    is skipped. Otherwise rows are grouped into 10-minute bins on "time",
    averaged, and written next to the input with "time" as the first column.

    Args:
        path: Path to the temperature CSV; the output name is ``path`` minus
            its last four characters plus ``_processed.csv``.
    """
    temp_df = pd.read_csv(path)
    temp_df = temp_df.rename(columns={"Unix Timestamp (UTC)": "time"})
    try:
        temp_df["time"] = temp_df["time"].map(to_date)
    except Exception:
        # Best effort: report the file that could not be converted and move
        # on, without swallowing KeyboardInterrupt/SystemExit.
        print(path)
        return

    res_df = temp_df.groupby(pd.Grouper(key="time", freq="10min")).mean()
    path_to_new_file = path[:-4] + "_processed.csv"
    # "time" is the index after grouping, so the index must be written.
    res_df.to_csv(path_to_new_file, index=True)

In [145]:
def select_time_studio(path):
    """Process every ``eda.csv`` / ``temp.csv`` three directory levels below ``path``.

    ``eda.csv`` files go to ``select_time_eda`` and ``temp.csv`` files go to
    ``select_time_temp``; every other entry is ignored.
    """
    def child_dirs(parent):
        # Lazily yield the full path of every sub-directory of ``parent``.
        for entry in os.listdir(parent):
            full = os.path.join(parent, entry)
            if os.path.isdir(full):
                yield full

    for group_dir in child_dirs(path):
        for subject_dir in child_dirs(group_dir):
            for session_dir in child_dirs(subject_dir):
                for entry in os.listdir(session_dir):
                    if entry == "eda.csv":
                        select_time_eda(os.path.join(session_dir, entry))
                    elif entry == "temp.csv":
                        select_time_temp(os.path.join(session_dir, entry))
        

# Run the eda/temp processing over the Studio data tree (absolute, machine-specific path).
select_time_studio("/Users/luerlyu/Desktop/Experimental Data/Physiological data/Studio")

/Users/luerlyu/Desktop/Experimental Data/Physiological data/Studio/F/Afro/1111/temp.csv


'Experimental Data/Physiological data/Studio/F/Afro/1111/temp.csv' could not be converted here (its path is printed above) and was processed in a different way.

In [146]:
def select_time_wsp(path):
    """Process every ``eda.csv`` / ``temp.csv`` two directory levels below ``path``.

    ``eda.csv`` files go to ``select_time_eda`` and ``temp.csv`` files go to
    ``select_time_temp``; every other entry is ignored.

    Args:
        path: Root of the WSP data tree.
    """
    for file in os.listdir(path):
        sub_file_path = os.path.join(path, file)
        if not os.path.isdir(sub_file_path):
            continue
        for sub_file in os.listdir(sub_file_path):
            bottom_file_path = os.path.join(sub_file_path, sub_file)
            if not os.path.isdir(bottom_file_path):
                continue
            for bottom_file in os.listdir(bottom_file_path):
                if bottom_file == "eda.csv":
                    select_time_eda(os.path.join(bottom_file_path, bottom_file))
                elif bottom_file == "temp.csv":
                    # Use the same temperature handler as select_time_studio;
                    # no function named select_time_df is defined in this file.
                    select_time_temp(os.path.join(bottom_file_path, bottom_file))
# Run the eda/temp processing over the WSP data tree (absolute, machine-specific path).
select_time_wsp("/Users/luerlyu/Desktop/Experimental Data/Physiological data/WSP")

## handle heart rate time stamp

In [58]:
def handle_time(path):
    """Move heart-rate readings onto the preceding stress row in each WELLNESS CSV.

    For every ``*WELLNESS.csv`` directly inside ``path`` that has not been
    processed yet (processed files have exactly three columns):

    * each heart-rate value from the second row onwards is moved to the
      nearest earlier row that has a ``stress_level_value`` and cleared from
      its own row; a later reading overwrites an earlier one on that row;
    * a reading with no earlier stress row is left where it is;
    * ``stress_level_time`` is renamed to ``timestamp`` and ``timestamp_16``
      is dropped, and the file is rewritten in place.

    If a file with more than one row has no ``heart_rate`` column, its
    location is printed and processing of this directory stops.

    Args:
        path: Directory that holds the WELLNESS CSV files.
    """
    for name in os.listdir(path):
        if name[-4:].lower() != '.csv' or not name[:-4].endswith('WELLNESS'):
            continue
        csv_path = os.path.join(path, name)
        df = pd.read_csv(csv_path)
        # if processed, skip
        if len(df.columns) == 3:
            continue

        if len(df) > 1:
            if 'heart_rate' not in df.columns:
                print(path, name)
                return
            has_stress = df['stress_level_value'].notna().tolist()
            # Row label of the most recent stress row seen so far; -1 = none.
            last_stress = 0 if has_stress[0] else -1
            for i in range(1, len(df)):
                reading = df.loc[i, 'heart_rate']
                # Without an earlier stress row there is no row to attach to;
                # writing to label -1 would append a spurious row instead.
                if not pd.isnull(reading) and last_stress >= 0:
                    df.loc[last_stress, 'heart_rate'] = reading
                    df.loc[i, 'heart_rate'] = np.nan
                if has_stress[i]:
                    last_stress = i

        df = df.rename(columns={"stress_level_time": "timestamp"})
        df = df.drop(['timestamp_16'], axis=1)
        df.to_csv(csv_path, index=False)

In [59]:
def handle_file_studio(path):
    """Run ``handle_time`` on every folder four directory levels below ``path``.

    Non-directory entries at any level are skipped.
    """
    def subdirs(parent):
        # Lazily keep only the entries of ``parent`` that are directories.
        joined = (os.path.join(parent, entry) for entry in os.listdir(parent))
        return filter(os.path.isdir, joined)

    for group_dir in subdirs(path):
        for subject_dir in subdirs(group_dir):
            for session_dir in subdirs(subject_dir):
                for unzipped_dir in subdirs(session_dir):
                    handle_time(unzipped_dir)
        

# Run the heart-rate timestamp pass over the Studio data tree (absolute, machine-specific path).
handle_file_studio("/Users/luerlyu/Desktop/Experimental Data/Physiological data/Studio")

In [60]:
def handle_file_wsp(path):
    """Run ``handle_time`` on every folder three directory levels below ``path``.

    Non-directory entries at any level are skipped.
    """
    for top_name in os.listdir(path):
        top_dir = os.path.join(path, top_name)
        if os.path.isdir(top_dir):
            for session_name in os.listdir(top_dir):
                session_dir = os.path.join(top_dir, session_name)
                if os.path.isdir(session_dir):
                    for entry in os.listdir(session_dir):
                        unzipped_dir = os.path.join(session_dir, entry)
                        if os.path.isdir(unzipped_dir):
                            handle_time(unzipped_dir)
# Run the heart-rate timestamp pass over the WSP data tree (absolute, machine-specific path).
handle_file_wsp("/Users/luerlyu/Desktop/Experimental Data/Physiological data/WSP")

## combine the wellness csv files into "WELLNESS_combined.csv"

In [61]:
def combine_file(path):
    """Merge all ``*WELLNESS.csv`` files in ``path`` into ``WELLNESS_combined.csv``.

    Fully empty rows are dropped from each file, rows whose
    ``stress_level_value`` is <= 0 are removed (rows without a stress value
    are kept), and the result is sorted by ``timestamp``. When no matching
    file exists, a header-only CSV is written.

    Args:
        path: Directory that holds the WELLNESS CSV files; the combined file
            is written into the same directory.
    """
    base_columns = ['heart_rate', 'timestamp', 'stress_level_value']
    frames = []
    for name in os.listdir(path):
        if name[-4:].lower() != '.csv' or not name[:-4].endswith('WELLNESS'):
            continue
        df = pd.read_csv(os.path.join(path, name))
        frames.append(df.dropna(how='all'))  # removing empty rows

    if frames:
        combined_df = pd.concat(frames)
    else:
        combined_df = pd.DataFrame(columns=base_columns)
    # Keep the three standard columns first, followed by any extras.
    extras = [c for c in combined_df.columns if c not in base_columns]
    combined_df = combined_df.reindex(columns=base_columns + extras)

    # Filter with a boolean mask: row labels repeat across the concatenated
    # files, so dropping by label would also delete valid rows of other files.
    combined_df = combined_df[~(combined_df['stress_level_value'] <= 0)]
    combined_df = combined_df.sort_values(by=['timestamp'], ascending=True)
    combined_df.to_csv(os.path.join(path, 'WELLNESS_combined.csv'), index=False)
    

In [62]:
def combine_file_studio(path):
    """Run ``combine_file`` on every folder four directory levels below ``path``.

    Non-directory entries at any level are skipped.
    """
    def only_dirs(parent):
        # Lazily produce full paths of the directories inside ``parent``.
        return (
            full
            for full in (os.path.join(parent, n) for n in os.listdir(parent))
            if os.path.isdir(full)
        )

    for group_dir in only_dirs(path):
        for subject_dir in only_dirs(group_dir):
            for session_dir in only_dirs(subject_dir):
                for unzipped_dir in only_dirs(session_dir):
                    # combine the csv files
                    combine_file(unzipped_dir)
        

# Run the combine pass over the Studio data tree (absolute, machine-specific path).
combine_file_studio("/Users/luerlyu/Desktop/Experimental Data/Physiological data/Studio")

In [63]:
def combine_file_wsp(path):
    """Run ``combine_file`` on every folder three directory levels below ``path``.

    Non-directory entries at any level are skipped.
    """
    for top_name in os.listdir(path):
        top_dir = os.path.join(path, top_name)
        if os.path.isdir(top_dir):
            for session_name in os.listdir(top_dir):
                session_dir = os.path.join(top_dir, session_name)
                if os.path.isdir(session_dir):
                    for entry in os.listdir(session_dir):
                        unzipped_dir = os.path.join(session_dir, entry)
                        if os.path.isdir(unzipped_dir):
                            # combine the csv files
                            combine_file(unzipped_dir)
# Run the combine pass over the WSP data tree (absolute, machine-specific path).
combine_file_wsp("/Users/luerlyu/Desktop/Experimental Data/Physiological data/WSP")