In [4]:
# ! pip install google-auth google-cloud-bigquery --quiet

In [37]:
# ! pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'

In [38]:
from datetime import datetime, timedelta, date
import json
import re

from google.oauth2 import service_account
from google.cloud import bigquery

In [75]:
def getClient(path='credentials.json'):
    """Create a BigQuery client authenticated with a service-account key file.

    path: path of the service-account JSON key file (default
        'credentials.json', a relative path).
    Returns a google.cloud.bigquery.Client using those credentials.
    """
    creds = service_account.Credentials.from_service_account_file(path)
    return bigquery.Client(credentials=creds)

def queryGbq(client, query):
    """Run `query` through `client` and return the result rows as a DataFrame.

    client: a BigQuery client (anything exposing .query()).
    query: SQL text to execute.
    The rows are fetched with create_bqstorage_client=False, i.e. without
    creating a BigQuery Storage API client for the download.
    """
    job = client.query(query)
    rows = job.result()
    return rows.to_dataframe(create_bqstorage_client=False)

def lambda_handler(event, context):
    """AWS Lambda entry point: run the BigQuery query described by the request.

    event: API Gateway proxy event. Uses event["path"]
        ('/<dataset>/<spatial_unit>/<temporal_unit>') and, when present,
        event["queryStringParameters"] (see build_query_by_parsing).
    context: Lambda context object (unused).

    Returns a proxy response dict whose "body" is a JSON string:
      * 200, {"data": <records>} on success. <records> is the result of
        DataFrame.to_json(orient='records'), i.e. a JSON *string* nested in
        the JSON body, so clients must decode "data" a second time.
        NOTE(review): kept for existing clients; consider sending a real array.
      * 400, {"error": "Bad request: ..."} when the request is malformed
        (KeyError/ValueError while building the query).
      * 500, {"error": "Internal server error"} on any other failure; the
        exception is only printed to the Lambda log.

    NOTE(review): another cell in this notebook redefines lambda_handler as an
    echo handler; whichever cell runs last wins.
    """
    try:
        path = event["path"]
        # May be missing or None when the request has no query string.
        param = event.get("queryStringParameters") or {}
        query = build_query_by_parsing(path, param)
    except (KeyError, ValueError) as e:
        # Malformed request: missing path/param, bad date, bad geoid, ...
        print(e)
        return _json_response(400, {"error": f"Bad request: {e}"})
    except Exception as e:
        print(e)
        return _json_response(500, {"error": "Internal server error"})

    try:
        client = getClient(path='credentials.json')
        results = queryGbq(client, query)
        formatted_results = results.to_json(orient='records')
    except Exception as e:
        # Send some context about this error to Lambda Logs
        print(e)
        return _json_response(500, {"error": "Internal server error"})

    return _json_response(200, {"data": formatted_results})


def _json_response(status_code, payload):
    """Wrap `payload` as a proxy response with a JSON-encoded string body."""
    return {"statusCode": status_code, "body": json.dumps(payload)}


In [67]:
# NOTE(review): displays `event`, which is only assigned in a later cell, so
# this cell raises NameError on Restart & Run All.
event

{'resource': '/{proxy+}',
 'path': '/vaccination/county/cumulative',
 'httpMethod': 'GET',
 'queryStringParameters': {'source': 'cdc',
  'from': '2021-01-01',
  'geoid': '(1131,204031,206063,206091)',
  'to': '2021-01-15'}}

In [86]:
# Sample API Gateway proxy event: cumulative county-level deaths, NYT source,
# for four geoids over January 2021.
event = {
    'resource': '/{proxy+}',
    'path': '/deaths/county/cumulative',
    'httpMethod': 'GET',
    'queryStringParameters': {
        'source': 'nyt',
        'from': '2021-01-01',
        'geoid': '(1131,204031,206063,206091)',
        'to': '2021-02-01',
    },
}

In [87]:
# Invoke the real handler locally with the sample event. lambda_handler never
# reads `context`, so pass None instead of depending on the `context` variable
# that is only assigned in a later cell.
lambda_handler(event, None)

