In [4]:
# most of the imports we will need
import json
import os

import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.functions import udf
from snowflake.snowpark.session import Session

In [11]:
# Snowflake connection settings.
# Values are taken from SNOWFLAKE_* environment variables so that real
# credentials never have to be typed into -- and saved with -- the notebook.
# The fallbacks are the notebook's original values; "yourusername" and
# "yourpassword" are placeholders that must be overridden via the environment.
CONNECTION_PARAMETERS = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "lzaftnl-bp36065"),
    "user": os.environ.get("SNOWFLAKE_USER", "yourusername"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD", "yourpassword"),
    "role": os.environ.get("SNOWFLAKE_ROLE", "ACCOUNTADMIN"),
}

In [12]:
# Open a Snowpark session from the connection settings dictionary.
session = (
    Session.builder
    .configs(CONNECTION_PARAMETERS)
    .create()
)

In [13]:
# Display the Session object as a quick check that the previous cell produced one.
session

<snowflake.snowpark.session.Session at 0x162aec760>

In [14]:
# Recreate the lab database from scratch -- CREATE OR REPLACE drops any
# existing HOL_DB and everything in it -- then make it the session's
# current database.
session.sql(
    'CREATE OR REPLACE DATABASE HOL_DB'
).collect()
session.use_database(
    'HOL_DB'
)

In [15]:
# External stage over the S3 prefix holding the raw lab data files; the
# loaders below read from "@frostbyte_raw_stage/...".
session.sql(
    ''' CREATE OR REPLACE STAGE FROSTBYTE_RAW_STAGE
                URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/'
                ;'''
).collect()

[Row(status='Stage area FROSTBYTE_RAW_STAGE successfully created.')]

In [16]:
# Create (or replace) the lab warehouse at MEDIUM size, with auto-suspend
# (AUTO_SUSPEND = 300 seconds idle) and auto-resume enabled.
session.sql(
    'CREATE OR REPLACE WAREHOUSE HOL_WH WAREHOUSE_SIZE = MEDIUM, AUTO_SUSPEND = 300, AUTO_RESUME= TRUE;'
).collect()

[Row(status='Warehouse HOL_WH successfully created.')]

In [17]:
# Raw tables to load, grouped by the stage sub-directory they live under.
# Each TABLE_DICT key is that sub-directory name; its value gives the target
# schema and the tables found in the sub-directory.
POS_TABLES = [
    'country',
    'franchise',
    'location',
    'menu',
    'truck',
    'order_header',
    'order_detail',
]
CUSTOMER_TABLES = ['customer_loyalty']
TABLE_DICT = dict(
    pos=dict(schema="PUBLIC", tables=POS_TABLES),
    customer=dict(schema="PUBLIC", tables=CUSTOMER_TABLES),
)

In [18]:
def load_raw_table(session, tname=None, s3dir=None, year=None, schema=None):
    """Copy one raw Parquet table from the FROSTBYTE_RAW_STAGE stage into a table.

    Files are read from ``@frostbyte_raw_stage/<s3dir>/<tname>``, or from its
    ``year=<year>`` sub-directory when ``year`` is given, and copied into a
    table named ``tname`` in ``schema``. The column layout is inferred from
    the Parquet files.

    Args:
        session: Active Snowpark ``Session``. As a side effect its current
            schema is switched to ``schema``.
        tname: Table name; also the directory name under ``s3dir`` on the stage.
        s3dir: Top-level directory on the stage (e.g. ``"pos"``).
        year: Optional year partition to load. When ``None`` the whole table
            directory is loaded.
        schema: Schema that receives the table.

    Raises:
        ValueError: If ``tname``, ``s3dir`` or ``schema`` is missing or empty.
    """
    # These three default to None but are required: fail fast with a clear
    # message instead of building a stage path / table name containing "None".
    missing = [
        arg_name
        for arg_name, value in (("tname", tname), ("s3dir", s3dir), ("schema", schema))
        if not value
    ]
    if missing:
        raise ValueError(
            "load_raw_table: missing required argument(s): {}".format(", ".join(missing))
        )

    session.use_schema(schema)
    location = "@frostbyte_raw_stage/{}/{}".format(s3dir, tname)
    if year is not None:
        print('\tLoading year {}'.format(year))
        location = "{}/year={}".format(location, year)

    # No explicit schema is given: it is inferred from the Parquet files.
    # The reader is told the files are snappy-compressed.
    df = session.read.option("compression", "snappy").parquet(location)
    df.copy_into_table(tname)

