In [None]:
"""
DataRobot Batch Prediction API scoring job: Snowflake -> S3 -> DataRobot -> S3 -> Snowflake

v1.0 Mike Taveirne (doyouevendata) 3/21/2020
"""

In [1]:
import pandas as pd
import requests
import time
from pandas.io.json import json_normalize
import snowflake.connector

import my_creds
#from imp import reload
#reload(my_creds)

In [2]:
# DataRobot parameters - secrets and deployment identifiers come from the local my_creds module
API_KEY = my_creds.API_KEY
USERNAME = my_creds.USERNAME
DEPLOYMENT_ID = my_creds.DEPLOYMENT_ID
DATAROBOT_KEY = my_creds.DATAROBOT_KEY

# host of the prediction instance(s); point my_creds at your load balancer
DR_PREDICTION_HOST = my_creds.DR_PREDICTION_HOST
DR_APP_HOST = 'https://app.datarobot.com'

# headers for calls made against the DataRobot app (modeling) API
DR_MODELING_HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': 'token {}'.format(API_KEY),
}

In [3]:
# Snowflake connection parameters - account and login come from my_creds
SNOW_ACCOUNT = my_creds.SNOW_ACCOUNT
SNOW_USER = my_creds.SNOW_USER
SNOW_PASS = my_creds.SNOW_PASS
SNOW_DB, SNOW_SCHEMA = 'TITANIC', 'PUBLIC'

# ETL parameters: JOB_NAME keys this job's rows in etl_history and names its S3 files
JOB_NAME = 'pass_scoring'

### Retrieve or Create S3 Credentials

