In [None]:
#------------------------------------------------------------------------------
# Hands-On Lab: Data Engineering with Snowpark
# Script:       02_load_raw.py
# Author:       Jeremiah Hansen, Caleb Baechtold
# Last Updated: 1/9/2023
#------------------------------------------------------------------------------

import time
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
#import snowflake.snowpark.functions as F


# Raw tables to load, grouped by the stage directory they live under.
POS_TABLES = [
    'country',
    'franchise',
    'location',
    'menu',
    'truck',
    'order_header',
    'order_detail',
]
CUSTOMER_TABLES = ['customer_loyalty']

# Stage directory -> target schema plus the tables found in that directory.
TABLE_DICT = dict(
    pos=dict(schema="RAW_POS", tables=POS_TABLES),
    customer=dict(schema="RAW_CUSTOMER", tables=CUSTOMER_TABLES),
)

# SNOWFLAKE ADVANTAGE: Schema detection
# SNOWFLAKE ADVANTAGE: Data ingestion with COPY
# SNOWFLAKE ADVANTAGE: Snowflake Tables (not file-based)

def load_raw_table(session, tname=None, s3dir=None, year=None, schema=None):
    """Copy one raw table's Parquet files from the external stage into Snowflake.

    Makes ``schema`` the session's current schema and reads the
    snappy-compressed Parquet files under
    ``@external.frostbyte_raw_stage/<s3dir>/<tname>``. When ``year`` is given,
    only the ``year=<year>`` sub-directory is read. The files are copied into
    the table ``tname``, and that table is then tagged with a JSON comment.

    Args:
        session: Active Snowpark session.
        tname: Target table name; also the stage sub-directory that is read.
        s3dir: Top-level directory inside the stage (e.g. ``"pos"``).
        year: Optional year partition to load; ``None`` reads the whole
            table directory.
        schema: Schema made current via ``session.use_schema`` before loading.
    """
    session.use_schema(schema)

    stage_path = f"@external.frostbyte_raw_stage/{s3dir}/{tname}"
    if year is not None:
        print(f'\tLoading year {year}')
        stage_path = f"{stage_path}/year={year}"

    # The Parquet reader infers the schema from the files themselves.
    parquet_reader = session.read.option("compression", "snappy")
    parquet_reader.parquet(stage_path).copy_into_table(f"{tname}")

    quickstart_tag = (
        '{"origin":"sf_sit-is","name":"snowpark_101_de",'
        '"version":{"major":1, "minor":0},'
        '"attributes":{"is_quickstart":1, "source":"sql"}}'
    )
    session.sql(f"COMMENT ON TABLE {tname} IS '{quickstart_tag}';").collect()

# SNOWFLAKE ADVANTAGE: Warehouse elasticity (dynamic scaling)

def load_all_raw_tables(session):
    """Load every raw table listed in ``TABLE_DICT`` from the external stage.

    Before loading, the ``HOL_WH`` warehouse is resized to XLARGE, waiting for
    the resize to complete. Afterwards it is resized back to XSMALL. The
    resize-down sits in a ``finally`` block, so a failing load does not leave
    the warehouse at XLARGE.

    Only the years 2019-2021 are loaded here for ``order_header`` and
    ``order_detail``; the 2022 data is loaded later in the lab.

    Args:
        session: Active Snowpark session.
    """
    _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

    try:
        for s3dir, data in TABLE_DICT.items():
            tnames = data['tables']
            schema = data['schema']
            for tname in tnames:
                print("Loading {}".format(tname))
                # Only load the first 3 years of data for the order tables at this point
                # We will load the 2022 data later in the lab
                if tname in ['order_header', 'order_detail']:
                    for year in ['2019', '2020', '2021']:
                        load_raw_table(session, tname=tname, s3dir=s3dir, year=year, schema=schema)
                else:
                    load_raw_table(session, tname=tname, s3dir=s3dir, schema=schema)
    finally:
        # Scale back down even if a load above raised, so the warehouse is
        # not left running at XLARGE.
        _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()

