# Database - functions for data back-end / manipulations

This is using an alternate approach:
  - Export all of my Apple HealthFit data from the Health app to export.zip 
  - Converted this to a SQLite database using `healthfit-to-sqlite`
  - Run various SQL queries to allow for producing a summary of (walk/hike) workouts
 
  Queries can then against this database to build the cache file (or possibly a smaller custom SQLite file) as input into the walk mapping app.


In [1]:
import pandas as pd
import datetime as dt
from pathlib import Path
import subprocess
import pendulum
from sqlite_utils import Database
import reverse_geocoder as rg


#### Exporting HealthKit data / creating SQLite DB

First export HealthKit data using the Health app - select your profile icon from the top-right of the main screen and then select **Export All Health Data** (this can take some time to create the `export.zip` file).

The archive can be converted to a SQLite database using the following command:

`healthkit-to-sqlite export.zip healthkit_db.sqlite`

which requires the `healthkit-to-sqlite` library to be installed (note it is one of the requirements).

In [2]:
HEALTHKIT_DATA_PATH = "/Users/mjboothaus/icloud/Data/apple_health_export"
export_zip = Path(HEALTHKIT_DATA_PATH) / "export.zip"


In [11]:
export_zip.as_posix()

'/Users/mjboothaus/icloud/Data/apple_health_export/export.zip'

In [3]:
def get_location(latitude, longitude):
    location = rg.search((latitude, longitude))
    return [location[0]["name"], location[0]["admin1"], location[0]["cc"]]


In [4]:
def calculate_elapsed_time_minutes(finish_datetime, start_datetime):
    dt = pendulum.parse(finish_datetime) - pendulum.parse(start_datetime)
    return float(dt.in_seconds() / 60 / 60)


In [5]:
def convert_healthkit_export_to_sqlite(export_zip):
    zip_file = export_zip.as_posix()
    if export_zip.exists() is False:
        print(zip_file, ": not found")
        return None, f"{zip_file}: not found"
    zip_file_date = pendulum.instance(
        dt.datetime.fromtimestamp(export_zip.stat().st_ctime)
    )

    db_file = zip_file.replace("export.zip", "healthkit_db.sqlite")
    if Path(db_file).exists() is True:
        Path(db_file).unlink()
    sp_cmd = f"healthkit-to-sqlite {zip_file} {db_file}"

    sp = subprocess.Popen(sp_cmd, stdout=subprocess.PIPE, shell=True)
    (sp_output, _) = sp.communicate()

    # This makes the wait possible
    # sp_status = sp.wait()

    db_file_with_date = db_file.replace(
        ".sqlite", "_" + zip_file_date.to_date_string().replace("-", "_") + ".sqlite"
    )

    export_zip.rename(
        zip_file.replace(
            ".zip", "_" + zip_file_date.to_date_string().replace("-", "_") + ".zip"
        )
    )
    Path(db_file).rename(db_file_with_date)

    return db_file_with_date, sp_output


In [6]:
def create_df_from_sql_query_in_file(
    filename_dot_sql, conn, parse_dates, echo_query=False
):

    query_file = Path.cwd().parent / "sql" / filename_dot_sql

    with open(query_file, "r") as query:
        sql_text = query.read()
        if echo_query is True:
            print(sql_text)
        df = pd.read_sql_query(sql_text, conn, parse_dates=parse_dates)
    return df


In [7]:
def create_walk_workout_summary(
    db_file, output_file="../data/workouts_summary.csv", include_location=False
):
    if db_file is None or Path(db_file).exists() is False:
        print("SQLite database doesn't exist or not found")
        return None
    db = Database(db_file)

    # Extract data

    workouts_df = create_df_from_sql_query_in_file(
        "select_star_walking_workouts.sql", db.conn, ["startDate", "endDate"]
    )
    start_point_df = create_df_from_sql_query_in_file(
        "select_start_point_workout.sql", db.conn, ["date"]
    )
    finish_point_df = create_df_from_sql_query_in_file(
        "select_finish_point_workout.sql", db.conn, ["date"]
    )

    # Perform joins and additional column manipulations

    workouts_df["startDate"] = workouts_df["startDate"].apply(
        lambda dt: pendulum.instance(dt).to_datetime_string()
    )
    workouts_df["endDate"] = workouts_df["endDate"].apply(
        lambda dt: pendulum.instance(dt).to_datetime_string()
    )
    workouts_summary_df = start_point_df.merge(
        finish_point_df, how="inner", on="workout_id"
    )
    workouts_summary_df["elapsed_time_hours"] = workouts_summary_df.apply(
        lambda row: calculate_elapsed_time_minutes(
            row["finish_datetime"], row["start_datetime"]
        ),
        axis=1,
    )
    workouts_summary_df["start_datetime"] = workouts_summary_df["start_datetime"].apply(
        lambda dt: pendulum.parse(dt, tz="Australia/Sydney").to_datetime_string()
    )

    if include_location is True:
        workouts_summary_df["start_location"] = workouts_summary_df.apply(
            lambda row: get_location(
                float(row["start_latitude"]), float(row["start_longitude"])
            ),
            axis=1,
        )
        workouts_summary_df["finish_location"] = workouts_summary_df.apply(
            lambda row: get_location(
                float(row["finish_latitude"]), float(row["finish_longitude"])
            ),
            axis=1,
        )
    workouts_summary_df = workouts_summary_df.merge(
        workouts_df, how="inner", on="workout_id"
    )

    workouts_summary_df.to_csv(output_file, index=False)
    return Path(output_file)
    

