UNZIP APPLE HEALTH EXPORT

In [None]:
import zipfile

# Unpack every member of the Health export archive into the current working directory.
with zipfile.ZipFile('export.zip', mode='r') as health_archive:
    health_archive.extractall()

IMPORT LIBRARIES

In [None]:
import xml.etree.ElementTree as ET
import pandas as pd
import datetime as dt
import matplotlib.pyplot as plt
# plt.style.use("fivethirtyeight")
plt.style.use('plot-style.mplstyle')

import seaborn as sns



In [None]:
# Parse the exported XML document and keep a handle on its root element.
# NOTE(review): the path is case-sensitive on some filesystems - confirm the
# extracted file really is named 'Export.xml'.
tree = ET.parse('apple_health_export/Export.xml')
root = tree.getroot()

# One attribute dict per <Record> element, in document order.
record_list = [element.attrib for element in root.iter('Record')]

In [None]:
import sys

In [None]:
# Long-form table: one row per <Record>, one column per XML attribute.
record_data = pd.DataFrame(record_list)

# Parse the three timestamp columns into datetimes.
for date_col in ('creationDate', 'startDate', 'endDate'):
    record_data[date_col] = pd.to_datetime(record_data[date_col])

# Measurements become floats. Values that are not numeric turn into NaN and are
# then replaced by 1.0, so such records count as "one occurrence" when aggregated.
record_data['value'] = pd.to_numeric(record_data['value'], errors='coerce').fillna(1.0)

# Strip the HealthKit identifier prefixes to get shorter observation names.
for prefix in ('HKQuantityTypeIdentifier', 'HKCategoryTypeIdentifier'):
    record_data['type'] = record_data['type'].str.replace(prefix, '')

record_data.tail()

In [None]:
# Add the duration of each measurement (in seconds) and the calendar day it started on.
record_data['measure_time_secs'] = (record_data['endDate'] - record_data['startDate']).dt.total_seconds()
record_data['day'] = record_data['startDate'].dt.date

# Build one column per record type: each holds `value` on the rows of that type
# and NaN everywhere else. Rows keep their original index, so the column-wise
# concat lines them up with record_data again.
rec_types = record_data.sort_values('type').type.unique()

pivoted_rectypes = pd.concat([
    record_data.loc[record_data['type'] == x, 'value'].rename(x) for x in rec_types
], axis=1)

# Discard per-type columns left over from a previous run of this cell before
# appending the fresh ones, so re-running does not create duplicate columns.
record_data = pd.concat([
    record_data.drop(columns=pivoted_rectypes.columns, errors='ignore'),
    pivoted_rectypes,
], axis=1)

# Export the wide layout only. The long-form `value`/`type` columns are left out
# of the file but deliberately kept on the in-memory frame, because later cells
# still filter on record_data['type'] and aggregate record_data['value'].
record_data.drop(columns=['value', 'type']).to_parquet('record_data.parquet', index=False)

record_data.head(2)


In [None]:
# Sanity check: (number of rows, number of columns) of the record table.
record_data.shape

In [None]:
# One attribute dict per <Workout> element, turned into a DataFrame.
workout_list = [workout.attrib for workout in root.iter('Workout')]
workout_data = pd.DataFrame(workout_list)

# Shorten the activity names (drop the HealthKit prefix and the word
# "Traditional") and expose them under the column name `workoutType`.
workout_data['workoutActivityType'] = (
    workout_data['workoutActivityType']
    .str.replace('HKWorkoutActivityType', '')
    .str.replace('Traditional', '')
)
workout_data = workout_data.rename(columns={'workoutActivityType': 'workoutType'})

# Parse the timestamp columns, then derive the calendar day of the workout start.
for date_col in ('creationDate', 'startDate', 'endDate'):
    workout_data[date_col] = pd.to_datetime(workout_data[date_col])
workout_data['day'] = workout_data['startDate'].dt.date

# The XML attributes arrive as strings; make the measurements numeric.
for numeric_col in ('duration', 'totalEnergyBurned', 'totalDistance'):
    workout_data[numeric_col] = pd.to_numeric(workout_data[numeric_col])