def validate_raw_tables(session):
    """Print the column names of every raw POS and customer table.

    Useful for checking the schema that was inferred when the Parquet files
    were loaded.

    Args:
        session: Active Snowpark session.
    """
    schema_tables = (('RAW_POS', POS_TABLES), ('RAW_CUSTOMER', CUSTOMER_TABLES))
    for schema_name, table_names in schema_tables:
        for table_name in table_names:
            columns = session.table(f'{schema_name}.{table_name}').columns
            print(f'{table_name}: \n\t{columns}\n')


# Entry point for local debugging: obtain a Snowpark session (creating one if
# none is active) and load all raw tables with it.
if __name__ == "__main__":
    with Session.builder.getOrCreate() as session:
        load_all_raw_tables(session)
        # Uncomment to print the inferred column names after loading:
        # validate_raw_tables(session)

In [None]:
-- Preview up to 100 rows of the POSTAL_CODES table.
select *
from FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES
limit 100;

In [None]:
#------------------------------------------------------------------------------
# Hands-On Lab: Data Engineering with Snowpark
# Script:       04_create_order_view.py
# Author:       Jeremiah Hansen, Caleb Baechtold
# Last Updated: 1/9/2023
#------------------------------------------------------------------------------

# SNOWFLAKE ADVANTAGE: Snowpark DataFrame API
# SNOWFLAKE ADVANTAGE: Streams for incremental processing (CDC)
# SNOWFLAKE ADVANTAGE: Streams on views


from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def create_pos_view(session):
    """Create or replace the ``POS_FLATTENED_V`` view in the HARMONIZED schema.

    The view joins the RAW_POS order detail, order header, truck, franchise,
    location and menu tables and exposes a fixed list of columns from them.

    Each table is narrowed to the columns it contributes before joining. The
    alternative is to join the full tables; the end result is the same, so
    selecting first is mainly a readability choice.

    Args:
        session: Active Snowpark session. Its current schema is switched to
            HARMONIZED, so the view is created there.
    """
    session.use_schema('HARMONIZED')

    def cols(*names):
        # One plain column reference per name, preserving the given order.
        return [F.col(name) for name in names]

    order_detail = session.table("RAW_POS.ORDER_DETAIL").select(
        *cols("ORDER_DETAIL_ID", "LINE_NUMBER", "MENU_ITEM_ID", "QUANTITY",
              "UNIT_PRICE", "PRICE", "ORDER_ID"))
    order_header = session.table("RAW_POS.ORDER_HEADER").select(
        *cols("ORDER_ID", "TRUCK_ID", "ORDER_TS"),
        # Date-only copy of the order timestamp.
        F.to_date(F.col("ORDER_TS")).alias("ORDER_TS_DATE"),
        *cols("ORDER_AMOUNT", "ORDER_TAX_AMOUNT", "ORDER_DISCOUNT_AMOUNT",
              "LOCATION_ID", "ORDER_TOTAL"))
    truck = session.table("RAW_POS.TRUCK").select(
        *cols("TRUCK_ID", "PRIMARY_CITY", "REGION", "COUNTRY",
              "FRANCHISE_FLAG", "FRANCHISE_ID"))
    menu = session.table("RAW_POS.MENU").select(
        *cols("MENU_ITEM_ID", "TRUCK_BRAND_NAME", "MENU_TYPE", "MENU_ITEM_NAME"))
    franchise = session.table("RAW_POS.FRANCHISE").select(
        F.col("FRANCHISE_ID"),
        F.col("FIRST_NAME").alias("FRANCHISEE_FIRST_NAME"),
        F.col("LAST_NAME").alias("FRANCHISEE_LAST_NAME"))
    location = session.table("RAW_POS.LOCATION").select(F.col("LOCATION_ID"))

    # rsuffix is applied to the right-hand side's overlapping column names, so
    # the unsuffixed key names selected at the end refer to the left-hand
    # table of each join.
    t_with_f = truck.join(
        franchise, truck['FRANCHISE_ID'] == franchise['FRANCHISE_ID'], rsuffix='_f')
    oh_w_t_and_l = (
        order_header
        .join(t_with_f, order_header['TRUCK_ID'] == t_with_f['TRUCK_ID'], rsuffix='_t')
        .join(location, order_header['LOCATION_ID'] == location['LOCATION_ID'], rsuffix='_l')
    )
    joined = (
        order_detail
        .join(oh_w_t_and_l, order_detail['ORDER_ID'] == oh_w_t_and_l['ORDER_ID'], rsuffix='_oh')
        .join(menu, order_detail['MENU_ITEM_ID'] == menu['MENU_ITEM_ID'], rsuffix='_m')
    )

    final_columns = (
        "ORDER_ID", "TRUCK_ID", "ORDER_TS", "ORDER_TS_DATE", "ORDER_DETAIL_ID",
        "LINE_NUMBER", "TRUCK_BRAND_NAME", "MENU_TYPE", "PRIMARY_CITY", "REGION",
        "COUNTRY", "FRANCHISE_FLAG", "FRANCHISE_ID", "FRANCHISEE_FIRST_NAME",
        "FRANCHISEE_LAST_NAME", "LOCATION_ID", "MENU_ITEM_ID", "MENU_ITEM_NAME",
        "QUANTITY", "UNIT_PRICE", "PRICE", "ORDER_AMOUNT", "ORDER_TAX_AMOUNT",
        "ORDER_DISCOUNT_AMOUNT", "ORDER_TOTAL",
    )
    joined.select(*cols(*final_columns)).create_or_replace_view('POS_FLATTENED_V')