{'statusCode': 200,
 'body': '{"data": "[{\\"fips_code\\":1131,\\"_2021_01_01\\":19.0,\\"_2021_01_02\\":19.0,\\"_2021_01_03\\":19.0,\\"_2021_01_04\\":19.0,\\"_2021_01_05\\":19.0,\\"_2021_01_06\\":19.0,\\"_2021_01_07\\":20.0,\\"_2021_01_08\\":20.0,\\"_2021_01_09\\":20.0,\\"_2021_01_10\\":20.0,\\"_2021_01_11\\":20.0,\\"_2021_01_12\\":20.0,\\"_2021_01_13\\":21.0,\\"_2021_01_14\\":21.0,\\"_2021_01_15\\":21.0,\\"_2021_01_16\\":21.0,\\"_2021_01_17\\":21.0,\\"_2021_01_18\\":21.0,\\"_2021_01_19\\":21.0,\\"_2021_01_20\\":21.0,\\"_2021_01_21\\":22.0,\\"_2021_01_22\\":22.0,\\"_2021_01_23\\":22.0,\\"_2021_01_24\\":22.0,\\"_2021_01_25\\":22.0,\\"_2021_01_26\\":22.0,\\"_2021_01_27\\":22.0,\\"_2021_01_28\\":22.0,\\"_2021_01_29\\":22.0,\\"_2021_01_30\\":22.0,\\"_2021_01_31\\":22.0}]"}'}

# Lambda function test: echo handler for inspecting `path` and `queryStringParameters`

In [11]:
# NOTE: This is a test lambda_handler to find out the parameters and path
import json

def lambda_handler(event, context):
    """Debug handler that echoes the request path and query-string parameters.

    Used to discover what API Gateway passes in `event`; `context` is unused.
    Returns a proxy response whose "body" is a JSON *string*, as the Lambda
    proxy integration requires. The original version returned a dict body.
    A missing "path" or "queryStringParameters" is echoed as null.

    NOTE(review): this redefines the BigQuery lambda_handler from an earlier
    cell; whichever cell runs last wins.
    """
    echoed = {
        "path": event.get("path"),
        "queryStringParameters": event.get("queryStringParameters"),
    }
    return {
        "statusCode": 200,
        "body": json.dumps(echoed),
        "headers": {
            "Content-Type": "application/json"
        }
    }