workout_data.to_parquet('workout_data.parquet', index=False)

workout_data.tail(2)

# Sleep HRV DATA

Heart Rate Variability (HRV) measurements are supposedly more accurate during sleep. The cell below therefore uses the sleep data recorded by the Apple Watch to keep only the HRV measurements taken while asleep.

In [None]:
record_data['date'] = pd.to_datetime(record_data['day'], yearfirst=True)

# A night of sleep usually crosses midnight, so grouping sleep records by the
# calendar date of `startDate` splits one night over two dates and merges the
# early-morning tail of one night with the late-evening start of the next.
# Instead, every timestamp is assigned to a "sleep night" running from noon to
# noon: shifting by 12 hours puts the evening and the following morning on the
# same key.
NIGHT_OFFSET = pd.Timedelta(hours=12)


def _sleep_night(timestamps):
    """Return the noon-to-noon night (as a date) each timestamp belongs to."""
    return (timestamps - NIGHT_OFFSET).dt.date


# Get Sleep Times (sleep records written by the watch only).
sleep_data = (
    record_data
    .query("SleepAnalysis ==1")
    .query("sourceName == 'George’s Apple\xa0Watch'")
    .assign(sleep_night=lambda df: _sleep_night(df['startDate']))
)

# Per night: earliest start and latest end of any sleep record. Awake gaps
# inside that window are not excluded.
when_asleep = pd.concat([
    sleep_data.groupby('sleep_night')['startDate'].min().rename('sleep_start'),
    sleep_data.groupby('sleep_night')['endDate'].max().rename('sleep_end'),
], axis=1)

# Rows that actually carry an HRV measurement, tagged with their sleep night.
hrv_data = (
    record_data
    .dropna(subset=['HeartRateVariabilitySDNN'])
    [['startDate', 'endDate', 'date', 'HeartRateVariabilitySDNN']]
    .assign(sleep_night=lambda df: _sleep_night(df['startDate']))
)

# Attach the matching night's window; nights without sleep records drop out
# because the merge is an inner join.
merged_hrv_sleep = hrv_data.merge(
    right=when_asleep,
    left_on='sleep_night',
    right_index=True,
)

# Keep measurements lying strictly inside the sleep window.
hrv_asleep = merged_hrv_sleep[
    (
        merged_hrv_sleep.startDate > merged_hrv_sleep.sleep_start
    ) & (
        merged_hrv_sleep.endDate < merged_hrv_sleep.sleep_end
    )
]
hrv_asleep

# GROUPING ETC 

In [None]:
# Distinct values of the `sourceName` column.
record_data.sourceName.unique()

In [None]:
# Distinct record types; needs the long-form `type` column to be present on record_data.
record_data['type'].unique()

In [None]:
# Distinct values of the `sourceName` column (same expression as a few cells above).
record_data.sourceName.unique()

In [None]:
# Per (day, type) pair: total, extremes, sample count and average of `value`.
# Rows whose day or type is missing are kept as their own group (dropna=False).
daily_agg = (
    record_data
    .groupby(['day', 'type'], dropna=False)
    .agg({'value': ['sum', 'min', 'max', 'count', 'mean']})
    .reset_index()
)

In [None]:
# Rows whose long-form `type` is 'HighHeartRateEvent'.
record_data[record_data['type'] == 'HighHeartRateEvent']

In [None]:
# record_data may already have been pivoted by an earlier cell, which can also
# have removed the long-form `type`/`value` columns. Only rebuild the per-type
# columns when the long-form columns are still available; otherwise reuse the
# `pivoted_rectypes` frame that is already in the kernel.
if {'type', 'value'}.issubset(record_data.columns):
    rec_types = record_data.sort_values('type').type.unique()

    pivoted_rectypes = pd.concat([
        record_data.loc[record_data['type'] == x, 'value'].rename(x) for x in rec_types
    ], axis=1)