In [19]:
def load_all_raw_tables(session, order_years=("2021",)):
    """Load every raw table listed in ``TABLE_DICT`` from the stage.

    The HOL_WH warehouse is scaled up to X2LARGE for the bulk load and scaled
    back down to MEDIUM afterwards. The scale-down also runs when a load
    fails, so an error never leaves the warehouse at X2LARGE size.

    Args:
        session: Active Snowpark ``Session``.
        order_years: Year partitions to load for the ``order_header`` and
            ``order_detail`` tables. Defaults to 2021 only; the 2022 data is
            loaded later in the lab.
    """
    _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = X2LARGE WAIT_FOR_COMPLETION = TRUE").collect()
    try:
        for s3dir, data in TABLE_DICT.items():
            tnames = data['tables']
            schema = data['schema']
            for tname in tnames:
                print("Loading {}".format(tname))
                if tname in ('order_header', 'order_detail'):
                    # Order tables are loaded one year= sub-directory at a time.
                    for year in order_years:
                        load_raw_table(session, tname=tname, s3dir=s3dir, year=year, schema=schema)
                else:
                    load_raw_table(session, tname=tname, s3dir=s3dir, schema=schema)
    finally:
        # Always scale back down, even if a load above raised.
        _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = MEDIUM").collect()

In [20]:
def validate_raw_tables(session):
    """Print the column names of every raw table described by ``TABLE_DICT``.

    Tables are looked up through ``TABLE_DICT`` -- the same mapping the loader
    uses -- so each table is read from the schema it was loaded into, rather
    than from a hard-coded ``PUBLIC``. The printed columns are the ones
    inferred from the Parquet files at load time.

    Args:
        session: Active Snowpark ``Session``.
    """
    for data in TABLE_DICT.values():
        schema = data['schema']
        for tname in data['tables']:
            columns = session.table('{}.{}'.format(schema, tname)).columns
            print('{}: \n\t{}\n'.format(tname, columns))

# Bulk-load the raw tables from the stage, then print each table's column
# names as a check on the inferred schemas. Order matters: validation reads
# the tables the load populates.
load_all_raw_tables(session)
validate_raw_tables(session)

Loading country
Loading franchise
Loading location
Loading menu
Loading truck
Loading order_header
	Loading year 2021
Loading order_detail
	Loading year 2021
Loading customer_loyalty
country: 
	['COUNTRY_ID', 'COUNTRY', 'ISO_CURRENCY', 'ISO_COUNTRY', 'CITY_ID', 'CITY', 'CITY_POPULATION']

franchise: 
	['FRANCHISE_ID', 'FIRST_NAME', 'LAST_NAME', 'CITY', 'COUNTRY', 'E_MAIL', 'PHONE_NUMBER']

location: 
	['LOCATION_ID', 'PLACEKEY', 'LOCATION', 'CITY', 'REGION', 'ISO_COUNTRY_CODE', 'COUNTRY']

menu: 
	['MENU_ID', 'MENU_TYPE_ID', 'MENU_TYPE', 'TRUCK_BRAND_NAME', 'MENU_ITEM_ID', 'MENU_ITEM_NAME', 'ITEM_CATEGORY', 'ITEM_SUBCATEGORY', 'COST_OF_GOODS_USD', 'SALE_PRICE_USD', 'MENU_ITEM_HEALTH_METRICS_OBJ']

