In [None]:
%pip install snowflake-snowpark-python
%pip install boto3

In [1]:
import json
import os
from snowflake.snowpark import Session
from snowflake.snowpark.functions import sproc
import snowflake.snowpark
from snowflake.snowpark.types import IntegerType
import boto3
import logging
import sys
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.types import IntegerType

In [2]:
# Load Snowflake secrets and create the session
# Snowflake Oauth integration test account
# The secrets file is a JSON object holding the Snowpark connection parameters.
with open("./secrets/GZ45853.json", "r") as f:
    connection_parameters = json.load(f)
# Provider-side session used by every `test_session.sql(...)` call in this notebook.
test_session = Session.builder.configs(connection_parameters).create()
test_session

<snowflake.snowpark.session.Session at 0x11f3153a0>

In [7]:
bucket = "airbyte.alex"
connector = "source-pokeapi"
aws_role_arn = "arn:aws:iam::168714685353:role/snowflake-api-gateway-test"
api_gateway_url = "https://w72cfwmned.execute-api.us-west-1.amazonaws.com/stage"
api_integration_name = f"{connector}_api_integration".replace("-", "_")

In [8]:
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

In [9]:
# Create a stage to store the code
create_stage_result = test_session.sql(f"create or REPLACE stage mystage url = 's3://{bucket}'").collect()

INFO:snowflake.connector.cursor:query: [create or REPLACE stage mystage url = 's3://airbyte.alex']
INFO:snowflake.connector.cursor:query execution done


In [10]:
# Create the integration
# WARNING: The gateway's trust relationship must be updated if the api integration is recreated.
def create_api_integration(api_integration_name, aws_role_arn, api_gateway_url):
    """Create (or replace) an AWS API Gateway api integration in Snowflake.

    The statement is executed through the module-level ``test_session``.

    Args:
        api_integration_name: Name of the integration to create or replace.
        aws_role_arn: Value used for ``api_aws_role_arn``.
        api_gateway_url: The single URL placed in ``api_allowed_prefixes``.

    Returns:
        The rows returned by ``collect()`` for the DDL statement.
    """
    ddl = f"""
    create or replace api integration {api_integration_name}
      api_provider = aws_api_gateway
      api_aws_role_arn = '{aws_role_arn}'
      api_allowed_prefixes = ('{api_gateway_url}')
      enabled = true;
    """
    integration_statement = test_session.sql(ddl)
    return integration_statement.collect()
def describe_api_integration(api_integration_name):
    """Run ``describe integration`` for the named integration and return the collected rows."""
    statement = f"describe integration {api_integration_name}"
    return test_session.sql(statement).collect()


#create_api_integration(api_integration_name, aws_role_arn, api_gateway_url)
describe_api_integration(api_integration_name)

INFO:snowflake.connector.cursor:query: [describe integration source_pokeapi_api_integration]
INFO:snowflake.connector.cursor:query execution done


[Row(property='ENABLED', property_type='Boolean', property_value='true', property_default='false'),
 Row(property='API_KEY', property_type='String', property_value='', property_default=''),
 Row(property='API_PROVIDER', property_type='String', property_value='AWS_API_GATEWAY', property_default=''),
 Row(property='API_AWS_IAM_USER_ARN', property_type='String', property_value='arn:aws:iam::147018273998:user/6o3f-s-ohss5467', property_default=''),
 Row(property='API_AWS_ROLE_ARN', property_type='String', property_value='arn:aws:iam::168714685353:role/snowflake-api-gateway-test', property_default=''),
 Row(property='API_AWS_EXTERNAL_ID', property_type='String', property_value='GZ45853_SFCRole=2_Q9B1fFjAUub69Ifq6flHOJwBZ/8=', property_default=''),
 Row(property='API_ALLOWED_PREFIXES', property_type='List', property_value='https://w72cfwmned.execute-api.us-west-1.amazonaws.com/stage', property_default='[]'),
 Row(property='API_BLOCKED_PREFIXES', property_type='List', property_value='', prope

In [44]:
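# Create external function and translators
# NOTE: these objects are created in source_pokeapi_app.app_schema, so the cell that
# creates that database and schema (further down) must have been run first.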

request_translator_name = "source_pokeapi_app.app_schema.source_request_translator"
response_translator_name = "source_pokeapi_app.app_schema.source_response_translator"
external_function_name = f"source_pokeapi_app.app_schema.{connector.replace('-', '_')}_external_function"

# Create request_translator
# The same request translator can be used across multiple connectors
def create_request_translator(request_translator_name):
    """Create (or replace) the secure JavaScript request-translator function.

    The JavaScript body reads columns 1 and 2 of the first row in
    ``EVENT.body.data`` and returns them as ``body`` and ``urlSuffix``.

    Args:
        request_translator_name: Name of the function to create.

    Returns:
        The rows returned by ``collect()`` for the DDL statement.
    """
    ddl = f"""
    create or replace secure function {request_translator_name}(event object)
    returns object
    language javascript as
    '
    body = EVENT.body.data[0][1]
    suffixUrl = EVENT.body.data[0][2]
    return {{ "body": body, "urlSuffix": suffixUrl}};
    ';
    """
    return test_session.sql(ddl).collect()

  
# Create response translator
# The same response translator can be used across multiple connectors
def create_response_translator(response_translator_name):
    """Create (or replace) the secure JavaScript response-translator function.

    The JavaScript body wraps the incoming ``EVENT`` as the single row
    ``[0, EVENT]`` under ``body.data``.

    Args:
        response_translator_name: Name of the function to create.

    Returns:
        The rows returned by ``collect()`` for the DDL statement.
    """
    ddl = f"""
    create or replace secure function {response_translator_name}(event object)
    returns object
    language javascript as
    '
    return {{ "body": {{ "data" : [[0, EVENT]] }}}};
    ';
    """
    return test_session.sql(ddl).collect()

# Create external function
# One external function per connector
def create_external_function(external_function_name,
                             api_integration_name,
                             request_translator_name,
                             response_translator_name,
                             api_gateway_url):
    """Create (or replace) the secure external function for a connector.

    The function takes ``(body varchar, urlSuffix varchar)``, returns a
    variant, and is wired to the given api integration and translators.

    Args:
        external_function_name: Name of the external function to create.
        api_integration_name: Name of the api integration to use.
        request_translator_name: Function used as ``request_translator``.
        response_translator_name: Function used as ``response_translator``.
        api_gateway_url: URL the external function calls.

    Returns:
        The rows returned by ``collect()`` for the DDL statement.
    """
    return test_session.sql(f"""
    create or replace secure external function {external_function_name}(body varchar, urlSuffix varchar)
      returns variant
      api_integration = {api_integration_name}
      request_translator = {request_translator_name}
      response_translator = {response_translator_name}
      as '{api_gateway_url}';
    """).collect()

def describe_external_function(external_function_name):
    """Describe the ``(varchar, varchar)`` overload of the named function and return the rows."""
    return test_session.sql(
        f"describe function {external_function_name} (varchar, varchar)"
    ).collect()

print(create_request_translator(request_translator_name))
print(create_response_translator(response_translator_name))
print(create_external_function(external_function_name, api_integration_name, request_translator_name, response_translator_name, api_gateway_url))

describe_external_function(external_function_name)

INFO:snowflake.connector.cursor:query: [create or replace secure function source_pokeapi_app.app_schema.source_request_t...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Function SOURCE_REQUEST_TRANSLATOR successfully created.')]
INFO:snowflake.connector.cursor:query: [create or replace secure function source_pokeapi_app.app_schema.source_response_...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Function SOURCE_RESPONSE_TRANSLATOR successfully created.')]
INFO:snowflake.connector.cursor:query: [create or replace secure external function source_pokeapi_app.app_schema.source_...]
INFO:snowflake.connector.cursor:query execution done
[Row(status='Function SOURCE_POKEAPI_EXTERNAL_FUNCTION successfully created.')]
INFO:snowflake.connector.cursor:query: [describe function source_pokeapi_app.app_schema.source_pokeapi_external_function...]
INFO:snowflake.connector.cursor:query execution done


[Row(property='signature', value='(BODY VARCHAR, URLSUFFIX VARCHAR)'),
 Row(property='returns', value='VARIANT'),
 Row(property='language', value='EXTERNAL'),
 Row(property='null handling', value='CALLED ON NULL INPUT'),
 Row(property='volatility', value='VOLATILE'),
 Row(property='body', value='https://w72cfwmned.execute-api.us-west-1.amazonaws.com/stage'),
 Row(property='headers', value='null'),
 Row(property='context_headers', value='null'),
 Row(property='max_batch_rows', value='not set'),
 Row(property='request_translator', value='SOURCE_POKEAPI_APP.APP_SCHEMA.SOURCE_REQUEST_TRANSLATOR'),
 Row(property='response_translator', value='SOURCE_POKEAPI_APP.APP_SCHEMA.SOURCE_RESPONSE_TRANSLATOR'),
 Row(property='compression', value='AUTO')]

In [39]:
# Create a database to be used by the application
def create_database(source_name):
    """Ensure the ``<source_name>_app`` database and its ``app_schema`` schema exist.

    Both statements use ``if not exists`` and run through the module-level
    ``test_session``. Nothing is returned.

    Args:
        source_name: Prefix of the application database name.
    """
    create_db_sql = f"""
    CREATE DATABASE if not exists {source_name}_app;
    """
    create_schema_sql = f"""
    CREATE SCHEMA if not exists {source_name}_app.app_schema;
    """
    # The database has to exist before the schema inside it can be created.
    for statement in (create_db_sql, create_schema_sql):
        test_session.sql(statement).collect()
create_database("source_pokeapi")
test_session.sql("use database source_pokeapi_app;").collect()
test_session.sql("use schema app_schema;").collect()

INFO:snowflake.connector.cursor:query: [CREATE DATABASE if not exists source_pokeapi_app;]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CREATE SCHEMA if not exists source_pokeapi_app.app_schema;]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [use database source_pokeapi_app;]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [use schema app_schema;]
INFO:snowflake.connector.cursor:query execution done


[Row(status='Statement executed successfully.')]

In [45]:
def list_files_to_load(connector):
    """List the staged files to import into the stored procedure.

    Lists every object in the module-level ``bucket`` whose key starts with
    ``connector`` and returns each key prefixed with ``@mystage/``. Keys
    containing ``"pendulum"`` are left out.

    Args:
        connector: Key prefix to list (a leading ``/`` is stripped).

    Returns:
        A list of ``@mystage/<key>`` strings.
    """
    paginator = boto3.client('s3').get_paginator('list_objects_v2')

    separator = '/'
    key_prefix = f"{connector}"
    if key_prefix.startswith(separator):
        key_prefix = key_prefix[1:]
    # When the prefix names a "directory", start listing after the prefix itself.
    first_key_after = key_prefix if key_prefix.endswith(separator) else ''

    pages = paginator.paginate(Bucket=bucket, Prefix=key_prefix, StartAfter=first_key_after)
    object_keys = [entry['Key'] for page in pages for entry in page.get('Contents', ())]

    # For now everything is loaded, apart from the pendulum files.
    return [f"@mystage/{object_key}" for object_key in object_keys if "pendulum" not in object_key]

In [46]:
# Create a stage to store the code
create_stage_result = test_session.sql(f"create or REPLACE stage source_pokeapi_app.app_schema.mystage url = 's3://{bucket}'").collect()

INFO:snowflake.connector.cursor:query: [create or REPLACE stage source_pokeapi_app.app_schema.mystage url = 's3://airbyt...]
INFO:snowflake.connector.cursor:query execution done


In [47]:
# Internal stored procedure that will not be exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], imports=list_files_to_load(connector), name="source_pokeapi_app.app_schema.sync_connector_to_table", replace=True, is_permanent=True, stage_location="@mystage")
def compute(session: snowflake.snowpark.Session, to_table: str, config: dict) -> str:
    """Run a full-refresh read of the PokeAPI source and append every record to ``to_table``.

    ``requests.sessions.Session.send`` is monkey-patched so that each HTTP
    request issued by the connector is routed through the
    ``source_pokeapi_external_function`` external function instead of the
    network. Each record is inserted as one ``parse_json(...)`` row.

    Args:
        session: Snowpark session the procedure runs in.
        to_table: Table receiving the records. It is interpolated into the
            SQL as-is (it may be a qualified identifier), so it must come from
            a trusted caller.
        config: Connector configuration passed to ``discover``/``streams``/``read``.

    Returns:
        ``str()`` of the last record's data, or ``"None"`` when the source
        produced no record.

    Raises:
        SnowparkSQLException: Re-raised when the external function call fails.
    """
    from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream
    from airbyte_cdk.models import SyncMode, DestinationSyncMode
    from snowflake.snowpark.exceptions import SnowparkSQLException
    from source_pokeapi import SourcePokeapi
    logger = logging.getLogger("logger")
    source = SourcePokeapi()

    def sql_string_literal(value) -> str:
        """Render ``value`` as a single-quoted SQL string constant."""
        # Backslash and single quote are both special inside single-quoted
        # Snowflake string constants, so both have to be escaped.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    catalog = source.discover(logger, config)
    configured_catalog = ConfiguredAirbyteCatalog(
        streams=[ConfiguredAirbyteStream(stream=s, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.append) for s
                 in catalog.streams])

    # hack: get the base url from the first stream...
    base_url = source.streams(config)[0].url_base

    def patch_send(session):
        """Replace ``requests.sessions.Session.send`` with an external-function-backed version."""
        import requests

        def convert_request_to_external_function_input(request: requests.PreparedRequest):
            return {"body": request.body, "headers": request.headers, "url": request.url}

        def convert_external_function_output_to_response(output) -> requests.Response:
            response = requests.Response()
            # FIXME: the status code is always reported as 200.
            response.status_code = 200
            # The query returns a single row with a single column: take that value.
            actual_output = list(output[0].as_dict().items())[0][1]

            response_as_json = json.loads(actual_output)
            body = response_as_json["body"]

            response._content = json.dumps(body).encode("ascii")
            return response

        def new_session_send(self, request, **kwargs):
            # convert to external function arguments
            args = convert_request_to_external_function_input(request)

            # call external function
            if session:
                # The external function receives the path relative to the base url.
                path = "/" + args["url"].replace(base_url, "")
                body = args["body"]
                if isinstance(body, bytes):
                    body = body.decode("utf-8")
                # A missing body is still sent as the text "None", as before.
                query = (
                    "select source_pokeapi_external_function("
                    f"{sql_string_literal(body)}, {sql_string_literal(path)});"
                )
                try:
                    output_from_external_function = session.sql(query).collect()
                except SnowparkSQLException as e:
                    logger.error("external function call failed for path %s: %s", path, e)
                    raise
                response = convert_external_function_output_to_response(output_from_external_function)
            else:
                # This block is just for testing...
                content = b'{"data": "hello"}'
                response = requests.Response()
                response.status_code = 200
                response._content = content
            return response

        requests.sessions.Session.send = new_session_send
    patch_send(session)

    last_record = None
    for message in source.read(logger, config, configured_catalog, {}):
        record = getattr(message, "record", None)
        if record is None:
            # Not a record message (the record field is optional), nothing to insert.
            continue
        last_record = record.data
        payload = sql_string_literal(json.dumps(last_record))
        session.sql(f"insert into {to_table} select parse_json({payload})").collect()
    return str(last_record)

INFO:snowflake.connector.cursor:query: [ls '@mystage']
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [SELECT "name" FROM ( SELECT  *  FROM  TABLE ( RESULT_SCAN('01a7b2c9-0000-c73c-00...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [select package_name, version from information_schema.packages where language='py...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CREATE OR REPLACE PROCEDURE source_pokeapi_app.app_schema.sync_connector_to_tabl...]
INFO:snowflake.connector.cursor:query execution done


In [16]:
# reset output table
test_session.sql("create table if not exists public.test_pokeapi (data variant)").collect()
test_session.sql("delete from public.test_pokeapi;").collect()

INFO:snowflake.connector.cursor:query: [create table if not exists public.test_pokeapi (data variant)]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [delete from public.test_pokeapi;]
INFO:snowflake.connector.cursor:query execution done


[Row(number of rows deleted=4975)]

In [43]:
test_session.call("source_pokeapi_app.app_schema.sync_connector_to_table", "public.test_pokeapi", {"pokemon_name": "zubat"})

INFO:snowflake.connector.cursor:query: [CALL source_pokeapi_app.app_schema.sync_connector_to_table('public.test_pokeapi'...]
INFO:snowflake.connector.cursor:query execution done


"{'abilities': [{'ability': {'name': 'inner-focus', 'url': 'https://pokeapi.co/api/v2/ability/39/'}, 'is_hidden': False, 'slot': 1}, {'ability': {'name': 'infiltrator', 'url': 'https://pokeapi.co/api/v2/ability/151/'}, 'is_hidden': True, 'slot': 3}], 'base_experience': 49, 'forms': [{'name': 'zubat', 'url': 'https://pokeapi.co/api/v2/pokemon-form/41/'}], 'game_indices': [{'game_index': 107, 'version': {'name': 'red', 'url': 'https://pokeapi.co/api/v2/version/1/'}}, {'game_index': 107, 'version': {'name': 'blue', 'url': 'https://pokeapi.co/api/v2/version/2/'}}, {'game_index': 107, 'version': {'name': 'yellow', 'url': 'https://pokeapi.co/api/v2/version/3/'}}, {'game_index': 41, 'version': {'name': 'gold', 'url': 'https://pokeapi.co/api/v2/version/4/'}}, {'game_index': 41, 'version': {'name': 'silver', 'url': 'https://pokeapi.co/api/v2/version/5/'}}, {'game_index': 41, 'version': {'name': 'crystal', 'url': 'https://pokeapi.co/api/v2/version/6/'}}, {'game_index': 41, 'version': {'name': 'r

In [26]:
test_session.call("sync_connector_to_table", "public.test_pokeapi", {"pokemon_name": "articuno"})
test_session.call("sync_connector_to_table", "public.test_pokeapi", {"pokemon_name": "zapdos"})
test_session.call("sync_connector_to_table", "public.test_pokeapi", {"pokemon_name": "moltres"})

INFO:snowflake.connector.cursor:query: [CALL sync_connector_to_table('public.test_pokeapi', parse_json('{"pokemon_name":...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CALL sync_connector_to_table('public.test_pokeapi', parse_json('{"pokemon_name":...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.cursor:query: [CALL sync_connector_to_table('public.test_pokeapi', parse_json('{"pokemon_name":...]
INFO:snowflake.connector.cursor:query execution done


"{'abilities': [{'ability': {'name': 'pressure', 'url': 'https://pokeapi.co/api/v2/ability/46/'}, 'is_hidden': False, 'slot': 1}, {'ability': {'name': 'flame-body', 'url': 'https://pokeapi.co/api/v2/ability/49/'}, 'is_hidden': True, 'slot': 3}], 'base_experience': 290, 'forms': [{'name': 'moltres', 'url': 'https://pokeapi.co/api/v2/pokemon-form/146/'}], 'game_indices': [{'game_index': 73, 'version': {'name': 'red', 'url': 'https://pokeapi.co/api/v2/version/1/'}}, {'game_index': 73, 'version': {'name': 'blue', 'url': 'https://pokeapi.co/api/v2/version/2/'}}, {'game_index': 73, 'version': {'name': 'yellow', 'url': 'https://pokeapi.co/api/v2/version/3/'}}, {'game_index': 146, 'version': {'name': 'gold', 'url': 'https://pokeapi.co/api/v2/version/4/'}}, {'game_index': 146, 'version': {'name': 'silver', 'url': 'https://pokeapi.co/api/v2/version/5/'}}, {'game_index': 146, 'version': {'name': 'crystal', 'url': 'https://pokeapi.co/api/v2/version/6/'}}, {'game_index': 146, 'version': {'name': 'r

In [None]:
test_session.sql("select * from public.test_pokeapi").collect()

In [None]:
test_session.sql("create table if not exists source_pokeapi_app.app_schema.configs (consumer_id varchar, output_table varchar, config variant);").collect()

In [None]:
# Let's insert and read a config to/from a table
def insert_or_update_config(config_table, consumer_id, output_table, config):
    """Upsert a consumer's connector config into ``config_table``.

    A row is matched on ``consumer_id``. When it exists only its ``config``
    column is updated (``output_table`` is left untouched); otherwise a new
    row is inserted. The statement runs through the module-level
    ``test_session``.

    Args:
        config_table: Name of the table to merge into (interpolated as-is).
        consumer_id: Identifier of the consumer.
        output_table: Table name stored for the consumer on insert.
        config: JSON-serializable connector configuration.
    """
    def sql_string_literal(value):
        # Backslash and single quote are both special inside single-quoted
        # Snowflake string constants, so both have to be escaped.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    consumer_id_sql = sql_string_literal(consumer_id)
    output_table_sql = sql_string_literal(output_table)
    config_sql = sql_string_literal(json.dumps(config))
    test_session.sql(f"""
    merge into {config_table} a using (select {consumer_id_sql} as consumer_id, {output_table_sql} as output_table, parse_json({config_sql}) as config) as b on a.CONSUMER_ID=b.CONSUMER_ID
      when matched then update set a.config=b.config
      when not matched then insert (consumer_id, output_table, config) values (b.CONSUMER_ID, b.output_table, b.config);
""").collect()
insert_or_update_config("source_pokeapi_app.app_schema.configs", 'id0', "public.test_pokeapi", {"pokemon_name": "abomasnow"})

In [None]:
test_session.sql("select * from source_pokeapi_app.app_schema.configs;").collect()

In [None]:
# Internal stored procedure that will not be exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], name="sync_consumer_id", replace=True, is_permanent=True, stage_location="@mystage")
def compute_consumer(session: snowflake.snowpark.Session, consumer_id: str) -> str:
    """Look up a consumer's registered config and run the sync for it.

    Reads ``output_table`` and ``config`` for ``consumer_id`` from
    ``source_pokeapi_app.app_schema.configs`` and calls the
    ``sync_connector_to_table`` procedure with them.

    Args:
        session: Snowpark session the procedure runs in.
        consumer_id: Identifier the config was registered under.

    Returns:
        Whatever ``sync_connector_to_table`` returns.

    Raises:
        LookupError: No config row exists for ``consumer_id``.
    """
    # consumer_id is caller-supplied: escape it before embedding it in a
    # single-quoted SQL string constant.
    escaped_consumer_id = consumer_id.replace("\\", "\\\\").replace("'", "''")
    rows = session.sql(f"select output_table, config from source_pokeapi_app.app_schema.configs where consumer_id = '{escaped_consumer_id}';").collect()
    if not rows:
        raise LookupError(f"no config registered for consumer_id {consumer_id!r}")
    row = rows[0]
    output_table = row["OUTPUT_TABLE"]
    config = json.loads(row["CONFIG"])
    # NOTE(review): the procedure name is unqualified, so it is resolved against
    # the session's current database/schema; confirm that is intended.
    return session.call("sync_connector_to_table", output_table, config)

In [None]:
test_session.call("sync_consumer_id", "id0")

In [None]:
test_session.sql("select * from public.test_pokeapi").collect()

In [None]:
test_session.sql("delete from source_pokeapi_app.app_schema.configs").collect()

In [None]:
# Procedure that WILL BE exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], name="register_config", replace=True, is_permanent=True, stage_location="@mystage")
def register_config(session: snowflake.snowpark.Session, consumer_id: str, config: dict, output_table: str) -> str:
    """Register (insert or update) a consumer's connector config.

    The config is merged into ``source_pokeapi_app.app_schema.configs``
    keyed on ``consumer_id``.

    Args:
        session: Snowpark session the procedure runs in.
        consumer_id: Identifier of the consumer.
        config: JSON-serializable connector configuration.
        output_table: Table name stored for the consumer on insert.

    Returns:
        The string ``"REGISTER SUCCESS"``.
    """
    def sql_string_literal(value):
        # All three values are consumer-supplied. Backslash and single quote
        # are special inside single-quoted Snowflake string constants, so both
        # have to be escaped.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{escaped}'"

    def insert_or_update_config(session, config_table, consumer_id, output_table, config):
        consumer_id_sql = sql_string_literal(consumer_id)
        output_table_sql = sql_string_literal(output_table)
        config_sql = sql_string_literal(json.dumps(config))
        # NOTE(review): on a match only `config` is updated; a changed
        # output_table is ignored. Confirm that is intended.
        session.sql(f"""
        merge into {config_table} a using (select {consumer_id_sql} as consumer_id, {output_table_sql} as output_table, parse_json({config_sql}) as config) as b on a.CONSUMER_ID=b.CONSUMER_ID
          when matched then update set a.config=b.config
          when not matched then insert (consumer_id, output_table, config) values (b.CONSUMER_ID, b.output_table, b.config);
    """).collect()
    insert_or_update_config(session, "source_pokeapi_app.app_schema.configs", consumer_id, output_table, config)
    return "REGISTER SUCCESS"

In [None]:
# Procedure that WILL BE exposed to the consumer

# pendulum has to be installed as a package for reasons...
@sproc(packages=['snowflake-snowpark-python', 'pendulum', 'pandas'], name="schedule_job", replace=True, is_permanent=True, stage_location="@mystage")
def schedule_job(session: snowflake.snowpark.Session, consumer_id: str, warehouse_name: str) -> str:
    """Create (or replace) the ``sync`` task that calls ``sync_consumer_id`` every 5 minutes.

    Args:
        session: Snowpark session the procedure runs in.
        consumer_id: Identifier passed to ``sync_consumer_id`` by the task.
        warehouse_name: Warehouse the task runs on (used as a quoted identifier).

    Returns:
        The string ``"REGISTER SUCCESS"`` (kept as-is for existing callers).
    """
    # Both arguments are consumer-supplied. A double quote inside a quoted
    # identifier is escaped by doubling it.
    quoted_warehouse = '"' + warehouse_name.replace('"', '""') + '"'
    # Backslash and single quote are special inside single-quoted string constants.
    escaped_consumer_id = consumer_id.replace("\\", "\\\\").replace("'", "''")
    create_task_command = f"""
    CREATE OR REPLACE TASK sync warehouse = {quoted_warehouse}
    SCHEDULE = '5 MINUTE'
    as call sync_consumer_id('{escaped_consumer_id}')
    """
    session.sql(create_task_command).collect()
    return "REGISTER SUCCESS"

In [None]:
# Creating DB roles for the Snowflake Native Application
database_role = "shared_db_role"
print(test_session.sql(f"create or replace database role {database_role}").collect())

In [None]:
test_session.sql("show roles;").collect()

In [None]:
# Grant usage
#print(test_session.sql(f"grant usage on database source_pokeapi_app to database role {database_role};").collect())
#print(test_session.sql(f"grant usage on schema source_pokeapi_app.app_schema to database role {database_role};").collect())
#print(test_session.sql(f"grant usage on procedure register_config(string, object, string) to database role {database_role}").collect())

In [None]:
# Create hidden db role
hidden_db_role = "source_pokeapi_app.hidden_db_role"
print(test_session.sql(f"create or replace database role {hidden_db_role}").collect())
print(test_session.sql(f"grant usage on database source_pokeapi_app to database role {hidden_db_role}").collect())
print(test_session.sql(f"grant usage on schema source_pokeapi_app.app_schema to database role {hidden_db_role};").collect())
print(test_session.sql(f"grant usage on procedure sync_consumer_id(string) to database role {hidden_db_role};").collect())
print(test_session.sql(f"grant usage on procedure sync_connector_to_table(string, object) to database role {hidden_db_role};").collect())

In [None]:
test_session.call("SOURCE_POKEAPI_APP.APP_SCHEMA.register_config", "id0", {"pokemon_name": "articuno"}, "public.test_pokeapi")

In [None]:
test_session.sql("describe procedure sync_consumer_id(string)").collect()

In [None]:
test_session.call("schedule_job", "id0", "COMPUTE_WH")

In [None]:
test_session.sql("describe task sync").collect()

In [None]:
test_session.sql("ALTER TASK sync resume;").collect()

In [None]:
test_session.sql("describe task sync").collect()

In [None]:
test_session.sql("select * from source_pokeapi_app.app_schema.configs").collect()

In [None]:
# Creating an installer script
# FIXME: I think the schema should be specific to the connector...
test_session.sql(f"""
CREATE OR REPLACE PROCEDURE source_pokeapi_app.app_schema.installer()
RETURNS STRING
LANGUAGE SQL
EXECUTE AS OWNER
AS $$
  begin
    return 'installer script Done';
  end;
$$;
""").collect()

In [None]:
# Setup a share for the application
test_session.sql(f"""
CREATE SHARE IF NOT EXISTS source_pokeapi_share installer = source_pokeapi_app.app_schema.installer();
""").collect()

In [None]:
# Add objects to the share
print(test_session.sql(f"""grant usage on database source_pokeapi_app to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant usage on schema source_pokeapi_app.app_schema to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant usage on procedure source_pokeapi_app.app_schema.installer() to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant database role source_pokeapi_app.hidden_db_role to share source_pokeapi_share;""").collect())
print(test_session.sql(f"""grant database role source_pokeapi_app.shared_db_role to share source_pokeapi_share;""").collect())

In [None]:
print(test_session.sql(
    f"ALTER share source_pokeapi_share ADD ACCOUNTS = yp50190;"
).collect())

In [None]:
# Open customer session...
# The secrets file is a JSON object holding the consumer account's connection parameters.
with open("./secrets/yp50190.json", "r") as f:
    connection_parameters = json.load(f)
# Consumer-side session; note this rebinds `connection_parameters` used for `test_session`.
customer_session = Session.builder.configs(connection_parameters).create()

In [None]:
# `from share` takes <provider_account>.<share_name>. The share created above is
# source_pokeapi_share; source_pokeapi_app is the provider's database, not a share.
customer_session.sql("create database customer_pokeapi_db from share GZ45853.source_pokeapi_share;").collect()

In [29]:
test_session.sql("select current_schema();").collect()

INFO:snowflake.connector.cursor:query: [select current_schema();]
INFO:snowflake.connector.cursor:query execution done


[Row(CURRENT_SCHEMA()='APP_SCHEMA')]