def create_pos_view_stream(session):
    """Create or replace the stream ``POS_FLATTENED_V_STREAM`` on ``POS_FLATTENED_V``.

    ``SHOW_INITIAL_ROWS = TRUE`` makes the first consumption of the stream
    return the rows already in the view, not only later changes.

    Args:
        session: Active Snowpark session. Its current schema is switched to
            HARMONIZED, so the stream is created there.
    """
    session.use_schema('HARMONIZED')
    # Adjacent string literals keep the SQL free of source indentation; a
    # backslash-newline continuation inside a string literal would embed it.
    _ = session.sql('CREATE OR REPLACE STREAM POS_FLATTENED_V_STREAM '
                    'ON VIEW POS_FLATTENED_V '
                    'SHOW_INITIAL_ROWS = TRUE').collect()

def test_pos_view(session):
    """Show up to 5 rows of ``POS_FLATTENED_V`` for a quick manual check.

    Args:
        session: Active Snowpark session. Its current schema is switched to
            HARMONIZED before the view is read.
    """
    session.use_schema('HARMONIZED')
    session.table('POS_FLATTENED_V').limit(5).show()


# Entry point for local debugging: obtain a Snowpark session (creating one if
# none is active), then build the flattened view and its stream.
if __name__ == "__main__":
    with Session.builder.getOrCreate() as session:
        create_pos_view(session)
        create_pos_view_stream(session)
        # Uncomment to print a few rows of the new view:
        # test_pos_view(session)

In [None]:
-- Peek at up to 10 rows of the loan tape table.
select *
from HOL_DB.ANALYTICS.LOAN_TAPE
limit 10;

In [None]:
-- Storage integration that lets Snowflake access the S3 bucket through the
-- given AWS IAM role, restricted to the listed location.
-- IF NOT EXISTS makes this cell safe to re-run. OR REPLACE is deliberately not
-- used: recreating the integration gives it a new external ID, which breaks the
-- IAM role's trust policy and any stages that reference the integration.
CREATE STORAGE INTEGRATION IF NOT EXISTS s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::774305574730:role/snowflake_access_role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://hanifidemirel/');

In [None]:
-- Inspect the integration's properties. For an S3 integration this typically
-- includes the IAM user ARN and external ID needed in the role's trust policy.
DESCRIBE INTEGRATION s3_integration;

In [None]:
-- Named file format for snappy-compressed Parquet files; referenced by the S3 stages.
create or replace file format PARQUET_FORMAT
    type = PARQUET
    compression = SNAPPY;