# Append only the per-type columns record_data does not carry yet: duplicated
# column names would make the parquet export fail.
missing_cols = [c for c in pivoted_rectypes.columns if c not in record_data.columns]
record_data2 = pd.concat([record_data, pivoted_rectypes[missing_cols]], axis=1)
record_data2.to_parquet('record_data.parquet', index=False)
record_data2.head(2)

`mean()` skips NA values, so it gives the same result as dropping NA before calculating the mean - averages stay consistent.
**Do NOT fill NA with zeros, or all your daily/hourly averages will be off.**

In [None]:

# Show that the mean is identical with and without dropping the missing values first.
active_energy = pivoted_rectypes['ActiveEnergyBurned']
print(active_energy.mean())
print(active_energy.dropna().mean())



In [None]:
# How each per-type column is collapsed to one value per day:
# 'mean' for level-like readings, 'sum' for counts and accumulated quantities.
# Key order is kept because it determines the column order of the aggregates.
agg_types = dict(
    BodyMass='mean',
    LeanBodyMass='mean',
    BodyFatPercentage='mean',
    OxygenSaturation='mean',
    AppleStandHour='sum',
    EnvironmentalAudioExposure='mean',
    HeadphoneAudioExposure='mean',
    VO2Max='mean',
    HeartRate='mean',
    RestingHeartRate='mean',
    WalkingHeartRateAverage='mean',
    BloodPressureSystolic='mean',
    BloodPressureDiastolic='mean',
    HeartRateVariabilitySDNN='mean',
    StepCount='sum',
    AppleStandTime='sum',
    SleepAnalysis='sum',
    HighHeartRateEvent='sum',
    AudioExposureEvent='sum',
    HeadphoneAudioExposureEvent='sum',
)


In [None]:
# record_data['day'] = record_data['startDate'].dt.date
# record_data.to_parquet('full-neaten-health.parquet')

In [None]:
# One row per day: each per-type column is reduced with the function named in agg_types.
daily_data = record_data.groupby('day').agg(agg_types)

In [None]:
# Persist the daily aggregate in both CSV and parquet form (the `day` index is written too).
daily_data.to_csv('daily_health_aggregate.csv')
daily_data.to_parquet('daily_health_aggregate.parquet')

In [None]:
# Smooth over 28 days first and only then keep the last 365 rows. Taking the
# tail before rolling would leave the first 27 plotted points empty, because the
# window has no earlier days to draw on.
hrv_28d = daily_data['HeartRateVariabilitySDNN'].rolling(window=28).mean().tail(365)

ax = hrv_28d.plot(title='HeartRateVariabilitySDNN: 28-day rolling mean, last 365 days')
ax.set_xlabel('day')
ax.set_ylabel('HeartRateVariabilitySDNN (daily mean)');

In [None]:
# 28-day rolling mean of the daily step totals, over the whole history.
steps_28d = daily_data['StepCount'].rolling(window=28).mean()
steps_28d.plot()

In [None]:
# Write the complete record table, with every column it has at this point, to parquet.
record_data.to_parquet('full-neaten-health.parquet')

In [None]:
# Daily aggregate per record type, computed from the long-form `type`/`value`
# columns and restricted to days from 2021-07-01 onwards.
since_apple_watch = pd.Timestamp('2021-07-01').date()

daily_agg = {
    rec_type: (
        record_data[record_data['type'] == rec_type]
        .groupby('day')
        .agg({'value': how})
        .loc[since_apple_watch:]
    )
    for rec_type, how in agg_types.items()
}

# Side-by-side table: one column per record type, aligned on the day index.
daily_df = pd.concat(
    [frame.rename(columns={'value': rec_type}) for rec_type, frame in daily_agg.items()],
    axis=1,
)
daily_df.head(2)

In [None]:
# Save the per-type daily table as CSV.
daily_df.to_csv('daily_health_agg_new.csv')

In [None]:
# Display the per-type daily table.
daily_df

# workout specific

In [None]:
# import xml.etree.ElementTree as ET
# # create element tree object
# tree = ET.parse('apple_health_export/Export.xml') 
# # for every health record, extract the attributes
# root = tree.getroot()

