# Notes

**Filename format example**

ptsc-health-data-prod/raw/health/2022/06/30/FITBIT/DEVICE_DETAILS/DEVICE/DEVICE/*

# Set Up

## Modules and Client

In [None]:
import warnings

# Silence the repeated warning about having authenticated with end-user
# credentials; the message argument is matched as a prefix regex.
warnings.filterwarnings(
    action="ignore",
    message="Your application has authenticated using end user credentials",
)

In [None]:
from datetime import datetime, timedelta
from calendar import monthrange
from google.cloud import bigquery
import pandas as pd
# Module-level BigQuery client for ad-hoc use in this notebook. The helper
# functions defined below each build their own client for the project_id
# they are given rather than using this one.
client = bigquery.Client(project='aou-res-curation-prod')

## String variables

In [None]:
# GCS wildcard URI covering one upload day of raw Fitbit DEVICE_DETAILS files.
# run_etl fills the {yyyy}, {mm} and {dd} placeholders via str.format for each
# day it loads (mm and dd zero-padded).
device_uris = "gs://ptsc-health-data-prod/raw/health/{yyyy}/{mm}/{dd}/FITBIT/DEVICE_DETAILS/DEVICE/DEVICE/*"

In [None]:
# Yesterday's date (local time) and its year/month/day as unpadded strings
# (e.g. '2023', '4', '5'); run_etl uses them to stop loading at yesterday.
yesterday1 = datetime.today() - timedelta(days=1)
yesterday_year, yesterday_month, yesterday_day = (
    str(part) for part in (yesterday1.year, yesterday1.month, yesterday1.day)
)
yesterday_year, yesterday_month, yesterday_day

In [None]:
# Project / dataset / table identifiers used throughout the ETL.
cur_project_id = 'aou-res-curation-prod'
dataset_name = 'fitbit_ingest'
main_table = 'device'

# External (federated) table defined over one day of raw GCS files.
federated_table_name = f'dev_{main_table}'
# Table holding that day's raw (data, filename) rows before parsing.
staging_table_name = f"staging_{main_table}"

federated_table_id = f"{cur_project_id}.{dataset_name}.{federated_table_name}"
staging_table_id = f"{cur_project_id}.{dataset_name}.{staging_table_name}"
# Final parsed table that run_etl appends to.
destination_table_id = f"{cur_project_id}.{dataset_name}.{main_table}"

# Echo every name (blank line between each) so they can be eyeballed.
print(main_table,
      '\n' + federated_table_name,
      '\n' + federated_table_id,
      '\n' + staging_table_name,
      '\n' + staging_table_id,
      '\n' + destination_table_id,
      sep='\n')


## Functions

In [None]:
def create_federated_table(dataset_name, federated_table_name, uris, project_id=cur_project_id):
    """(Re)create an external BigQuery table over raw files in GCS.

    The files are read as CSV with ``|`` as the field delimiter into a single
    STRING column ``data`` (presumably so each JSON document lands whole in
    ``data`` -- TODO confirm raw files never contain ``|``). Extra fields on a
    line are ignored, jagged rows and quoted newlines are allowed, and no bad
    records are tolerated. Any existing table with the same name is dropped
    first.

    Args:
        dataset_name: Dataset that will hold the external table.
        federated_table_name: Bare name of the external table.
        uris: A GCS URI (wildcards allowed) or a list of them.
        project_id: Project owning the dataset; also used for the client.
    """
    if isinstance(uris, str):
        # ExternalConfig.source_uris is documented as a list of URIs.
        uris = [uris]

    bq_client = bigquery.Client(project=project_id)
    # Fully-qualified ID string instead of the deprecated Client.dataset().
    table = bigquery.Table(f"{project_id}.{dataset_name}.{federated_table_name}")

    extconfig = bigquery.ExternalConfig('CSV')
    extconfig.schema = [bigquery.SchemaField('data', 'string')]
    # autodetect and ignore_unknown_values are ExternalConfig properties;
    # setting them on extconfig.options (CSVOptions) had no effect.
    extconfig.autodetect = False
    extconfig.ignore_unknown_values = True
    extconfig.max_bad_records = 0
    extconfig.options.field_delimiter = '|'  # alternative once tried: u'\u00ff'
    extconfig.options.allow_jagged_rows = True
    extconfig.options.allow_quoted_newlines = True
    extconfig.source_uris = list(uris)
    table.external_data_configuration = extconfig

    bq_client.delete_table(table, not_found_ok=True)
    bq_client.create_table(table)

In [None]:
def delete_table(table, project_id = cur_project_id, dataset_name = dataset_name):
    """Delete ``{project_id}.{dataset_name}.{table}``; a missing table is not an error.

    Args:
        table: Bare table name (no project or dataset prefix).
        project_id: Project that owns the table; also used for the client.
        dataset_name: Dataset containing the table.
    """
    bq_client = bigquery.Client(project=project_id)
    # Pass a fully-qualified ID string rather than going through the
    # deprecated Client.dataset() helper.
    bq_client.delete_table(f"{project_id}.{dataset_name}.{table}", not_found_ok=True)

In [None]:
def parse_table_to_bigquery(staging_table_id, destination_table_id, project_id = cur_project_id):
    """Parse raw device JSON from a staging table and append it to the destination.

    The staging table has a ``data`` column (raw JSON text containing an array
    of device records) and a ``filename`` column (the source GCS path), as
    written by ``load_and_parse_table_to_bigquery``. Each array element becomes
    one output row. ``vibrent_id``, ``upload_date`` and ``date`` are parsed out
    of ``filename``; the rest comes from the JSON fields. Duplicate rows are
    removed with SELECT DISTINCT, and the result is appended (WRITE_APPEND) to
    ``destination_table_id``.

    Args:
        staging_table_id: Fully-qualified ``project.dataset.table`` to read.
        destination_table_id: Fully-qualified table to append parsed rows to.
        project_id: Project used both for the parse query and the load job.

    Returns:
        The finished load job, as returned by ``job.result()``.
    """
    # Construct a BigQuery client object.
    client = bigquery.Client(project=project_id)

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("vibrent_id", bigquery.enums.SqlTypeNames.INTEGER),
            bigquery.SchemaField("device_id", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField("upload_date", bigquery.enums.SqlTypeNames.DATE),
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE),
            bigquery.SchemaField("battery", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField("battery_level", bigquery.enums.SqlTypeNames.FLOAT),
            bigquery.SchemaField("device_version", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField("device_type", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField("last_sync_time", bigquery.enums.SqlTypeNames.DATETIME),
        ],
        write_disposition="WRITE_APPEND",
    )

    # Does not include the fields 'features' and 'mac' -
    # they are not in the agreed-upon schema and they are either empty or not useful.
    sql_query = f"""
    SELECT DISTINCT
        SAFE_CAST(REGEXP_EXTRACT(filename, 'DEVICE/DEVICE/([0-9]+)/') AS INT64) AS vibrent_id,
        SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.id") AS STRING) as device_id,
        DATE(SAFE_CAST(REPLACE(REGEXP_EXTRACT(filename, 'health/([0-9]{{4}}/[0-9]{{2}}/[0-9]{{2}})/')
                                            , '/', '-') AS DATE)) as upload_date,    
        SAFE_CAST(REGEXP_EXTRACT(filename, '/([0-9]{{4}}-[0-9]{{2}}-[0-9]{{2}})[T|-]?') AS DATE) as date,
        SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.battery") AS STRING) as battery,
        SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.batteryLevel") AS FLOAT64) as battery_level,
        SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.deviceVersion") AS STRING) as device_version,
        SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.lastSyncTime") AS DATETIME) as last_sync_time,
        SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.type") AS STRING) as device_type,
        
        #SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.features") AS STRING) as features,
        #SAFE_CAST(JSON_EXTRACT_SCALAR(unnested_data, "$.mac") AS STRING) as mac,
    
    FROM `{staging_table_id}` 
    , UNNEST(JSON_EXTRACT_ARRAY(data, "$.")) AS unnested_data """

    # Run the parse query in the same project as the load job (previously the
    # project_id argument was ignored here).
    dataframe = pd.read_gbq(sql_query, project_id=project_id)

    # DATE columns come back either as pandas Timestamps (older pandas-gbq) or
    # as datetime.date values (newer, dbdate dtype). datetime.date(i) raised
    # TypeError on the latter; convert only real datetimes, pass dates through.
    dataframe['upload_date'] = [value.date() if isinstance(value, datetime) else value
                                for value in dataframe['upload_date']]

    job = client.load_table_from_dataframe(dataframe, destination_table_id, job_config=job_config)
    return job.result()  # Wait for the job to complete.

In [None]:
def load_and_parse_table_to_bigquery(federated_table_id, staging_table_id, destination_table_id
                                     , project_id = cur_project_id):
    """Stage one day's raw rows from a federated table, then parse them into the destination.

    Steps:
      1. Query every row of ``federated_table_id`` together with its source
         file path (``_FILE_NAME`` as ``filename``). Load the result into
         ``staging_table_id``, replacing its contents (WRITE_TRUNCATE).
      2. If any rows were found, call ``parse_table_to_bigquery`` to append
         the parsed rows to ``destination_table_id``.

    Args:
        federated_table_id: Fully-qualified ID of the external table to read.
        staging_table_id: Fully-qualified ID of the staging table to overwrite.
        destination_table_id: Fully-qualified ID of the parsed output table.
        project_id: Project used for the queries and load jobs. It is now also
            forwarded to ``parse_table_to_bigquery``; previously the global
            ``cur_project_id`` was passed instead.

    Returns:
        The completed staging load job, or the string ``'no data'`` (also
        printed) when the federated table returned no rows.
    """
    client = bigquery.Client(project=project_id)

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("data", bigquery.enums.SqlTypeNames.STRING),
            bigquery.SchemaField("filename", bigquery.enums.SqlTypeNames.STRING),
        ],
        write_disposition="WRITE_TRUNCATE",
    )

    sql = f"""
        SELECT
            *,
            _FILE_NAME as filename
        FROM `{federated_table_id}`
    """
    dataframe = pd.read_gbq(sql, project_id=project_id)

    if dataframe.empty:
        # No raw file exists for this upload date: skip staging and parsing
        # instead of failing on an empty load.
        res = 'no data'
        print(res)
        return res

    # 1. Stage the raw rows.
    job = client.load_table_from_dataframe(dataframe, staging_table_id, job_config=job_config)
    res = job.result()  # Wait for the load to complete.

    # 2. Parse the staged rows into the destination table.
    parse_table_to_bigquery(staging_table_id, destination_table_id, project_id=project_id)
    return res

In [None]:
# Zero-padded month strings; run_etl's default set of months to load.
months_of_year = ['01','02','03','04','05','06','07','08','09','10','11','12']

def daysToLoad(yyyy, mm, day_range = None):
    """Return the day-of-month numbers to load for month ``mm`` of year ``yyyy``.

    Args:
        yyyy: Year, as a string or int (e.g. '2022').
        mm: Month, as a string or int (e.g. '06').
        day_range: Optional iterable of day numbers to restrict the load to.
            Days that do not exist in that month (e.g. 31 in June, or 30 in
            February) are dropped, so ``range(17, 32)`` is safe for any month.

    Returns:
        ``range(1, last_day + 1)`` when ``day_range`` is None. Otherwise a list
        of the valid days from ``day_range``, in their original order.
    """
    last_day = monthrange(int(yyyy), int(mm))[1]
    if day_range is None:
        return range(1, last_day + 1)
    return [day for day in day_range if 1 <= day <= last_day]


def _load_one_day(yyyy, mm, day, uri, dataset_name, federated_table_name,
                  federated_table_id, staging_table_id, destination_table_id):
    """Run the federated -> staging -> destination ETL for one day, then drop the staging table."""
    dd = str(day).zfill(2)
    print("  Uploading day", dd)
    uris = uri.format(yyyy=yyyy, mm=mm, dd=dd)
    create_federated_table(dataset_name, federated_table_name, uris)

    load_and_parse_table_to_bigquery(federated_table_id = federated_table_id
                                     , staging_table_id = staging_table_id
                                     , destination_table_id = destination_table_id)

    # Drop the staging table that was actually used (taken from
    # staging_table_id, not the module-level staging_table_name). rsplit keeps
    # project IDs that themselves contain dots intact.
    staging_project, staging_dataset, staging_table = staging_table_id.rsplit('.', 2)
    delete_table(table = staging_table, project_id = staging_project,
                 dataset_name = staging_dataset)


def run_etl(yyyy
            , day_range = None
            , months_to_load = months_of_year
            , uri = device_uris
            , dataset_name = dataset_name
            , federated_table_name = federated_table_name
            , federated_table_id = federated_table_id
            , staging_table_id = staging_table_id
            , destination_table_id = destination_table_id):
    """Load raw Fitbit device files for one year into BigQuery, one upload day at a time.

    Only dates up to and including yesterday are loaded:
      * a past year loads every requested month/day;
      * the current year skips months after yesterday's month and, within
        yesterday's month, days after yesterday;
      * a future year loads nothing.

    Args:
        yyyy: Year to load, e.g. '2022' (an int is accepted too).
        day_range: Optional iterable of day numbers applied to every month,
            e.g. ``range(17, 32)``. The end is exclusive.
        months_to_load: Zero-padded month strings to load.
        uri: GCS URI template with {yyyy}/{mm}/{dd} placeholders.
        dataset_name: Dataset for the federated table.
        federated_table_name: Bare name of the federated table.
        federated_table_id: Fully-qualified ID of the federated table.
        staging_table_id: Fully-qualified ID of the per-day staging table.
        destination_table_id: Fully-qualified ID of the parsed output table.
    """
    yyyy = str(yyyy)  # allow run_etl(2022) as well as run_etl('2022')
    print('Uploading year', yyyy)

    # Years are compared as 4-digit strings, which orders them correctly.
    if yyyy <= yesterday_year:
        is_current_year = yyyy == yesterday_year
        for mm in months_to_load:
            if is_current_year and int(mm) > int(yesterday_month):
                continue  # month is after yesterday's month: nothing to load yet
            print(" Uploading month", mm)
            days_to_load = daysToLoad(yyyy, mm, day_range = day_range)
            if is_current_year and int(mm) == int(yesterday_month):
                days_to_load = [d for d in days_to_load if d <= int(yesterday_day)]

            for day in days_to_load:
                _load_one_day(yyyy, mm, day, uri, dataset_name, federated_table_name,
                              federated_table_id, staging_table_id, destination_table_id)

    print('Done.')


--------------------------------

# Run ETL

*The default is for the ETL to run for all months of the year (up until yesterday's date) and all days of each month --  unless you specify otherwise as shown below*

**EXAMPLES**

- To upload data for the whole year 2022

`run_etl(yyyy = '2022')`


- To upload data starting from May in 2022

`run_etl(yyyy = '2022', months_to_load = ['05','06','07','08','09','10','11','12'])`



- To upload data for June 2022 only, starting from day 17

`run_etl(yyyy = '2022', months_to_load = ['06'], day_range = range(17, 32))` **the range end is exclusive, so always add one to the last day you want to load**


**Triple check the dataset names, project names etc**

In [None]:
# Final sanity check of every target identifier before launching the ETL;
# each entry is printed on its own line with a blank line in between.
print(main_table,
      '\ndataset_name: ' + dataset_name,
      '\nfederated_table_name:' + federated_table_name,
      '\nfederated_table_id: ' + federated_table_id,
      '\nstaging_table_name: ' + staging_table_name,
      '\nstaging_table_id: ' + staging_table_id,
      '\ndestination_table_id: ' + destination_table_id,
      sep='\n')

**Check the max upload date for this data type**

In [None]:
# Latest upload_date already present in the destination table; the next ETL
# run should start the day after it. Uses the configured destination table
# and project rather than a hard-coded copy of the table name.
pd.read_gbq(f'''SELECT MAX(upload_date) AS max_upload_date
               FROM `{destination_table_id}` ''', project_id=cur_project_id)



*Need to update from 2023/02/17 to 2023/04/04 (from max upload date + 1 day to yesterday)*

In [None]:
# UPDATE on 4/5/2023

## run 1 - DONE
#run_etl(yyyy = '2023', months_to_load = ['02'], day_range = range(17, 32))

## run 2 - DONE
#run_etl(yyyy = '2023', months_to_load = ['03', '04'])

In [None]:
## DONE! 2.17.2023
#run_etl(yyyy = '2023')

## DONE! 2.17.2023
#run_etl(yyyy = '2022', months_to_load = ['06','07','08','09','10','11','12'])

## DONE! 2.17.2023
#run_etl(yyyy = '2022', months_to_load = ['05'], day_range = range(17, 32))

In [None]:
# Check max upload date again to make sure everything updated. Uses the
# configured destination table and project rather than a hard-coded name.
pd.read_gbq(f'''SELECT MAX(upload_date) AS max_upload_date
               FROM `{destination_table_id}` ''', project_id=cur_project_id)


# RUN/Update `fitbit_ingest.id_mapping`

In [None]:
import pandas as pd
import numpy as np
import mysql.connector
import sys
# Make the repo-level `config` module (two directories up) importable.
sys.path.insert(0, '../..')
from config import connect_options

# The command below appears to start the Cloud SQL proxy this connection goes
# through (local TCP port 3308) -- TODO confirm it matches config.connect_options.
#cloud_sql_proxy_x64 -dir=./ -instances=all-of-us-rdr-prod:us-central1:rdrbackupdb-e=tcp:3308
# NOTE(review): `con` is never closed in the visible cells.
con = mysql.connector.connect(**connect_options)

In [None]:
# Pull every RDR participant_id together with its external ID cast to an
# integer as `vibrent_id` (the ID parsed from the Fitbit file paths).
query = """
     SELECT DISTINCT
        participant_id,
        CAST(external_id AS UNSIGNED) as vibrent_id
    FROM rdr.participant
    WHERE external_id IS NOT NULL
"""

rdr_pids = pd.read_sql_query(sql=query, con=con)

In [None]:
# Replace fitbit_ingest.id_mapping wholesale with the freshly pulled
# participant_id <-> vibrent_id mapping.
project_id = "aou-res-curation-prod"
destination_table = "fitbit_ingest.id_mapping"
rdr_pids.to_gbq(destination_table=destination_table, project_id=project_id, if_exists="replace")

# Device Data Update History

***- Aymone updated on 2.17.2023: from beginning of data (05/17/2022 to 2/16/2023)***

    - year 2020 and 2021 - no device data

    - year 2022: from 05/17/2022 to 12/31/2022

    - Year 2023: from 01/01 to 2/16/2023

***- Aymone updated on 4.05.2023: from 2023/02/17 to 2023/04/04***


# QC ETL

 **See notebook**

# Creating View `v_device` - RUN ONLY WHEN ALL IS DONE

In [None]:
def create_view(view_query, destination_view_name, dataset_name = dataset_name, project_id = cur_project_id):
    """Render a view-creation template and execute it in BigQuery.

    Args:
        view_query: SQL template with ``{project_id}``, ``{dataset_name}`` and
            ``{destination_view_name}`` placeholders, filled via ``str.format``.
        destination_view_name: Name substituted for ``{destination_view_name}``.
        dataset_name: Value substituted for ``{dataset_name}``.
        project_id: Project that runs the query; also substituted for ``{project_id}``.

    Blocks until the query job has finished. Returns None.
    """
    bq = bigquery.Client(project_id)
    rendered_sql = view_query.format(
        project_id=project_id,
        dataset_name=dataset_name,
        destination_view_name=destination_view_name,
    )
    bq.query(rendered_sql).result()  # API request, then wait for completion

In [None]:
# DDL template for the de-duplicated device view, rendered by create_view().
# Placeholders: {project_id} and {dataset_name} locate both the view and its
# source tables (device and id_mapping); {destination_view_name} names the view.
# Previously the source tables were hard-coded to aou-res-curation-prod.fitbit_ingest,
# ignoring the values create_view substitutes.
# For each (vibrent_id, device_id, date, battery, device_version, device_type,
# last_sync_time) combination only the row with the latest upload_date is kept,
# and vibrent_id is replaced by the RDR participant_id (as person_id).
# battery_level is not part of the de-duplication key.
v_device_query = '''
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.{destination_view_name}`
    
    AS (SELECT * EXCEPT (rn, upload_date)
        FROM (SELECT 
                m.participant_id AS person_id, 
                d.* EXCEPT (vibrent_id),
                ROW_NUMBER() OVER(PARTITION BY vibrent_id, device_id, date
                , battery , device_version, device_type, last_sync_time
                ORDER BY upload_date DESC) AS rn
             FROM `{project_id}.{dataset_name}.device` d
             JOIN `{project_id}.{dataset_name}.id_mapping` m USING(vibrent_id)
            )
        WHERE rn = 1
        )
    '''

In [None]:
# Build (or rebuild) the v_device view from the template above.
create_view(view_query=v_device_query, destination_view_name='v_device')

------------

In [None]:
## Copy a table from test_fitbit_ingest (or another origin dataset) into dataset_name.
def copy_table(origin_table_name, destination_table_name, disposition = 'WRITE_TRUNCATE'
                           , dataset_name = dataset_name, project_id = cur_project_id
                           , origin_dataset_name = 'test_fitbit_ingest'):
    """Copy a BigQuery table by running ``SELECT *`` into a destination table.

    Source:      ``{project_id}.{origin_dataset_name}.{origin_table_name}``
    Destination: ``{project_id}.{dataset_name}.{destination_table_name}``

    Args:
        origin_table_name: Table to read from.
        destination_table_name: Table to write to.
        disposition: BigQuery write disposition. The default 'WRITE_TRUNCATE'
            replaces the destination's existing contents.
        dataset_name: Dataset of the destination table.
        project_id: Project of both tables and of the query job.
        origin_dataset_name: Dataset of the source table
            (default 'test_fitbit_ingest').
    """
    query = f'SELECT * FROM `{project_id}.{origin_dataset_name}.{origin_table_name}`'

    client = bigquery.Client(project_id)
    job_config = bigquery.QueryJobConfig(
        # Only meaningful for legacy SQL; standard SQL ignores it.
        allow_large_results=True,
        destination=f'{project_id}.{dataset_name}.{destination_table_name}',
        write_disposition=disposition,
    )
    client.query(query, job_config=job_config).result()  # Wait for completion.


# WARNING: with the defaults this REPLACES (WRITE_TRUNCATE) the contents of
# aou-res-curation-prod.fitbit_ingest.device with test_fitbit_ingest.device.
copy_table(origin_table_name = 'device', destination_table_name = 'device')

In [None]:
# RECYCLED
# months_of_year = ['01','02','03','04','05','06','07','08','09','10','11','12']

# def daysToLoad(yyyy, mm, day_range = None):
#     if day_range == None:
#         days_to_load = range(1, monthrange(int(yyyy), int(mm))[1]+1)
#     else:
#         days_to_load = day_range
#     return days_to_load


# def run_etl(yyyy
#             , day_range = None
#             , months_to_load = months_of_year
#             , uri = device_uris
#             , dataset_name = dataset_name
#             , federated_table_name = federated_table_name
#             , federated_table_id = federated_table_id
#             , staging_table_id = staging_table_id
#             , destination_table_id = destination_table_id):
    
    
#     print('Uploading year', yyyy)
#     if yyyy < yesterday_year:
#         for mm in months_to_load:
#             print(" Uploading month", mm)
#             days_to_load = daysToLoad(yyyy, mm, day_range = day_range)
#             for day in days_to_load:
#                 dd = str(day).zfill(2)
#                 print("  Uploading day", dd)
#                 uris = uri.format(yyyy=yyyy, mm=mm, dd=dd)
#                 create_federated_table(dataset_name, federated_table_name, uris)
                
#                 #delete_table(table = staging_table_name)
#                 load_and_parse_table_to_bigquery(federated_table_id = federated_table_id
#                                                  , staging_table_id = staging_table_id
#                                                  , destination_table_id = destination_table_id)
                
#                 delete_table(table = staging_table_name)
    
#     else:
#         if yyyy == yesterday_year:
#             # this section ensures that data is upload up until yesterday
#             for mm in [m for m in months_to_load if int(m) <= int(yesterday_month)]:
#                 print(" Uploading month", mm)
#                 days_to_load = daysToLoad(yyyy, mm, day_range = day_range)
#                 days_to_load = [d for d in days_to_load if d <= int(yesterday_day)]
#                 for day in days_to_load:
#                     dd = str(day).zfill(2)
#                     if dd <= "00":
#                         continue
#                     if dd > yesterday_day:
#                         break
#                     print("  Uploading day", dd)
#                     uris = uri.format(yyyy=yyyy, mm=mm, dd=dd)
#                     create_federated_table(dataset_name, federated_table_name, uris)
                    
#                     load_and_parse_table_to_bigquery(federated_table_id = federated_table_id
#                                                  , staging_table_id = staging_table_id
#                                                  , destination_table_id = destination_table_id)               
                     
#                     delete_table(table = staging_table_name)