truck: 
	['TRUCK_ID', 'MENU_TYPE_ID', 'PRIMARY_CITY', 'REGION', 'ISO_REGION', 'COUNTRY', 'ISO_COUNTRY_CODE', 'FRANCHISE_FLAG', 'YEAR', 'MAKE', 'MODEL', 'EV_FLAG', 'FRANCHISE_ID', 'TRUCK_OPENING_DATE']

order_header: 
	['ORDER_ID', 'TRUCK_ID', 'LOCATION_ID', 'CUSTOMER_ID', 'DISCOUNT_ID', 'SHIF

In [21]:
# ORDER_HEADER narrowed to the columns the flattened view needs, plus a DATE
# version of the order timestamp.
order_header = session.table("ORDER_HEADER").select(
    F.col("ORDER_ID"),
    F.col("TRUCK_ID"),
    F.col("ORDER_TS"),
    F.to_date(F.col("ORDER_TS")).alias("ORDER_TS_DATE"),
    F.col("ORDER_AMOUNT"),
    F.col("ORDER_TAX_AMOUNT"),
    F.col("ORDER_DISCOUNT_AMOUNT"),
    F.col("LOCATION_ID"),
    F.col("ORDER_TOTAL"),
)

# FRANCHISE key plus the franchisee's name, with the name columns given a
# FRANCHISEE_ prefix.
franchise = session.table("FRANCHISE").select(
    F.col("FRANCHISE_ID"),
    F.col("FIRST_NAME").alias("FRANCHISEE_FIRST_NAME"),
    F.col("LAST_NAME").alias("FRANCHISEE_LAST_NAME"),
)

# The remaining tables are used in full.
order_detail, truck, menu, location = (
    session.table(table_name)
    for table_name in ("ORDER_DETAIL", "TRUCK", "MENU", "LOCATION")
)

In [22]:
# join franchise to truck
# Trucks with their franchise; overlapping franchise columns get the "_f" suffix.
t_with_f = truck.join(
    franchise,
    truck['FRANCHISE_ID'] == franchise['FRANCHISE_ID'],
    rsuffix='_f',
)

# Order headers enriched with truck/franchise details and location.
oh_w_t_and_l = (
    order_header
    .join(t_with_f, order_header['TRUCK_ID'] == t_with_f['TRUCK_ID'], rsuffix='_t')
    .join(location, order_header['LOCATION_ID'] == location['LOCATION_ID'], rsuffix='_l')
)

# Order lines enriched with everything above plus the menu item.
final_df = (
    order_detail
    .join(oh_w_t_and_l, order_detail['ORDER_ID'] == oh_w_t_and_l['ORDER_ID'], rsuffix='_oh')
    .join(menu, order_detail['MENU_ITEM_ID'] == menu['MENU_ITEM_ID'], rsuffix='_m')
)


In [23]:
# itemize final column list
# Project the joined frame down to the view's columns, in output order.
final_df = final_df.select(*[
    F.col(column_name)
    for column_name in (
        "ORDER_ID",
        "TRUCK_ID",
        "ORDER_TS",
        "ORDER_TS_DATE",
        "ORDER_DETAIL_ID",
        "LINE_NUMBER",
        "TRUCK_BRAND_NAME",
        "MENU_TYPE",
        "PRIMARY_CITY",
        "REGION",
        "COUNTRY",
        "FRANCHISE_FLAG",
        "FRANCHISE_ID",
        "FRANCHISEE_FIRST_NAME",
        "FRANCHISEE_LAST_NAME",
        "LOCATION_ID",
        "MENU_ITEM_ID",
        "MENU_ITEM_NAME",
        "QUANTITY",
        "UNIT_PRICE",
        "PRICE",
        "ORDER_AMOUNT",
        "ORDER_TAX_AMOUNT",
        "ORDER_DISCOUNT_AMOUNT",
        "ORDER_TOTAL",
    )
])

# Save the whole pipeline as a view; the returned status rows are the cell output.
final_df.create_or_replace_view('POS_FLATTENED_V')

[Row(status='View POS_FLATTENED_V successfully created.')]