In [1]:
import json
import requests
from requests.adapters import HTTPAdapter, Retry
import time
from time import sleep
import pandas as pd
from loguru import logger

API_VERSION = '4.0'
API_PORT = '19999'

# Looker PDT - Set LOOKER_MODEL_NAME and LOOKER_VIEW_NAME
LOOKER_MODEL_NAME = 'windriversystems_v2'
LOOKER_VIEW_NAME = 'all_plan_categories'

def model(dbt, session):
    # Get environment variable: looker_credentials (json)
    looker_credentials_str = dbt.get('config', {}).get("looker_credentials")
    looker_credentials = json.loads(looker_credentials_str)
    api_url = looker_credentials.get("api_url")
    client_id = looker_credentials.get("client_id")
    client_secret = looker_credentials.get("client_secret")

    base_url = '{}:{}/api/{}'.format(api_url, API_PORT, API_VERSION)

    # Create looker_session with retries for error codes: 409, 429, 500
    looker_session = requests.Session()
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[ 409, 429, 500 ])
    looker_session.mount('https://looker.com', HTTPAdapter(max_retries=retries))

    # Login to Looker API and get access_token
    login_url = '{}/login'.format(base_url)

    login_headers = {
        'Accept': 'application/json'
    }

    login_payload = {
        'client_id': '{}'.format(client_id),
        'client_secret': '{}'.format(client_secret)
    }

    login_response = looker_session.post(
        login_url,
        data=login_payload,
        headers=login_headers
    )

    login_response_json = login_response.json()
    access_token = login_response_json.get('access_token')

    # Start a Looker PDT and get materialization_id
    start_pdt_url = '{}/derived_table/{}/{}/start'.format(
        base_url,
        LOOKER_MODEL_NAME,
        LOOKER_VIEW_NAME
    )

    auth_headers = {
        'Authorization': 'Bearer {}'.format(access_token)
    }

    start_pdt_response = looker_session.get(
        start_pdt_url,
        headers=auth_headers
    )

    start_pdt_response_json = start_pdt_response.json()
    materialization_id = start_pdt_response_json.get('materialization_id')

    # Check PDT Status and loop until complete
    check_pdt_url = '{}/derived_table/{}/status'.format(
        base_url,
        materialization_id
    )

    pdt_status = 'started'
    keep_looping = True
    counter = 1
    while keep_looping:
        logger.info('Check {}: {}'.format(counter, pdt_status))
        
        check_pdt_response = looker_session.get(
            check_pdt_url,
            headers=auth_headers
        )
        
        check_pdt_response_json = json.loads(check_pdt_response.json().get('resp_text'))

        pdt_status = check_pdt_response_json.get('status')

        if pdt_status in ('complete', 'error') or counter > 5:
            keep_looping = False
        else:
            sleep(5)
        counter = counter + 1

    logger.info('Check {}: {}'.format(counter, pdt_status))
    logger.info('PDT Response JSON: {}'.format(check_pdt_response_json))

    # Convert response json to dataframe and return
    response_df = pd.DataFrame([check_pdt_response_json])

    return response_df



In [10]:
from snowflake.snowpark import Session

# Get secrets from file
secrets_file = open('snowflake_secrets.json',)
secrets = json.load(secrets_file)

snowflake_account = secrets.get('snowflake_account')
snowflake_user = secrets.get('snowflake_user')
snowflake_password = secrets.get('snowflake_password')
snowflake_role = secrets.get('snowflake_role')
snowflake_database = secrets.get('snowflake_database')
snowflake_warehouse = secrets.get('snowflake_warehouse')

In [11]:
print('snowflake_user = {}'.format(snowflake_user))
print('snowflake_warehouse = {}'.format(snowflake_warehouse))
print('snowflake_database = {}'.format(snowflake_database))

snowflake_user = JHUTH
snowflake_warehouse = COMPUTE_WH
snowflake_database = HACKATHON


In [12]:
# Connect to Snowflake
connection_parameters = {
    'account': snowflake_account,
    'user': snowflake_user,
    'password': snowflake_password,
    'role': snowflake_role,
    'warehouse': snowflake_warehouse,
    'database': snowflake_database,
    'schema': 'DBT_JHUTH'
}

new_session = Session.builder.configs(connection_parameters).create()

In [13]:
new_session.close()

In [6]:
class dbtObj:
    def __init__(self, config) -> None:
        self.config = config

In [7]:
session = requests.Session()
dbt = dbtObj(
    config={
        'looker_credentials': '{ "api_url": "https://windriversystemsdev.looker.com", "client_id": "3JGNkD3ntJ5vfGJxPzhb", "client_secret": "p3qnKMqt9PmdyZCb79SdZS8f" }'
    }
)


In [9]:
response_df = model(dbt,session)

2022-09-21 11:19:18.143 | INFO     | __main__:model:81 - Check 1: started
2022-09-21 11:19:23.493 | INFO     | __main__:model:81 - Check 2: running
2022-09-21 11:19:28.851 | INFO     | __main__:model:81 - Check 3: running
2022-09-21 11:19:29.230 | INFO     | __main__:model:98 - Check 4: complete
2022-09-21 11:19:29.231 | INFO     | __main__:model:99 - PDT Response JSON: {'status': 'complete', 'runtime': 4.6513450145721436, 'message': 'API materializing all_plan_categories and component(s) wc_all_acv_plan,wc_all_revenue_plan,wc_all_invoice_plan,wc_all_nb_plan,mammoth_transposed,normalized_booking_transposed', 'query_slug': 'b9283f96795e0dfb1a92e28ec78d92af'}