In [8]:
def main_convert_create_walk_summary(path_export_zip, include_location=False):
    db_file, _ = convert_healthkit_export_to_sqlite(path_export_zip)
    output_file = create_walk_workout_summary(db_file, include_location=include_location)
    return db_file, output_file

In [9]:
db_file, output_file = main_convert_create_walk_summary(export_zip, include_location=False)

Traceback (most recent call last):
  File "/Users/mjboothaus/code/github/mjboothaus/emmaus-walking-data/.venv/bin/healthkit-to-sqlite", line 8, in <module>
    sys.exit(cli())
  File "/Users/mjboothaus/code/github/mjboothaus/emmaus-walking-data/.venv/lib/python3.9/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/Users/mjboothaus/code/github/mjboothaus/emmaus-walking-data/.venv/lib/python3.9/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/Users/mjboothaus/code/github/mjboothaus/emmaus-walking-data/.venv/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/mjboothaus/code/github/mjboothaus/emmaus-walking-data/.venv/lib/python3.9/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/Users/mjboothaus/code/github/mjboothaus/emmaus-walking-data/.venv/lib/python3.9/site-packages/healthki

DatabaseError: Execution failed on sql 'select
    id as workout_id,
    duration as duration_minutes,
    totaldistance as totaldistance_km,
    totalenergyburned as totalenergyburned_kJ,
    sourcename,
    sourceversion,
    startdate,
    enddate,
    metadata_hkweathertemperature,
    metadata_hkweatherhumidity,
    metadata_hkelevationascended,
    metadata_hkaveragemets
from
    workouts
where workoutactivitytype = "HKWorkoutActivityTypeWalking" or workoutactivitytype = "HKWorkoutActivityTypeHiking" order by id


 /* Excluded fields:
    workoutactivitytype,   # just walking
    durationunit,          # fixed - min
    totaldistanceunit,     # fixed - km
    totalenergyburnedunit, # fixed - kJ
    device,
    creationdate,          # not really of interest (start date instead)
    workout_events,        # think this is redundant info (need to check - JSON?)
    metadata_hkgroupfitness,
    metadata_hkworkoutbrandname,
    metadata_hktimezone,
    metadata_hkcoachedworkout,
    metadata_hkwasuserentered,
    metadata_hkindoorworkout,
    metadata_hkelevationascended,
    metadata_hkswimminglocationtype,
    metadata_hkaveragemets,
    metadata_healthfit_sub_sport,
    metadata_healthfit_route,
    metadata_healthfit_file_type,
    metadata_hkmaximumspeed,
    metadata_healthfit_total_moving_time,
    metadata_healthfit_total_distance,
    metadata_healthfit_max_running_cadence,
    metadata_healthfit_min_altitude,
    metadata_healthfit_avg_running_cadence,
    metadata_healthfit_app_build,
    metadata_healthfit_fit_sport,
    metadata_healthfit_fit_sub_sport,
    metadata_healthfit_max_altitude,
    metadata_healthfit_fit_manufacturer,
    metadata_healthfit_total_strides,
    metadata_healthfit_fit_serial_number,
    metadata_healthfit_sport,
    metadata_hkaveragespeed,
    metadata_healthfit_app_version,
    metadata_hkexternaluuid
 */
': no such table: workouts

In [None]:
db_file

In [None]:
output_file

In [None]:
def calc_walk_stats(walk_data):
    total_time = dt.timedelta(0)
    total_distance = 0

    for hike in walk_data:
        total_time += hike.index.max()
        # print(iHike+1, walk_date[iHike], hike.index.max(), hike['dist'].max() / 1e3)
        total_distance += hike["dist"].max()
    total_distance /= 1e3

    start_coord = walk_data[0][["lat", "lon"]].iloc[0].tolist()
    end_coord = walk_data[-1][["lat", "lon"]].iloc[-1].tolist()
    return total_time, total_distance, start_coord, end_coord


In [None]:
def create_walk_cached_data_for_app(db_file, n_rows_used=5):
    # read in all of the walks data and sample at an appropriate frequency and cache for faster use in the app
    db = Database(db_file)
    walk_df = pd.read_sql_query("SELECT * FROM walks", db.conn)

    UNUSED_COLUMNS = ["dist", "speed"]

    walk_df.drop(UNUSED_COLUMNS, axis=1, inplace=True)
    walk_df.dropna(inplace=True)  # TODO: Check why there are a few NaNs
    walk_df = walk_df.iloc[::n_rows_used].reset_index()  # downsample

    walk_df.to_feather(Path(db_file.as_posix().replace(".db", ".cache.feather")))

    return walk_df