In [None]:
# Rebuild the workout table from the XML, this time keeping the activity name
# under the column `Type`, which the workout cells below filter on.
workout_list = [workout.attrib for workout in root.iter('Workout')]
workout_data = pd.DataFrame(workout_list)

workout_data['workoutActivityType'] = workout_data['workoutActivityType'].str.replace('HKWorkoutActivityType', '')
workout_data = workout_data.rename(columns={'workoutActivityType': 'Type'})

# Parse the timestamp columns.
for date_col in ('creationDate', 'startDate', 'endDate'):
    workout_data[date_col] = pd.to_datetime(workout_data[date_col])

# The XML attributes arrive as strings; make the measurements numeric.
for numeric_col in ('duration', 'totalEnergyBurned', 'totalDistance'):
    workout_data[numeric_col] = pd.to_numeric(workout_data[numeric_col])

workout_data.tail()

In [None]:
# Total number of workouts in the export (row count of the workout table).
num_workouts = len(workout_data)

In [None]:
def get_workouts(df, workout_type):
    """Return the rows of ``df`` whose "Type" column equals ``workout_type``.

    The original index labels of the selected rows are preserved.
    """
    is_requested_type = df["Type"].eq(workout_type)
    return df.loc[is_requested_type]

# List the distinct workout types found in the export, for example:
# ['Running' 'FunctionalStrengthTraining' 'Yoga'
#  'HighIntensityIntervalTraining' 'CoreTraining']
print(workout_data["Type"].unique())

# Keep only the running workouts.
running_data = get_workouts(workout_data, workout_type="Running")

In [None]:
def get_workouts_from_to(df, start, end, date_col="creationDate"):
    """Return the workouts whose ``date_col`` lies between ``start`` and ``end``.

    Parameters
    ----------
    df : pandas.DataFrame
        Workout table; ``df[date_col]`` must be comparable with UTC timestamps.
    start, end : date-like
        Range bounds, both inclusive. They are converted with
        ``pd.to_datetime(..., utc=True)``.
    date_col : str, default "creationDate"
        Timestamp column to filter on (for example "startDate").

    Returns
    -------
    pandas.DataFrame
        An independent copy of the matching rows, keeping their index labels.
        Returning a copy lets callers add columns to the result without
        triggering pandas' SettingWithCopyWarning.
    """
    start = pd.to_datetime(start, utc=True)
    end = pd.to_datetime(end, utc=True)
    in_range = (df[date_col] >= start) & (df[date_col] <= end)
    return df[in_range].copy()

# Date range of interest; both bounds are handed to the filter as plain dates.
lower_time = dt.date(year=2022, month=1, day=1)
upper_time = dt.date(year=2023, month=1, day=1)
workouts = get_workouts_from_to(workout_data, start=lower_time, end=upper_time)

# or relative to the current day
# today = dt.date.today()
# xdaysago = today - dt.timedelta(days=7)
# # first_of_month = today - dt.timedelta(days=today.day - 1)
# workouts = get_workouts_from_to(workout_data, xdaysago, today)

In [None]:
# Summary statistics of duration, distance and energy for each workout type.
workouts.groupby('Type')[['duration','totalDistance','totalEnergyBurned']].describe()

In [None]:
# Last row of the selection by position; the double brackets keep it a one-row DataFrame.
last_workout = workouts.iloc[[-1]]

In [None]:
# Show the running workouts of the selected range.
workouts[workouts.Type == 'Running']

In [None]:
# Heart-rate trace of every workout on a relative time axis, keyed by the
# timestamp of the first sample.
# NOTE: this needs the `heartrate` column, which is only added to `workouts` by
# the `workouts["heartrate"] = ...` cell further down the notebook.
workouts_data_hr = {}
for hr_frame in workouts['heartrate'].values:
    # Workouts without any heart-rate sample have no first timestamp to anchor on.
    if hr_frame.empty:
        continue
    first_sample = hr_frame['startDate'].iloc[0]
    rel_time = hr_frame['startDate'] - first_sample

    # Two columns: `startDate` (now a timedelta since the first sample) and `value`.
    workouts_data_hr[first_sample] = pd.concat([rel_time, hr_frame['value']], axis=1)