In [4]:
# get a saved credential set, return None if not found
def dr_get_catalog_credentials(name, cred_type):
    """Look up a stored DataRobot credential set by name and type.

    Walks every page of GET /api/v2/credentials/ (following the ``next``
    link when the response provides one) and returns the id of the first
    entry whose ``name`` and ``credentialType`` both match.

    Args:
        name: credential set name to look for.
        cred_type: 'basic' or 's3'.

    Returns:
        The matching ``credentialId``, or None when the type is invalid,
        nothing matches, or a request fails (the failure is printed).
    """
    if cred_type not in ['basic', 's3']:
        print('credentials type must be: basic, s3 - value passed was {ct}'.format(ct=cred_type))
        return None

    url = DR_APP_HOST + '/api/v2/credentials/'
    while url:
        response = requests.get(url, headers=DR_MODELING_HEADERS)

        if response.status_code != 200:
            print('Request failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))
            return None

        page = response.json()
        # iterate the raw records: an empty 'data' list (no credentials stored yet)
        # must simply mean "not found" rather than a KeyError on missing columns
        for cred in page.get('data', []):
            if cred.get('name') == name and cred.get('credentialType') == cred_type:
                return cred.get('credentialId')

        # list endpoints are paginated; stop when there is no further page
        url = page.get('next')

    return None

In [5]:
# create credentials set
def dr_create_catalog_credentials(name, cred_type, user, password, token=None):
    """Create a DataRobot credential set and return its id.

    For 'basic', ``user``/``password`` are sent as ``user``/``password``.
    For 's3', they are sent as ``awsAccessKeyId``/``awsSecretAccessKey``,
    and ``token`` (when not None) as ``awsSessionToken``.

    Args:
        name: name to store the credential set under.
        cred_type: 'basic' or 's3'.
        user: user name, or AWS access key id for 's3'.
        password: password, or AWS secret access key for 's3'.
        token: optional AWS session token (ignored for 'basic').

    Returns:
        The new ``credentialId`` on HTTP 201, otherwise None (an invalid
        type or a failed request is printed).
    """
    if cred_type not in ['basic', 's3']:
        print('credentials type must be: basic, s3 - value passed was {ct}'.format(ct=cred_type))
        return None

    if cred_type == 'basic':
        payload = {
            "credentialType": cred_type,
            "user": user,
            "password": password,
            "name": name
        }
    else:
        payload = {
            "credentialType": cred_type,
            "awsAccessKeyId": user,
            "awsSecretAccessKey": password,
            "name": name
        }
        # the session token is optional; only send the key when one was supplied
        if token is not None:
            payload["awsSessionToken"] = token

    response = requests.post(
        url=DR_APP_HOST + '/api/v2/credentials/',
        headers=DR_MODELING_HEADERS,
        json=payload
    )

    if response.status_code == 201:
        return response.json()['credentialId']

    print('Request failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))
    return None


In [6]:
# get or create a credential set
def dr_get_or_create_catalog_credentials(name, cred_type, user, password, token=None):
    """Return the id of the named credential set, creating it if it does not exist.

    Args:
        name: credential set name.
        cred_type: 'basic' or 's3'.
        user / password: forwarded to dr_create_catalog_credentials.
        token: optional AWS session token, forwarded on creation.

    Returns:
        The existing or newly created ``credentialId``, or None if creation failed.
    """
    cred_id = dr_get_catalog_credentials(name, cred_type)

    if cred_id is None:
        # forward the caller's token; passing a literal None would silently drop it
        return dr_create_catalog_credentials(name, cred_type, user, password, token=token)
    return cred_id

In [7]:
# Look up (or create) the DataRobot-stored S3 credential used for batch intake and output.
# NOTE(review): the AWS access key id / secret are taken from my_creds.SNOW_USER /
# my_creds.SNOW_PASS - the same values used to log in to Snowflake. Confirm that is
# intended; otherwise point these at the AWS key attributes in my_creds.
credentials_id = dr_get_or_create_catalog_credentials('s3_community',
                                                      's3', my_creds.SNOW_USER, my_creds.SNOW_PASS)

# fail fast: a None id would otherwise only surface later as a rejected batch job
if credentials_id is None:
    raise RuntimeError("could not retrieve or create the 's3_community' S3 credential set")

### Extract Data to S3 via Snowflake

In [9]:
# create a connection (left open on purpose: later cells reuse ctx)
ctx = snowflake.connector.connect(
    user=SNOW_USER,
    password=SNOW_PASS,
    account=SNOW_ACCOUNT,
    database=SNOW_DB,
    schema=SNOW_SCHEMA,
    protocol='https'
)

# create a cursor
cur = ctx.cursor()

# scoring window: [last timestamp already scored for this job, current timestamp)
# JOB_NAME is passed as a bind parameter instead of being formatted into the SQL text
sql = "select last_ts_scored_through, current_timestamp::TIMESTAMP_NTZ cur_ts " \
    "from etl_history " \
    "where job_nm = %s " \
    "order by last_ts_scored_through desc " \
    "limit 1 "
cur.execute(sql, (JOB_NAME,))

# fetch results into dataframe
df = cur.fetch_pandas_all()
# without a seed row the window is undefined; fail with a clear message
# instead of a KeyError or a COPY that compares against the literal 'NaT'
if df.empty or pd.isna(df['LAST_TS_SCORED_THROUGH'].iloc[0]):
    raise RuntimeError(
        "no last_ts_scored_through found in etl_history for job '{job}'; "
        "insert an initial watermark row before running".format(job=JOB_NAME))
start_ts = df['LAST_TS_SCORED_THROUGH'].iloc[0]
end_ts = df['CUR_TS'].iloc[0]

# execute sql to dump data into a single file in S3 stage bucket
# AWS single file snowflake limit 5 GB
# start/end come from Snowflake itself (query above), not from user input
sql = "COPY INTO @S3_SUPPORT/titanic/community/" + JOB_NAME + ".csv " \
    "from  " \
    "( " \
    "  select passengerid, pclass, name, sex, age, sibsp, parch, ticket, fare, cabin, embarked " \
    "  from passengers_500k_ts " \
    "  where nvl(updt_ts, crt_ts) >= '{start}' " \
    "  and nvl(updt_ts, crt_ts) < '{end}' " \
    ") " \
    "file_format = (format_name='default_csv' compression='none') header=true overwrite=true single=true;".format(start=start_ts, end=end_ts)
cur.execute(sql)

<snowflake.connector.cursor.SnowflakeCursor at 0x11c600d50>

### Create DataRobot Session and Run Batch Prediction API Job

In [10]:
# One requests.Session reused for every Batch Prediction API call below.
# headers.update() keeps requests' default headers (User-Agent, Accept-Encoding, ...)
# and its case-insensitive header mapping, instead of replacing them with a plain dict.
session = requests.Session()
session.headers.update({
    'Authorization': 'Bearer {}'.format(API_KEY)
})

In [11]:
# S3 objects for this job: the extract written by the COPY INTO above, and the
# scored file that is loaded back into Snowflake afterwards
INPUT_FILE = 's3://{}/titanic/community/{}.csv'.format(my_creds.S3_BUCKET, JOB_NAME)
OUTPUT_FILE = 's3://{}/titanic/community/{}_scored.csv'.format(my_creds.S3_BUCKET, JOB_NAME)

# Batch Prediction job definition: read INPUT_FILE from S3, score it on the
# deployment via the dedicated prediction instance, write OUTPUT_FILE to S3,
# carrying PASSENGERID through so results can be joined back to the source rows
job_details = dict(
    deploymentId=DEPLOYMENT_ID,
    passthroughColumns=['PASSENGERID'],
    numConcurrent=4,
    predictionInstance=dict(
        hostName=DR_PREDICTION_HOST,
        datarobotKey=DATAROBOT_KEY,
    ),
    intakeSettings=dict(
        type='s3',
        url=INPUT_FILE,
        credentialId=credentials_id,
    ),
    outputSettings=dict(
        type='s3',
        url=OUTPUT_FILE,
        credentialId=credentials_id,
    ),
)

In [12]:
# submit the batch prediction job; the next cell checks the response and polls it
batch_url = '{}/api/v2/batchPredictions'.format(DR_APP_HOST)
response = session.post(batch_url, json=job_details)

### Monitor S3 Scoring Status and Return Control Upon Completion

In [13]:
# Poll the submitted job every 3 seconds until it leaves INITIALIZING and then RUNNING.
# Any failure raises, so "Run All" stops here. Otherwise the next cells would reload
# whatever scored file is already in S3 and advance the etl_history watermark past
# rows that were never scored.
if response.status_code != 202:
    print('Job submission failed; http error {code}: {content}'.format(code=response.status_code, content=response.content))
    raise RuntimeError('batch prediction job submission failed (http {code})'.format(code=response.status_code))

job = response.json()
print('queued batch job: {}'.format(job['links']['self']))

for phase in ('INITIALIZING', 'RUNNING'):
    while job['status'] == phase:
        time.sleep(3)
        response = session.get(job['links']['self'])
        response.raise_for_status()
        job = response.json()
    print('completed {}'.format(phase))

print('status is now {status}'.format(status=job['status']))

if job['status'] != 'COMPLETED':
    for log_line in job.get('logs', []):
        print(log_line)
    raise RuntimeError('batch prediction job ended with status {status}'.format(status=job['status']))

queued batch job: https://app.datarobot.com/api/v2/batchPredictions/1234567891234567893/
completed INITIALIZING
completed RUNNING
status is now COMPLETED


### Truncate and Reload the STG (Staging) Table with Results

In [14]:
# Reload the STG table from the scored S3 file in one multi-statement call:
# truncate first so only this run's scores remain, then COPY INTO from the stage.
# execute_string runs each ';'-separated statement and returns one cursor per statement:
# https://docs.snowflake.com/en/user-guide/python-connector-api.html#execute_string
sql = (
    "truncate titanic.stg.PASSENGERS_SCORED_BATCH_API; "
    " copy into titanic.stg.PASSENGERS_SCORED_BATCH_API from @S3_SUPPORT/titanic/community/"
    + JOB_NAME
    + "_scored.csv"
      " FILE_FORMAT = 'DEFAULT_CSV' ON_ERROR = 'ABORT_STATEMENT' PURGE = FALSE;"
)
ctx.execute_string(sql)

[<snowflake.connector.cursor.SnowflakeCursor at 0x12012c4d0>,
 <snowflake.connector.cursor.SnowflakeCursor at 0x11fff9b50>]

### Update Presentation Target Table With Results

In [15]:
# Apply the scores to the presentation table and record the new watermark (end_ts)
# in etl_history inside one explicit begin/commit transaction, so both changes
# are committed together.
sql = (
    "begin; "
    "update titanic.public.passengers_500k_ts trg "
    "set trg.survival = src.survived_1_prediction "
    "from titanic.stg.PASSENGERS_SCORED_BATCH_API src "
    "where src.passengerid = trg.passengerid; "
    "insert into etl_history values ('{job}', '{run_through_ts}'); "
    "commit; "
).format(job=JOB_NAME, run_through_ts=end_ts)
ctx.execute_string(sql)

[<snowflake.connector.cursor.SnowflakeCursor at 0x120133b50>,
 <snowflake.connector.cursor.SnowflakeCursor at 0x120004710>,
 <snowflake.connector.cursor.SnowflakeCursor at 0x11ff5d590>,
 <snowflake.connector.cursor.SnowflakeCursor at 0x12012c550>]