In [1]:
import sys
import logging

from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.functions import col,lit,row_number, rank
from snowflake.snowpark import Window

In [2]:
# Initiate logging at INFO level to stdout.
# Timestamps use the 24-hour clock ('%H'): a 12-hour '%I' without '%p' has no
# AM/PM marker, so e.g. "05:02:20" would be ambiguous.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S',
)

In [3]:
# snowpark session
def get_snowpark_session() -> Session:
    """Create and return a new Snowpark ``Session`` for the SALES_DWH pipeline.

    Credentials are read from environment variables instead of being written
    into the notebook, where they would end up in version control and in every
    shared copy:

    * ``SNOWFLAKE_ACCOUNT``, ``SNOWFLAKE_USER``, ``SNOWFLAKE_PASSWORD`` - required.
    * ``SNOWFLAKE_ROLE``, ``SNOWFLAKE_DATABASE``, ``SNOWFLAKE_SCHEMA``,
      ``SNOWFLAKE_WAREHOUSE`` - optional; they default to SYSADMIN,
      SALES_DWH, SOURCE and SNOWPARK_ETL_WH respectively.

    Returns:
        A session built from these parameters via ``Session.builder.configs(...).create()``.

    Raises:
        KeyError: if one of the required environment variables is not set.
    """
    import os

    connection_parameters = {
        "ACCOUNT": os.environ["SNOWFLAKE_ACCOUNT"],
        "USER": os.environ["SNOWFLAKE_USER"],
        "PASSWORD": os.environ["SNOWFLAKE_PASSWORD"],
        "ROLE": os.environ.get("SNOWFLAKE_ROLE", "SYSADMIN"),
        "DATABASE": os.environ.get("SNOWFLAKE_DATABASE", "SALES_DWH"),
        "SCHEMA": os.environ.get("SNOWFLAKE_SCHEMA", "SOURCE"),
        "WAREHOUSE": os.environ.get("SNOWFLAKE_WAREHOUSE", "SNOWPARK_ETL_WH"),
    }
    # creating snowflake session object
    return Session.builder.configs(connection_parameters).create()

In [4]:
# Open the notebook-level session and print the role / database / schema /
# warehouse it resolved to, as a quick sanity check of the connection.
session = get_snowpark_session()

context_sql = "select current_role(), current_database(), current_schema(), current_warehouse()"
context_df = session.sql(context_sql)
context_df.show(2)

05:02:20 - INFO - Snowflake Connector for Python Version: 3.7.0, Python Version: 3.8.8, Platform: macOS-10.16-x86_64-i386-64bit
05:02:20 - INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
05:02:20 - INFO - Snowpark Session information: 
"version" : 1.13.0,
"python.version" : 3.8.8,
"python.connector.version" : 3.7.0,
"python.connector.session.id" : 1276855122592118,
"os.name" : Darwin