In [None]:
-- External stages over the two S3 prefixes. Both go through the
-- s3_integration storage integration and default to the PARQUET_FORMAT file format.
CREATE OR REPLACE STAGE s3_stage_loan_tape
  STORAGE_INTEGRATION = s3_integration
  URL = 's3://hanifidemirel/loan_tape/'
  FILE_FORMAT = PARQUET_FORMAT;

CREATE OR REPLACE STAGE s3_stage_pre_sale_loan_tape
  STORAGE_INTEGRATION = s3_integration
  URL = 's3://hanifidemirel/pre_sale_loan_tape/'
  FILE_FORMAT = PARQUET_FORMAT;

In [None]:
-- List the files currently visible in the pre-sale loan tape stage.
LIST @s3_stage_pre_sale_loan_tape;

In [None]:
-- Creating table
-- Target table for the loan tape Parquet files. The COPY INTO that follows uses
-- MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE, so these column names must match the
-- Parquet field names (ignoring case); column order does not matter.
-- NOTE: CREATE OR REPLACE drops any previously loaded rows each time this runs.
create or replace TABLE loan_tape (
	ACCRUAL_METHOD VARCHAR(16777216),
	APR NUMBER(38,12),
	AMOUNT_OVERDUE NUMBER(38,6),
	AUTH_TIME TIMESTAMP_NTZ(9),
	CAPTURE_AMOUNT NUMBER(38,12),
	CAPTURE_TIME TIMESTAMP_NTZ(9),
	CHARGE_ARI VARCHAR(16777216),
	CHARGEOFF_DATE DATE,
	CNS_PRINCIPAL NUMBER(38,12),
	CNS_ACCRUED_INTEREST NUMBER(38,12),
	CNS_PRINCIPAL_CHARGEDOFF NUMBER(38,12),
	CNS_ACCRUED_INTEREST_CHARGEDOFF NUMBER(38,12),
	CNS_PRINCIPAL_CHARGEDOFF_RECOVERY NUMBER(38,12),
	CNS_ACCRUED_INTEREST_CHARGEDOFF_RECOVERY NUMBER(38,12),
	CNS_PRINCIPAL_DISPUTED NUMBER(38,12),
	CNS_PRINCIPAL_REFUNDED NUMBER(38,12),
	CNS_INTEREST_PAID NUMBER(38,12),
	CONFIRM_TIME TIMESTAMP_NTZ(9),
	CR_DEBT_UTILIZATION_PERCENT NUMBER(38,12),
	CR_EARLIEST_ACCOUNT_DATE DATE,
	CR_INQUIRIES_6_MONTHS NUMBER(38,0),
	CR_MOST_RECENT_DQ_DATE DATE,
	CR_N_30_DAY_DQ NUMBER(38,0),
	CR_N_DEROGS NUMBER(38,0),
	CR_N_OPEN_ACCOUNTS NUMBER(38,0),
	CR_PULL_DATE DATE,
	CR_REVOLVING_BALANCE NUMBER(38,12),
	DAYS_OVERDUE NUMBER(38,0),
	DECISION_TYPE VARCHAR(16777216),
	DISBURSED_AMOUNT NUMBER(38,12),
	DISPUTE_STATUS BOOLEAN,
	DOWNPAYMENT_AMOUNT NUMBER(38,12),
	DSB_ACCRUED_INTEREST NUMBER(38,12),
	DSB_INTEREST_PAID_ST NUMBER(38,12),
	DSB_PRINCIPAL NUMBER(38,12),
	DSB_PRINCIPAL_CHARGEDOFF NUMBER(38,12),
	DSB_PRINCIPAL_CHARGEDOFF_RECOVERY NUMBER(38,12),
	DSB_PRINCIPAL_DISPUTED NUMBER(38,12),
	DSB_YIELD NUMBER(38,12),
	FICO_SCORE NUMBER(38,0),
	CREDIT_VISION NUMBER(38,0),
	GRADE VARCHAR(16777216),
	INTEREST_RATE NUMBER(38,12),
	IS_AUTOPAY BOOLEAN,
	IS_FIRST_PAYMENT_OVERDUE BOOLEAN,
	FIRST_PAYMENT_DAYS_OVERDUE NUMBER(38,0),
	IS_FRAUDULENT BOOLEAN,
	IS_ABUSE BOOLEAN,
	IS_SCRA BOOLEAN,
	IS_VCN BOOLEAN,
	LAST_CREDIT_CHECK TIMESTAMP_NTZ(9),
	LM_EFFECTIVE TIMESTAMP_NTZ(9),
	LOAN_AMOUNT NUMBER(38,12),
	MATURATION_DATE TIMESTAMP_NTZ(9),
	MDR NUMBER(38,12),
	MERCHANT_ARI VARCHAR(16777216),
	MERCHANT_INDUSTRY VARCHAR(16777216),
	MERCHANT_NAME VARCHAR(16777216),
	MERCHANT_SUB_INDUSTRY VARCHAR(16777216),
	OWN_ACCRUED_INTEREST NUMBER(38,12),
	OWN_PRINCIPAL NUMBER(38,12),
	OWN_PRINCIPAL_CHARGEDOFF NUMBER(38,12),
	OWN_PRINCIPAL_CHARGEDOFF_RECOVERY NUMBER(38,12),
	OWN_PRINCIPAL_DISPUTED NUMBER(38,12),
	OWN_PRINCIPAL_REFUNDED NUMBER(38,12),
	OWN_YIELD NUMBER(38,12),
	CHARGEDOFF_RECOVERY_FEE NUMBER(38,12),
	OWNER VARCHAR(16777216),
	PARENT_MERCHANT_ARI VARCHAR(16777216),
	PARENT_MERCHANT_NAME VARCHAR(16777216),
	PAYMENT_AMOUNT NUMBER(38,12),
	REGION_CODE VARCHAR(16777216),
	SSN_CRYPT_ARI VARCHAR(16777216),
	STATUS VARCHAR(16777216),
	TARGET_APR NUMBER(38,12),
	TERM NUMBER(38,0),
	USER_ARI VARCHAR(16777216),
	CITIZENS_CREDIT_TIER NUMBER(38,0),
	DTI NUMBER(38,12),
	ANNUAL_INCOME NUMBER(38,12),
	DTI_MONTHLY_AFFIRM_DEBTS_DOLLARS NUMBER(38,12),
	DTI_MONTHLY_OTHER_PMTS_DOLLARS NUMBER(38,12),
	DTI_EST_WOULD_BE_MONTHLY_PAYMENT_DOLLARS NUMBER(38,12),
	COLLATERAL_TYPE VARCHAR(16777216),
	PREQUAL_APPROVED_AMOUNT NUMBER(38,12),
	TOTAL_USER_AFFIRM_DEBT_CENTS NUMBER(38,12),
	REMAINING_TERM NUMBER(38,0),
	FIRST_DUE_DATE DATE,
	LAST_PAYMENT_MADE_DATE DATE,
	HAS_PAYMENT_DEFERRAL BOOLEAN,
	HAS_ACTIVE_DEFERRAL BOOLEAN,
	HAS_OVERDUE_DEFERRAL BOOLEAN,
	DAYS_NEXT_DUE_DATE_DEFERRED NUMBER(38,0),
	DAYS_OVERDUE_AS_OF_DEFERRAL_START NUMBER(38,0),
	DAYS_OVERDUE_AS_OF_DEFERRAL_CONTACT NUMBER(38,0),
	MATURATION_DATE_BEFORE_DEFERRAL TIMESTAMP_NTZ(9),
	NEXT_PAYMENT_DUE_DATE DATE,
	IS_TERMS_MODIFIED BOOLEAN,
	LOAN_MODIFICATION_REASONS VARIANT,
	TERMS_MODIFICATION_REASON VARCHAR(16777216),
	ORIGINAL_MATURATION_DATE TIMESTAMP_NTZ(9),
	ORIGINAL_TERM NUMBER(38,0),
	HAS_REAMORTIZATION BOOLEAN,
	HAS_OVERDUE_REAMORTIZATION BOOLEAN,
	DAYS_OVERDUE_AS_OF_REAMORTIZATION_START NUMBER(38,0),
	REMAINING_TERM_AS_OF_CHARGEOFF_DATE NUMBER(38,0),
	ORIGINAL_TERM_IN_MONTHS NUMBER(38,12),
	TERM_IN_MONTHS NUMBER(38,12),
	REMAINING_TERM_IN_MONTHS NUMBER(38,12),
	REMAINING_TERM_AS_OF_CHARGEOFF_DATE_IN_MONTHS NUMBER(38,12),
	ORIGINATOR VARCHAR(16777216),
	PLAN_FREQUENCY VARCHAR(16777216),
	PLAN_INTERVAL NUMBER(38,0),
	USER_POSTAL_CODE VARCHAR(16777216),
	LAST_ACTIVITY_DATE DATE,
	APPROVED_AMOUNT NUMBER(38,12),
	FIRST_EXPECTED_PAYMENT_DATE TIMESTAMP_NTZ(9),
	ORIGINATION_DATE DATE,
	EXPECTED_PAYMENT_AMOUNTS VARIANT,
	EXPECTED_PAYMENT_DATES VARIANT,
	PAID_AMOUNT NUMBER(38,12),
	USER_CNS_TOTAL_BALANCE NUMBER(38,12),
	CNS_PRINCIPAL_NET_DEFAULT NUMBER(38,12),
	CNS_PRINCIPAL_CHARGEDOFF_WRITEDOWN NUMBER(38,12),
	CNS_INTEREST_NET_DEFAULT NUMBER(38,12),
	OWN_PRINCIPAL_NET_DEFAULT NUMBER(38,12),
	CNS_FINANCED_RECEIVABLE_PAID_TOTAL NUMBER(38,12),
	CONSUMER_TYPE VARCHAR(16777216),
	IS_DEBIT_PLUS BOOLEAN,
	USER_HAS_PREVIOUS_LOAN BOOLEAN,
	MERCHANT_IS_INSOLVENT BOOLEAN,
	FRAUD_DATE DATE,
	ADJUSTED_PRICE_FACTOR NUMBER(38,12),
	ADJUSTMENT_EFFECTIVE TIMESTAMP_NTZ(9),
	ADVANCE_RATE NUMBER(38,12),
	ALLOWABLE_ADVANCE NUMBER(38,12),
	CNS_ORIGINAL_PLEDGE_BALANCE NUMBER(38,12),
	CNS_PREVIOUS_PRINCIPAL NUMBER(38,12),
	CNS_INTEREST_PAID_ACTIVITY NUMBER(38,12),
	DAYS_TO_MATURITY NUMBER(38,0),
	DISBURSED_AMOUNT_LESS_DOWNPAYMENT NUMBER(38,12),
	DISCOUNTED_BALANCE NUMBER(38,12),
	DISCOUNTED_BALANCE_CHARGEDOFF NUMBER(38,12),
	DSB_ADJUSTED_PRINCIPAL NUMBER(38,12),
	DSB_ORIGINAL_PLEDGE_BALANCE NUMBER(38,12),
	DSB_PREVIOUS_PRINCIPAL NUMBER(38,12),
	HAS_PAYMENT_BEFORE_REPURCHASE BOOLEAN,
	INVESTOR_ON_PAR_PRINCIPAL NUMBER(38,12),
	IS_ABOVE_CONCENTRATION_LIMIT BOOLEAN,
	IS_ELIGIBLE BOOLEAN,
	IS_EXPLORATION BOOLEAN,
	IS_POS BOOLEAN,
	NOTE_A_BALANCE NUMBER(38,12),
	NOTE_B_BALANCE NUMBER(38,12),
	ORIGINAL_PLEDGE_ACCRUED_INTEREST NUMBER(38,12),
	ORIGINAL_PLEDGE_DATE DATE,
	ORIGINAL_SETTLEMENT_PROCEEDS NUMBER(38,12),
	OTHER_DOWNWARD_ADJUSTMENT NUMBER(38,12),
	OTHER_UPWARD_ADJUSTMENT NUMBER(38,12),
	OWN_ORIGINAL_PLEDGE_BALANCE NUMBER(38,12),
	OWN_PREVIOUS_PRINCIPAL NUMBER(38,12),
	PRINCIPAL_ACTIVITY NUMBER(38,12),
	PURCHASE_PRICE_CONVENTION VARCHAR(16777216),
	REFUND_ACTIVITY NUMBER(38,12),
	SALE_PURCHASE_PRICE_FACTOR NUMBER(38,12),
	SELF_REPORTED_INCOME NUMBER(38,12),
	SERVICE_CREDIT_PAID NUMBER(38,12),
	SETTLEMENT_DATE DATE,
	SETTLEMENT_PROCEEDS NUMBER(38,12),
	ZERO_OUT_ADJUSTMENT NUMBER(38,12),
	INCOME_VERIFIED BOOLEAN,
	WEIGHTED_AVERAGE_LIFE NUMBER(38,12),
	EXPECTED_PAYMENT_AMOUNT_NEXT_MONTH NUMBER(38,12),
	ANNUAL_ORIGINATION_IRR NUMBER(38,12),
	DISCOUNT_RATE NUMBER(38,12),
	CGL NUMBER(38,12),
	REFUND_RATE NUMBER(38,12),
	DURATION NUMBER(38,12),
	BENCHMARK NUMBER(38,12),
	SPREAD NUMBER(38,12),
	SERVICING_FEE NUMBER(38,12),
	PRE_DISCOUNTED_SETTLEMENT_PROCEEDS NUMBER(38,12),
	IS_ABOVE_CONCENTRATION_MAP VARIANT,
	DISCOUNTED_PRINCIPAL_ACTIVITY NUMBER(38,12),
	DISCOUNTED_SERVICE_CREDIT_PAID NUMBER(38,12),
	DISCOUNTED_REFUND_ACTIVITY NUMBER(38,12),
	DISCOUNTED_OTHER_UPWARD_ADJUSTMENT NUMBER(38,12),
	DISCOUNTED_OTHER_DOWNWARD_ADJUSTMENT NUMBER(38,12),
	DISCOUNTED_GROSS_BALANCE_CHARGEDOFF NUMBER(38,12),
	ADVANCE_RATES_MAP VARIANT,
	PRICE_FACTOR NUMBER(38,12),
	DEAL_ID NUMBER(38,0),
	EFFECTIVE TIMESTAMP_NTZ(9),
	PLEDGE_DATE DATE,
	SNAPSHOT_ID NUMBER(38,0),
	ORDER_ID NUMBER(38,0)
);
-- Copying data from Stage file into Snowflake table: load the files under the
-- "part" path prefix of the loan tape stage. Parquet fields are mapped to table
-- columns by name, ignoring case.
COPY INTO loan_tape
  FROM @s3_stage_loan_tape/part
  FILE_FORMAT = parquet_format
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

In [None]:
import time
from snowflake.snowpark import Session

with Session.builder.getOrCreate() as session:
    # Read the stage's files as gzip-compressed CSV and let Snowpark infer the
    # column types.
    # NOTE(review): s3_stage_pre_sale_loan_tape was created with
    # FILE_FORMAT = PARQUET_FORMAT, but this cell reads it as CSV -- confirm the
    # files under pre_sale_loan_tape/ really are gzip CSV.
    csv_reader = session.read.option("compression", "gzip")
    csv_reader = csv_reader.options({"infer_schema":True})
    df = csv_reader.csv('@s3_stage_pre_sale_loan_tape')

    row_count = df.count()
    print(f"The DataFrame has {row_count} rows.")

    # Write the data to the table, overwriting whatever it held before.
    df.write.mode("overwrite").save_as_table("pre_sale_loan_tape")

In [None]:
-- Sanity check: number of rows written to pre_sale_loan_tape.
SELECT COUNT(*)
FROM pre_sale_loan_tape;