In [None]:
fig, ax = plt.subplots()

# Each entry holds the relative time in `startDate` (a timedelta) and the
# reading in `value`. Plot value against elapsed minutes, rather than handing
# the whole two-column frame to ax.plot.
for w in workouts_data_hr.values():
    ax.plot(w['startDate'].dt.total_seconds() / 60, w['value'])

ax.set_xlabel('minutes since first heart-rate sample')
ax.set_ylabel('heart rate')
ax.set_title('Heart rate during workouts')

fig.tight_layout()
# plt.show() rather than fig.show(): the latter warns on non-GUI (inline) backends.
plt.show()

In [None]:
# Heart-rate values of the first workout in the selection. `workouts` is a
# filtered frame that keeps its original index labels, so the first row has to
# be taken by position (.iloc[0]); a label lookup with [0] fails unless a row
# labelled 0 happens to be in the selection.
plt.plot(workouts['heartrate'].iloc[0]['value'].values)

In [None]:
# Display the selected workouts.
workouts

In [None]:
def get_heartrate_for_workout(heartrate, workout):
    """Return the heart-rate rows recorded within a single workout.

    ``workout`` must be a one-row DataFrame: its "startDate" and "endDate" are
    read with ``.item()``. A heart-rate row is kept when it starts at or after
    the workout start and ends at or before the workout end.
    """
    workout_start = workout["startDate"].item()
    workout_end = workout["endDate"].item()

    started_in_time = heartrate[heartrate["startDate"] >= workout_start]
    return started_in_time[started_in_time["endDate"] <= workout_end]

# Heart-rate samples only: long-form rows whose type is "HeartRate".
heartrate_data = record_data.loc[record_data["type"].eq("HeartRate")]

# Heart-rate statistics for the last workout of the selection.
last_workout = workouts.iloc[[-1]]
heartrate_workout = get_heartrate_for_workout(heartrate_data, last_workout)

hr_values = heartrate_workout["value"]
minh, maxh, meanh = hr_values.min(), hr_values.max(), hr_values.mean()
print(last_workout.Type.item(), minh, maxh, meanh)
# Example output:
# HighIntensityIntervalTraining 74.0 176.0 151.2590909090909


In [None]:
# Plot the last workout's heart-rate values against sample end time, drawn as red '|' markers.
heartrate_workout.plot(x='endDate', y='value', style='r|', markersize=8.5,  figsize=(12, 6))

In [None]:
def get_hr_for_workout_row(workout, heartrate):
    """Return the heart-rate rows recorded within one workout row.

    ``workout`` is a single row (anything indexable by "startDate" and
    "endDate"). A heart-rate row is kept when it starts at or after the workout
    start and ends at or before the workout end.
    """
    workout_start, workout_end = workout["startDate"], workout["endDate"]

    started_in_time = heartrate[heartrate["startDate"] >= workout_start]
    return started_in_time[started_in_time["endDate"] <= workout_end]

def convert_to_minute_proportion(number):
    """Rewrite a decimal quantity in "whole.sixtieths" notation.

    The integer part is kept and the fractional part is rescaled from
    hundredths to sixtieths, e.g. 5.5 (five and a half minutes) becomes 5.30
    (five minutes, thirty seconds).

    NaN is returned unchanged instead of raising ``ValueError`` from
    ``int(NaN)``, so statistics over an empty selection do not crash.
    """
    # NaN is the only value that is not equal to itself.
    if number != number:
        return number
    whole = int(number)
    fraction = number % 1
    return whole + (fraction / 100 * 60)