05:02:20 - INFO - query: [select current_role(), current_database(), current_schema(), current_warehouse()...]
05:02:20 - INFO - query execution done
05:02:20 - INFO - Number of results in first chunk: 0
05:02:20 - INFO - query: [SELECT  *  FROM (select current_role(), current_database(), current_schema(), cu...]
05:02:20 - INFO - query execution done
05:02:20 - INFO - Number of results in first chunk: 1
------

In [5]:
def filter_dataset(df, column_name, filter_criterian) -> DataFrame:
    """Keep only the rows of ``df`` whose ``column_name`` equals ``filter_criterian``.

    Args:
        df: the Snowpark DataFrame to filter.
        column_name: name of the column to compare.
        filter_criterian: value the column must equal, e.g. 'Paid' for
            PAYMENT_STATUS or 'Delivered' for SHIPPING_STATUS.

    Returns:
        The DataFrame produced by ``df.filter(...)`` with that equality predicate.
    """
    predicate = col(column_name) == filter_criterian
    return df.filter(predicate)

In [6]:
 # List the columns of the IN source table that the pipeline below reads.
 show_columns_sql = "SHOW COLUMNS IN TABLE SALES_DWH.SOURCE.IN_SALES_ORDER;"
 sales_col = session.sql(show_columns_sql)
 sales_col.show()

05:02:24 - INFO - query: [SHOW COLUMNS IN TABLE SALES_DWH.SOURCE.IN_SALES_ORDER;]
05:02:25 - INFO - query execution done
05:02:25 - INFO - Number of results in first chunk: 20
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"table_name"    |"schema_name"  |"column_name"       |"data_type"                                         |"null?"  |"default"  |"kind"  |"expression"  |"comment"  |"database_name"  |"autoincrement"  |"schema_evolution_record"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|IN_SALES_ORDER  |SOURCE         |SALES_ORDER_KEY     |{"type":"FIXED","precision":38,"scale":0,"nulla...  |true     |           |C

In [7]:
def main():
    """Curate paid, delivered India (IN) sales orders into SALES_DWH.CURATED.IN_SALES_ORDER.

    Pipeline:
      1. read SALES_DWH.SOURCE.IN_SALES_ORDER;
      2. keep rows with PAYMENT_STATUS = 'Paid' and SHIPPING_STATUS = 'Delivered';
      3. add constant columns Country = 'IN' and Region = 'APAC';
      4. left-join SALES_DWH.COMMON.EXCHANGE_RATE on ORDER_DT = DATE to get USD2INR;
      5. keep one row per ORDER_ID - the one with the latest _METADATA_LAST_MODIFIED;
      6. project the curated columns (USD amounts = INR amount / USD2INR) and
         append them to SALES_DWH.CURATED.IN_SALES_ORDER.

    The session opened here is closed on exit, including when a step raises.
    """
    # NOTE(review): the US and FR cells repeat this pipeline with a different
    # table / country / currency / rate column; one parameterised function
    # would keep the three copies from drifting apart.
    session = get_snowpark_session()
    try:
        sales_df = session.sql("select * from SALES_DWH.SOURCE.IN_SALES_ORDER")

        # select * from in_sales_order where PAYMENT_STATUS = 'Paid' and SHIPPING_STATUS = 'Delivered'
        paid_sales_df = filter_dataset(sales_df, 'PAYMENT_STATUS', 'Paid')
        shipped_sales_df = filter_dataset(paid_sales_df, 'SHIPPING_STATUS', 'Delivered')

        # select *, 'IN' as Country, 'APAC' as Region from ...
        country_sales_df = shipped_sales_df.with_column('Country', lit('IN')).with_column('Region', lit('APAC'))

        # Left join: every order is kept and gets the rate for its order date.
        # A full outer join would also emit one row per exchange-rate date that
        # has no sales (all order columns NULL), and those rows would be appended
        # to the curated table.
        forex_df = session.sql("select * from  SALES_DWH.COMMON.EXCHANGE_RATE")
        sales_with_forext_df = country_sales_df.join(
            forex_df,
            country_sales_df['order_dt'] == forex_df['DATE'],
            join_type='left',
        )

        # De-duplication: keep the most recently modified copy of each order.
        # Partition by ORDER_ID (partitioning by ORDER_DT would keep only one
        # order per day), and use row_number() rather than rank() so that ties
        # on the timestamp still leave exactly one row per order.
        latest_version_first = Window.partitionBy(col('ORDER_ID')).order_by(col('_METADATA_LAST_MODIFIED').desc())
        unique_sales_df = (
            sales_with_forext_df
            .with_column('order_rank', row_number().over(latest_version_first))
            .filter(col('order_rank') == 1)
        )

        final_sales_df = unique_sales_df.select(
            col('SALES_ORDER_KEY'),
            col('ORDER_ID'),
            col('ORDER_DT'),
            col('CUSTOMER_NAME'),
            col('MOBILE_KEY'),
            col('Country'),
            col('Region'),
            col('ORDER_QUANTITY'),
            lit('INR').alias('LOCAL_CURRENCY'),
            col('UNIT_PRICE').alias('LOCAL_UNIT_PRICE'),
            col('PROMOTION_CODE').alias('PROMOTION_CODE'),
            col('FINAL_ORDER_AMOUNT').alias('LOCAL_TOTAL_ORDER_AMT'),
            col('TAX_AMOUNT').alias('local_tax_amt'),
            col('USD2INR').alias("Exchange_Rate"),
            (col('FINAL_ORDER_AMOUNT') / col('USD2INR')).alias('US_TOTAL_ORDER_AMT'),
            (col('TAX_AMOUNT') / col('USD2INR')).alias('USD_TAX_AMT'),
            col('payment_status'),
            col('shipping_status'),
            col('payment_method'),
            col('payment_provider'),
            col('mobile').alias('conctact_no'),
            col('shipping_address'),
        )

        final_sales_df.write.save_as_table("SALES_DWH.CURATED.IN_SALES_ORDER", mode="append")
    finally:
        session.close()


if __name__ == '__main__':
    main()

05:02:27 - INFO - Snowflake Connector for Python Version: 3.7.0, Python Version: 3.8.8, Platform: macOS-10.16-x86_64-i386-64bit
05:02:27 - INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
05:02:27 - INFO - Snowpark Session information: 
"version" : 1.13.0,
"python.version" : 3.8.8,
"python.connector.version" : 3.7.0,
"python.connector.session.id" : 1276855122542882,
"os.name" : Darwin

05:02:27 - INFO - query: [select * from SALES_DWH.SOURCE.IN_SALES_ORDER]
05:02:28 - INFO - query execution done
05:02:28 - INFO - Number of results in first chunk: 0
05:02:28 - INFO - query: [select * from  SALES_DWH.COMMON.EXCHANGE_RATE]
05:02:28 - INFO - query execution done
05:02:28 - INFO - Number of results in first chunk: 0
05:02:28 - INFO - query: [SELECT "SALES_ORDER_KEY" AS "SALES_ORDER_KEY", "ORDER_ID

In [8]:
def filter_dataset(df, column_name, filter_criterian) -> DataFrame:
    """Keep only the rows of ``df`` whose ``column_name`` equals ``filter_criterian``.

    Args:
        df: the Snowpark DataFrame to filter.
        column_name: name of the column to compare.
        filter_criterian: value the column must equal, e.g. 'Paid' for
            PAYMENT_STATUS or 'Delivered' for SHIPPING_STATUS.

    Returns:
        The DataFrame produced by ``df.filter(...)`` with that equality predicate.
    """
    # NOTE(review): identical to the definition in cell [5]; this re-definition
    # silently shadows it and could be dropped.
    predicate = col(column_name) == filter_criterian
    return df.filter(predicate)

def main():
    """Curate paid, delivered US sales orders into SALES_DWH.CURATED.US_SALES_ORDER.

    Pipeline:
      1. read SALES_DWH.SOURCE.US_SALES_ORDER;
      2. keep rows with PAYMENT_STATUS = 'Paid' and SHIPPING_STATUS = 'Delivered';
      3. add constant columns Country = 'US' and Region = 'NA';
      4. left-join SALES_DWH.COMMON.EXCHANGE_RATE on ORDER_DT = DATE to get USD2USD;
      5. keep one row per ORDER_ID - the one with the latest _METADATA_LAST_MODIFIED;
      6. project the curated columns (USD amounts = local amount / USD2USD) and
         append them to SALES_DWH.CURATED.US_SALES_ORDER.

    The session opened here is closed on exit, including when a step raises.
    """
    session = get_snowpark_session()
    try:
        sales_df = session.sql("select * from SALES_DWH.SOURCE.US_SALES_ORDER")

        # select * from us_sales_order where PAYMENT_STATUS = 'Paid' and SHIPPING_STATUS = 'Delivered'
        paid_sales_df = filter_dataset(sales_df, 'PAYMENT_STATUS', 'Paid')
        shipped_sales_df = filter_dataset(paid_sales_df, 'SHIPPING_STATUS', 'Delivered')

        # select *, 'US' as Country, 'NA' as Region from ...
        country_sales_df = shipped_sales_df.with_column('Country', lit('US')).with_column('Region', lit('NA'))

        # Left join: every order is kept and gets the rate for its order date.
        # A full outer join would also emit one row per exchange-rate date that
        # has no sales (all order columns NULL), and those rows would be appended
        # to the curated table.
        forex_df = session.sql("select * from SALES_DWH.COMMON.EXCHANGE_RATE")
        sales_with_forext_df = country_sales_df.join(
            forex_df,
            country_sales_df['order_dt'] == forex_df['DATE'],
            join_type='left',
        )

        # De-duplication: keep the most recently modified copy of each order.
        # Partition by ORDER_ID (partitioning by ORDER_DT would keep only one
        # order per day), and use row_number() rather than rank() so that ties
        # on the timestamp still leave exactly one row per order.
        # (The former debug print(count()) ran an extra full query; removed.)
        latest_version_first = Window.partitionBy(col('ORDER_ID')).order_by(col('_metadata_last_modified').desc())
        unique_sales_df = (
            sales_with_forext_df
            .with_column('order_rank', row_number().over(latest_version_first))
            .filter(col('order_rank') == 1)
        )

        final_sales_df = unique_sales_df.select(
            col('SALES_ORDER_KEY'),
            col('ORDER_ID'),
            col('ORDER_DT'),
            col('CUSTOMER_NAME'),
            col('MOBILE_KEY'),
            col('Country'),
            col('Region'),
            col('ORDER_QUANTITY'),
            lit('USD').alias('LOCAL_CURRENCY'),
            col('UNIT_PRICE').alias('LOCAL_UNIT_PRICE'),
            col('PROMOTION_CODE').alias('PROMOTION_CODE'),
            col('FINAL_ORDER_AMOUNT').alias('LOCAL_TOTAL_ORDER_AMT'),
            col('TAX_AMOUNT').alias('local_tax_amt'),
            # Local currency is USD, so the rate recorded must be the same
            # USD2USD used for the conversions below (not USD2INR).
            col('USD2USD').alias("Exchange_Rate"),
            (col('FINAL_ORDER_AMOUNT') / col('USD2USD')).alias('US_TOTAL_ORDER_AMT'),
            (col('TAX_AMOUNT') / col('USD2USD')).alias('USD_TAX_AMT'),
            col('payment_status'),
            col('shipping_status'),
            col('payment_method'),
            col('payment_provider'),
            col('phone').alias('conctact_no'),
            col('shipping_address'),
        )

        final_sales_df.write.save_as_table("SALES_DWH.CURATED.US_SALES_ORDER", mode="append")
    finally:
        session.close()


if __name__ == '__main__':
    main()

05:02:31 - INFO - Snowflake Connector for Python Version: 3.7.0, Python Version: 3.8.8, Platform: macOS-10.16-x86_64-i386-64bit
05:02:31 - INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
05:02:32 - INFO - Snowpark Session information: 
"version" : 1.13.0,
"python.version" : 3.8.8,
"python.connector.version" : 3.7.0,
"python.connector.session.id" : 1276855122551118,
"os.name" : Darwin

05:02:32 - INFO - query: [select * from SALES_DWH.SOURCE.US_SALES_ORDER]
05:02:32 - INFO - query execution done
05:02:32 - INFO - Number of results in first chunk: 0
05:02:32 - INFO - query: [select * from SALES_DWH.COMMON.EXCHANGE_RATE]
05:02:32 - INFO - query execution done
05:02:32 - INFO - Number of results in first chunk: 0
05:02:32 - INFO - query: [SELECT "SALES_ORDER_KEY" AS "SALES_ORDER_KEY", "ORDER_ID"

In [9]:
def filter_dataset(df, column_name, filter_criterian) -> DataFrame:
    """Keep only the rows of ``df`` whose ``column_name`` equals ``filter_criterian``.

    Args:
        df: the Snowpark DataFrame to filter.
        column_name: name of the column to compare.
        filter_criterian: value the column must equal, e.g. 'Paid' for
            PAYMENT_STATUS or 'Delivered' for SHIPPING_STATUS.

    Returns:
        The DataFrame produced by ``df.filter(...)`` with that equality predicate.
    """
    # NOTE(review): identical to the definition in cell [5]; this re-definition
    # silently shadows it and could be dropped.
    predicate = col(column_name) == filter_criterian
    return df.filter(predicate)

def main():
    """Curate paid, delivered France (FR) sales orders into SALES_DWH.CURATED.FR_SALES_ORDER.

    Pipeline:
      1. read SALES_DWH.SOURCE.FR_SALES_ORDER;
      2. keep rows with PAYMENT_STATUS = 'Paid' and SHIPPING_STATUS = 'Delivered';
      3. add constant columns Country = 'FR' and Region = 'EU';
      4. left-join SALES_DWH.COMMON.EXCHANGE_RATE on ORDER_DT = DATE to get USD2EU;
      5. keep one row per ORDER_ID - the one with the latest _METADATA_LAST_MODIFIED;
      6. project the curated columns (USD amounts = EUR amount / USD2EU) and
         append them to SALES_DWH.CURATED.FR_SALES_ORDER.

    The session opened here is closed on exit, including when a step raises.
    """
    session = get_snowpark_session()
    try:
        sales_df = session.sql("select * from SALES_DWH.SOURCE.FR_SALES_ORDER")

        # select * from fr_sales_order where PAYMENT_STATUS = 'Paid' and SHIPPING_STATUS = 'Delivered'
        paid_sales_df = filter_dataset(sales_df, 'PAYMENT_STATUS', 'Paid')
        shipped_sales_df = filter_dataset(paid_sales_df, 'SHIPPING_STATUS', 'Delivered')

        # select *, 'FR' as Country, 'EU' as Region from ...
        country_sales_df = shipped_sales_df.with_column('Country', lit('FR')).with_column('Region', lit('EU'))

        # Left join: every order is kept and gets the rate for its order date.
        # A full outer join would also emit one row per exchange-rate date that
        # has no sales (all order columns NULL), and those rows would be appended
        # to the curated table.
        forex_df = session.sql("select * from SALES_DWH.COMMON.EXCHANGE_RATE")
        sales_with_forext_df = country_sales_df.join(
            forex_df,
            country_sales_df['order_dt'] == forex_df['DATE'],
            join_type='left',
        )

        # De-duplication: keep the most recently modified copy of each order.
        # Partition by ORDER_ID (partitioning by ORDER_DT would keep only one
        # order per day), and use row_number() rather than rank() so that ties
        # on the timestamp still leave exactly one row per order.
        latest_version_first = Window.partitionBy(col('ORDER_ID')).order_by(col('_metadata_last_modified').desc())
        unique_sales_df = (
            sales_with_forext_df
            .with_column('order_rank', row_number().over(latest_version_first))
            .filter(col('order_rank') == 1)
        )

        final_sales_df = unique_sales_df.select(
            col('SALES_ORDER_KEY'),
            col('ORDER_ID'),
            col('ORDER_DT'),
            col('CUSTOMER_NAME'),
            col('MOBILE_KEY'),
            col('Country'),
            col('Region'),
            col('ORDER_QUANTITY'),
            lit('EUR').alias('LOCAL_CURRENCY'),
            col('UNIT_PRICE').alias('LOCAL_UNIT_PRICE'),
            col('PROMOTION_CODE').alias('PROMOTION_CODE'),
            col('FINAL_ORDER_AMOUNT').alias('LOCAL_TOTAL_ORDER_AMT'),
            col('TAX_AMOUNT').alias('local_tax_amt'),
            col('USD2EU').alias("Exchange_Rate"),
            (col('FINAL_ORDER_AMOUNT') / col('USD2EU')).alias('US_TOTAL_ORDER_AMT'),
            (col('TAX_AMOUNT') / col('USD2EU')).alias('USD_TAX_AMT'),
            col('payment_status'),
            col('shipping_status'),
            col('payment_method'),
            col('payment_provider'),
            col('phone').alias('conctact_no'),
            col('shipping_address'),
        )

        final_sales_df.write.save_as_table("SALES_DWH.CURATED.FR_SALES_ORDER", mode="append")
    finally:
        session.close()


if __name__ == '__main__':
    main()

05:02:35 - INFO - Snowflake Connector for Python Version: 3.7.0, Python Version: 3.8.8, Platform: macOS-10.16-x86_64-i386-64bit
05:02:35 - INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
05:02:36 - INFO - Snowpark Session information: 
"version" : 1.13.0,
"python.version" : 3.8.8,
"python.connector.version" : 3.7.0,
"python.connector.session.id" : 1276855122542886,
"os.name" : Darwin

05:02:36 - INFO - query: [select * from SALES_DWH.SOURCE.FR_SALES_ORDER]
05:02:36 - INFO - query execution done
05:02:36 - INFO - Number of results in first chunk: 0
05:02:36 - INFO - query: [select * from SALES_DWH.COMMON.EXCHANGE_RATE]
05:02:36 - INFO - query execution done
05:02:36 - INFO - Number of results in first chunk: 0
05:02:36 - INFO - query: [SELECT "SALES_ORDER_KEY" AS "SALES_ORDER_KEY", "ORDER_ID"