Test URL (path and query string to append to the API's base URL):

vaccination/county/cumulative?source=usafacts&from=2021-01-01&to=2021-01-15&geoid=(1131,204031,206063,206091)

In [61]:
# Sample proxy event for a CDC vaccination request. Neither handler in this
# notebook reads `context`, so None is a sufficient local stand-in.
event = {
    'resource': '/{proxy+}',
    'path': '/vaccination/county/cumulative',
    'httpMethod': 'GET',
    'queryStringParameters': {
        'source': 'cdc',
        'from': '2021-01-01',
        'geoid': '(1131,204031,206063,206091)',
        'to': '2021-01-15',
    },
}
context = None

# The two pieces build_query_by_parsing consumes.
path, param = event['path'], event['queryStringParameters']

In [82]:
# Scratch check: json.dumps on a str yields a quoted JSON string. The same
# thing happens to the records string placed in lambda_handler's "data"
# field, which is why it arrives double-encoded.
json.dumps("1erdfd")

'"1erdfd"'

In [62]:
# NOTE(review): no recorded output, and repeated verbatim in the next cell;
# build_query_by_parsing is only defined further down the notebook.
build_query_by_parsing(path, param)

In [63]:
# Build the SQL for the sample vaccination request (`path`, `param` set above).
# NOTE(review): the recorded output below is stale; it predates the
# 'covid-atlas.public.' table prefix that create_from_clause now adds.
build_query_by_parsing(path, param)

'SELECT fips_code, _2021_01_01, _2021_01_02, _2021_01_03, _2021_01_04, _2021_01_05, _2021_01_06, _2021_01_07, _2021_01_08, _2021_01_09, _2021_01_10, _2021_01_11, _2021_01_12, _2021_01_13, _2021_01_14 FROM vaccination_fully_vaccinated_cdc WHERE fips_code in (1131,204031,206063,206091)'

In [74]:
def build_query_by_parsing(path, param):
    '''
    Build the BigQuery SQL string for an API request.

    path (str, required): '/<dataset>/<spatial_unit>/<temporal_unit>'
    param (dict or None, optional): the request's queryStringParameters;
        may be None (e.g. a request without a query string).

    Test example:
    path = '/vaccination/county/cumulative',
    param = {'source': 'cdc',
           'from': '2020-05-01',
           'geoid': '(1131,204031,206063,206091)',
           'to': '2020-11-01'}

    Returns 'SELECT ... FROM ...', plus ' WHERE fips_code in (...)' when
    'geoid' is given. The data source defaults to 'cdc'.
    Raises ValueError if the path does not have exactly three segments or
    'geoid' is not a parenthesised, comma-separated list of integers.
    '''
    param = param or {}

    segments = path.split('/')[1:]
    if len(segments) != 3:
        raise ValueError(
            f"path must look like /<dataset>/<spatial_unit>/<temporal_unit>, got {path!r}")
    temporal_unit = segments[2]

    select_clause = create_select_clause(temporal_unit, param)
    # default data source is cdc
    from_clause = create_from_clause(path, param.get("source", "cdc"))
    query = select_clause + from_clause

    if "geoid" in param:
        query += f" WHERE fips_code in {_sanitize_geoid_list(param['geoid'])}"

    return query


def _sanitize_geoid_list(geoid):
    '''Validate a '(id1,id2,...)' geoid string and return it rebuilt from ints.

    The value comes straight from the URL, so it is never spliced into SQL
    verbatim: every element must parse as an integer, and the returned text
    is regenerated from those integers (e.g. '(1131,204031)').
    '''
    text = geoid.strip()
    if not (text.startswith('(') and text.endswith(')')):
        raise ValueError(f"geoid must look like (id1,id2,...), got {geoid!r}")
    try:
        ids = [int(item) for item in text[1:-1].split(',')]
    except ValueError:
        raise ValueError(f"geoid must contain only integers, got {geoid!r}") from None
    return '(' + ','.join(str(i) for i in ids) + ')'

In [83]:
def create_from_clause(path, source):
    '''
    Build the FROM clause for the table addressed by `path` and `source`.

    path (str): '/<dataset>/<spatial_unit>/<temporal_unit>'. dataset must be
        'vaccination', 'confirmed' or 'deaths'. spatial_unit 'state' selects
        the '_state' table; any other value selects the base table.
    source (str): datasource suffix of the table name, e.g. 'cdc', 'nyt'.

    Returns e.g. ' FROM covid-atlas.public.covid_deaths_nyt'. The leading
    space lets callers append it directly to a SELECT clause.
    Raises ValueError for a malformed path, an unknown dataset, or a source
    containing anything other than ASCII letters, digits and '_'. The source
    is spliced into the table name, and table names cannot be bound as
    query parameters, so it is whitelisted instead.
    '''
    table_prefixes = {
        "vaccination": "vaccination_fully_vaccinated",
        "confirmed": "covid_confirmed",
        "deaths": "covid_deaths",
    }

    segments = path.split('/')[1:]
    if len(segments) != 3:
        raise ValueError(
            f"path must look like /<dataset>/<spatial_unit>/<temporal_unit>, got {path!r}")
    dataset, spatial_unit, _temporal_unit = segments

    if dataset not in table_prefixes:
        raise ValueError(
            f"unknown dataset {dataset!r}; expected one of {sorted(table_prefixes)}")
    if not re.fullmatch(r"[A-Za-z0-9_]+", source):
        raise ValueError(f"invalid source {source!r}")

    table = f"{table_prefixes[dataset]}_{source}"
    if spatial_unit == "state":
        table += "_state"

    return f" FROM covid-atlas.public.{table}"

In [47]:
def create_select_clause(temporal_unit, param):
    '''
    Build the SELECT clause: fips_code plus one '_YYYY_MM_DD' column per day.

    Input:
    temporal_unit(str): cumulative, time-series, snapshot
    param(dict or None): queryStringParameters
    Rules:
    if temporal_unit is snapshot, then 'recentX' (recent x days) in param;
        the columns are the X days before today (today excluded).
        recentX may be an int or a numeric string (query-string values are str).
    if "from" in param, "to" is in param; the range is half-open:
        'from' is included, 'to' is excluded
    if temporal_unit is snapshot, it's stronger than "from" and "to"
    With no date selection, or an empty range, only fips_code is selected.
    Raises KeyError if a required key is missing, ValueError if recentX is not
    an integer or a date is not in ISO format.
    '''
    param = param or {}

    if temporal_unit == "snapshot":
        end = date.today()
        start = end - timedelta(days=int(param['recentX']))
    elif "from" in param:
        start = datetime.fromisoformat(param["from"])
        end = datetime.fromisoformat(param["to"])
    else:
        return "SELECT fips_code"

    columns = _date_columns(start, end)
    if not columns:
        # Avoid emitting a dangling "SELECT fips_code, ".
        return "SELECT fips_code"
    return "SELECT fips_code, " + ", ".join(columns)


def _date_columns(start, end):
    '''Column names '_YYYY_MM_DD' for every day in the half-open range [start, end).'''
    return [(start + timedelta(days=offset)).strftime("_%Y_%m_%d")
            for offset in range((end - start).days)]

In [32]:
# County-level vaccination table for the CDC source.
# NOTE(review): the recorded output below is stale; create_from_clause now
# returns ' FROM covid-atlas.public.vaccination_fully_vaccinated_cdc'.
create_from_clause('/vaccination/county/cumulative', "cdc")

'FROM vaccination_fully_vaccinated_cdc'

In [30]:
# State-level confirmed-cases table; spatial unit 'state' appends '_state'.
# NOTE(review): the recorded output below is stale (it lacks the
# ' FROM covid-atlas.public.' prefix). The source is spelled 'usafact' here
# but 'usafacts' in the test URL above; confirm which matches the real table.
create_from_clause("/confirmed/state/cumulative", "usafact")

'covid_confirmed_usafact_state'

In [50]:
# Snapshot mode takes precedence over 'from'/'to': the columns are the
# `recentX` days before today, so the recorded output depends on the run date.
temporal_unit = "snapshot"
param = {
    "source": "cdc",
    "from": "2020-05-01",
    "geoid": "(1131,204031,206063,206091)",
    "to": "2020-05-15",
    "recentX": 30,
}
create_select_clause(temporal_unit, param)

'SELECT fips_code, _2021_11_03, _2021_11_04, _2021_11_05, _2021_11_06, _2021_11_07, _2021_11_08, _2021_11_09, _2021_11_10, _2021_11_11, _2021_11_12, _2021_11_13, _2021_11_14, _2021_11_15, _2021_11_16, _2021_11_17, _2021_11_18, _2021_11_19, _2021_11_20, _2021_11_21, _2021_11_22, _2021_11_23, _2021_11_24, _2021_11_25, _2021_11_26, _2021_11_27, _2021_11_28, _2021_11_29, _2021_11_30, _2021_12_01, _2021_12_02'

In [51]:
# Date-range mode: one column per day from 'from' up to, but not including, 'to'.
temporal_unit = "cumulative"
param = {
    "source": "cdc",
    "from": "2020-05-01",
    "geoid": "(1131,204031,206063,206091)",
    "to": "2020-05-15",
}
create_select_clause(temporal_unit, param)

'SELECT fips_code, _2020_05_01, _2020_05_02, _2020_05_03, _2020_05_04, _2020_05_05, _2020_05_06, _2020_05_07, _2020_05_08, _2020_05_09, _2020_05_10, _2020_05_11, _2020_05_12, _2020_05_13, _2020_05_14'