def get_pace_for_workout(workout):
    """Return the pace of one workout row (duration per unit of distance).

    The pace is ``duration / totalDistance`` (min/km according to the original
    comment), passed through ``convert_to_minute_proportion``.

    Returns 0.0 when no pace can be computed: the distance is zero, or the
    distance or duration is missing (NaN). A missing value would otherwise slip
    past the zero check and crash in the conversion step.
    """
    distance = workout["totalDistance"]
    duration = workout["duration"]
    if pd.isna(distance) or pd.isna(duration) or distance == 0.0:
        return 0.0
    # pace=min/km
    pace = duration / distance
    return convert_to_minute_proportion(pace)


def _mean_heartrate(row):
    """Average of the heart-rate values attached to one workout row."""
    return row['heartrate']["value"].mean()


# Per workout: its heart-rate samples (a DataFrame stored in each cell), their
# mean, and the pace.
workouts["heartrate"] = workouts.apply(get_hr_for_workout_row, axis=1, heartrate=heartrate_data)
workouts["hr_mean"] = workouts.apply(_mean_heartrate, axis=1)
workouts["pace"] = workouts.apply(get_pace_for_workout, axis=1)

In [None]:
def get_stats(workouts):
    """Print total and per-workout statistics for a workout selection.

    Expects the columns "Type", "duration", "totalEnergyBurned",
    "totalDistance" and "pace". The reported period is taken from the
    notebook-level ``lower_time`` / ``upper_time`` variables.

    Time and energy figures cover every workout. Both distance figures and the
    pace cover running workouts only, matching their "Running ..." labels.
    Pace is reported in min/km, the unit it is computed in.
    """
    running = workouts[workouts["Type"] == "Running"]

    total_kcal = workouts["totalEnergyBurned"].sum()
    # Running only, so the total agrees with its label and with the average below.
    total_dist = running["totalDistance"].sum()
    total_time = workouts["duration"].sum()
    total_time_hours = convert_to_minute_proportion(total_time / 60)
    total_time_mins = convert_to_minute_proportion(total_time)

    avg_kcal = workouts["totalEnergyBurned"].mean()
    avg_dist = running["totalDistance"].mean()
    avg_pace = running["pace"].mean()
    avg_time = workouts["duration"].mean()
    avg_time_hours = convert_to_minute_proportion(avg_time / 60)
    avg_time_mins = convert_to_minute_proportion(avg_time)

    print(f"Workout statistics from {lower_time} to {upper_time-dt.timedelta(days=1)}")
    print(f"{workouts.shape[0]} workouts")
    print(f"Time: {total_time_mins:.2f} minutes ({total_time_hours:.2f} hours)\nCalories burned: {total_kcal:.2f}kcal\nRunning distance: {total_dist:.2f}km")
    print("\nAverage per workout:")
    print(f"Time: {avg_time_mins:.2f} minutes ({avg_time_hours:.2f} hours)\nCalories burned: {avg_kcal:.2f}kcal\nRunning distance: {avg_dist:.2f}km\nRunning pace: {avg_pace:.2f}min/km")

# Print the statistics for the selected date range.
get_stats(workouts)

In [None]:
def plot_workouts(workouts, title=None):
    """Draw a pie chart of the number of workouts per type.

    Parameters
    ----------
    workouts : pandas.DataFrame
        Workout table with a "Type" column.
    title : str, optional
        Figure title. Defaults to a neutral "Workouts by type", so the chart is
        not labelled with a year the data does not belong to.
    """
    if title is None:
        title = "Workouts by type"

    # Number of workouts per type, in order of first appearance.
    counts = workouts.groupby("Type", sort=False).size()
    labels = counts.index.tolist()
    slices = counts.tolist()

    def make_autopct(values):
        # Label each wedge with its percentage and the absolute count behind it.
        total = sum(values)

        def my_autopct(pct):
            val = int(round(pct*total/100.0))
            return '{p:.2f}%  ({v:d})'.format(p=pct,v=val)
        return my_autopct

    plt.figure(figsize=(10, 10))
    plt.pie(slices, labels=labels, shadow=True,
            startangle=90, autopct=make_autopct(slices),
            wedgeprops={'edgecolor': 'black'})

    plt.title(title)
    plt.tight_layout()
    plt.show()

# Pie chart of workout types for the selected date range.
plot_workouts(workouts)