## Prepare Data
  ***

In [None]:
# Import python packages
import streamlit as st
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Snowpark pandas: the modin API executed inside Snowflake, which copes with
# millions of rows far better than local pandas.
import modin.pandas as spd
import snowflake.snowpark.modin.plugin  # noqa: F401 -- imported only to enable the Snowflake backend for modin

from snowflake.snowpark.context import get_active_session

# Reuse the session this notebook is already connected with.
session = get_active_session()

In [None]:
# Read data.
# read_snowflake only references the table; rows stay in Snowflake instead of
# being loaded into local RAM.
data_path = 'SNOWFLAKE_SAMPLE_DATA.TPCH_SF1'
lineitem_keep_cols = [
    'L_ORDERKEY', 'L_LINENUMBER', 'L_PARTKEY', 'L_RETURNFLAG',
    'L_QUANTITY', 'L_DISCOUNT', 'L_EXTENDEDPRICE',
]
lineitem_table = f'{data_path}.LINEITEM'
lineitem_df = spd.read_snowflake(lineitem_table)[lineitem_keep_cols]

## Explore Data
  ***

In [None]:
#preview
st.dataframe(lineitem_df.head())

In [None]:
print(lineitem_df.shape)
#6M rows, 7col

In [None]:
lineitem_df.info()

In [None]:
#group by, count
print(lineitem_df.L_RETURNFLAG.value_counts())

## Transform Data
  ***

In [None]:
# Filter: keep only line items whose return flag is not 'A'.
keep_mask = lineitem_df['L_RETURNFLAG'] != 'A'
lineitem_df_skim = lineitem_df[keep_mask]
print(f'rows before filter: {len(lineitem_df)}, rows after filter: {len(lineitem_df_skim)}')


In [None]:
# Create a new column: per-line REVENUE.
# NOTE(review): per the TPC-H spec L_EXTENDEDPRICE already equals quantity *
# part price, and TPC-H queries define revenue as L_EXTENDEDPRICE * (1 - L_DISCOUNT);
# multiplying by L_QUANTITY again looks like double counting -- confirm the intended metric.
extended_price = lineitem_df_skim['L_EXTENDEDPRICE']
quantity = lineitem_df_skim['L_QUANTITY']
lineitem_df_skim['REVENUE'] = extended_price * quantity
st.dataframe(lineitem_df_skim.head())

In [None]:
# Aggregate line items to one row per (order, return flag).
# Each entry maps a column to a list of aggregations; use 'mean' instead of 'sum' for averages.
column_agg = {
    'L_QUANTITY': ['sum'],
    'REVENUE': ['sum'],
}
group_keys = ['L_ORDERKEY', 'L_RETURNFLAG']
grouped = lineitem_df_skim.groupby(by=group_keys, as_index=False)
lineitem_df_header = grouped.agg(column_agg)
# List-valued aggregations produce two-level column labels; replace them with flat names.
lineitem_df_header.columns = [*group_keys, 'SUM_ITEMS', 'SUM_REVENUE']
st.dataframe(lineitem_df_header.head())

In [None]:
# Pivot return flags into columns: one row per order, one column per (metric, flag).
# (order, flag) pairs are already unique after the groupby, so aggfunc='sum'
# simply carries each value through.
pivot_metrics = ['SUM_ITEMS', 'SUM_REVENUE']
lineitem_df_header_pivot = lineitem_df_header.pivot_table(
    index=['L_ORDERKEY'],
    columns=['L_RETURNFLAG'],
    values=pivot_metrics,
    aggfunc='sum',
)
st.dataframe(lineitem_df_header_pivot.head())

In [None]:
# Flatten the pivot into a plain frame: L_ORDERKEY plus one column per (metric, return flag).
lineitem_df_header_pivot_df = lineitem_df_header_pivot.reset_index(names='L_ORDERKEY')
# Build the names from the actual (metric, flag) column pairs instead of a
# hard-coded ['SUM_ITEMS_N', 'SUM_ITEMS_R', ...] list, so labels stay correct
# (and no length-mismatch error is raised) if the set of return flags changes.
# reset_index fills the second level of the L_ORDERKEY label with '', so that
# column keeps its bare name.
lineitem_df_header_pivot_df.columns = [
    f'{metric}_{flag}' if flag else metric
    for metric, flag in lineitem_df_header_pivot_df.columns
]
st.dataframe(lineitem_df_header_pivot_df.head())

## Join Data
  ***

In [None]:
# Read the ORDERS table from the same schema; like LINEITEM, it stays in
# Snowflake rather than being loaded into local RAM.
order_keep_cols = [
    'O_ORDERKEY', 'O_CUSTKEY', 'O_ORDERSTATUS', 'O_TOTALPRICE', 'O_ORDERDATE',
]
orders_table = f'{data_path}.ORDERS'
order_df = spd.read_snowflake(orders_table)[order_keep_cols]
st.dataframe(order_df.head())

In [None]:
# Join the per-order pivot with order attributes (left join keeps every pivoted order).
# The duplicate O_ORDERKEY key column is dropped in the same chain rather than
# with inplace=True, so re-running the cell always rebuilds the frame from its inputs.
items_order_df = (
    lineitem_df_header_pivot_df
    .merge(order_df, left_on='L_ORDERKEY', right_on='O_ORDERKEY', how='left')
    .drop(columns='O_ORDERKEY')
)
# A bare `.shape` expression mid-cell is never displayed; print it explicitly.
print(items_order_df.shape)
st.dataframe(items_order_df.head())

## Load Data
  ***

In [None]:
# Persist the per-(order, return flag) header table, replacing any previous copy.
lineitem_df_header.to_snowflake(
    name='LINEITEM_HEADER',
    if_exists='replace',
    index=False,
)

In [None]:
-- Spot-check the LINEITEM_HEADER table written by the previous cell.
SELECT *
FROM LINEITEM_HEADER
LIMIT 5

## Visualize Data
  ***

In [None]:
# Histogram + KDE (density scale) of each aggregated metric.
# to_pandas() pulls the header table into local memory for plotting.
lineitem_df_header_pd = lineitem_df_header.to_pandas()
colnames = ['SUM_ITEMS', 'SUM_REVENUE']
# squeeze=False always returns a 2-D array of Axes, so .flatten() works for any number of columns.
fig, axes = plt.subplots(1, len(colnames), figsize=(15, 3), squeeze=False)
for col, ax in zip(colnames, axes.flatten()):
    ax.set_title(col)
    sns.histplot(lineitem_df_header_pd, x=col, ax=ax, kde=True, stat='density',
                 kde_kws=dict(cut=3), alpha=.4)
# Lay out once, after every panel is drawn (it was previously recomputed on each iteration).
fig.tight_layout()

## Orchestrate Data Pipeline
  ***

In [None]:
from snowflake.core import CreateMode, Root
from snowflake.core.task import Task
from snowflake.snowpark import Session

# Root handle bound to the notebook session; used below to reach databases/schemas/tasks.
root = Root(session)

In [None]:
# Package the ELT steps above into one function so it can run as a stored procedure.
def create_header_table(session:Session, data_path:str) -> str:
    """Rebuild the per-(order, return flag) header table and write it to Snowflake.

    Reads ``{data_path}.LINEITEM``, drops lines with return flag ``'A'``, adds a
    ``REVENUE`` column, aggregates quantity and revenue per
    ``(L_ORDERKEY, L_RETURNFLAG)``, and saves the result as a new table named
    ``LINEITEM_HEADER_<YYYY_MM_DD_HH_MM_SS>``. The name is unqualified, so it
    resolves against the session's current database/schema.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime. It
            is not referenced directly; the Snowpark pandas calls below use the
            session implicitly, as the notebook cells do.
        data_path: ``DATABASE.SCHEMA`` prefix of the ``LINEITEM`` table, e.g.
            ``'SNOWFLAKE_SAMPLE_DATA.TPCH_SF1'``.

    Returns:
        A status message containing the timestamp suffix of the written table.
    """
    from datetime import datetime

    # Import Snowpark pandas here instead of relying on the notebook's module-level
    # `spd`: the notebook's top-level statements do not run inside the stored
    # procedure, and the plugin import is what enables read_snowflake/to_snowflake.
    import modin.pandas as spd
    import snowflake.snowpark.modin.plugin  # noqa: F401 -- imported for its side effect

    # read data
    lineitem_keep_cols = ['L_ORDERKEY', 'L_LINENUMBER', 'L_PARTKEY', 'L_RETURNFLAG',
                          'L_QUANTITY', 'L_DISCOUNT', 'L_EXTENDEDPRICE']
    lineitem_df = spd.read_snowflake(f'{data_path}.LINEITEM')[lineitem_keep_cols]

    # filter, then add REVENUE via assign() (builds a new frame) rather than
    # writing a column into the filtered selection
    lineitem_df_skim = lineitem_df[lineitem_df['L_RETURNFLAG'] != 'A']
    # NOTE(review): per the TPC-H spec L_EXTENDEDPRICE already equals quantity *
    # part price, and TPC-H queries define revenue as L_EXTENDEDPRICE * (1 - L_DISCOUNT);
    # multiplying by L_QUANTITY again looks like double counting -- confirm the intended metric.
    lineitem_df_skim = lineitem_df_skim.assign(
        REVENUE=lineitem_df_skim['L_EXTENDEDPRICE'] * lineitem_df_skim['L_QUANTITY'])

    # group by, agg table ('mean' instead of 'sum' for averages)
    column_agg = {
        'L_QUANTITY': ['sum'],
        'REVENUE': ['sum'],
    }
    lineitem_df_header = (lineitem_df_skim
                          .groupby(by=['L_ORDERKEY', 'L_RETURNFLAG'], as_index=False)
                          .agg(column_agg))
    lineitem_df_header.columns = ['L_ORDERKEY', 'L_RETURNFLAG', 'SUM_ITEMS', 'SUM_REVENUE']

    # Only the header table is persisted (matching the notebook's write cell), so
    # the exploratory per-order pivot is not recomputed here.

    # write to Snowflake table
    # NOTE(review): the timestamped name makes every run create a new table
    # (if_exists='replace' effectively never triggers), so scheduled runs accumulate tables.
    timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    save_path = f"LINEITEM_HEADER_{timestamp}"
    lineitem_df_header.to_snowflake(name=save_path, if_exists='replace', index=False)
    return f"Load Success at {timestamp}!"


In [None]:
-- Stage that session.sproc.register(stage_location='@task_code_stage') uploads the procedure code to.
CREATE STAGE IF NOT EXISTS task_code_stage;

In [None]:
# Register create_header_table as a permanent stored procedure; its code is
# uploaded to the task_code_stage stage so the scheduled task can call it.
sp_create_header_table = session.sproc.register(
    func=create_header_table,
    name='create_header_table_sp',
    replace=True,
    is_permanent=True,
    stage_location='@task_code_stage',
    # Only what the procedure body needs: Snowpark pandas (modin + the Snowpark
    # plugin) on pandas. streamlit and snowflake (snowflake.core) are used by the
    # notebook itself, not by create_header_table, so they are not installed server-side.
    packages=['modin', 'snowflake-snowpark-python==1.17.0', 'pandas==2.2.1'],
)

In [None]:
# Schedule the stored procedure as a recurring Snowflake task.
from datetime import timedelta

# NOTE(review): each run of create_header_table_sp writes a new timestamped
# LINEITEM_HEADER_<timestamp> table, so a 1-minute schedule adds a table every
# minute while the task is resumed -- widen the interval or suspend the task when done.
TASK_SCHEDULE = timedelta(minutes=1)

my_task = Task(
    name='task_create_header_table',
    # Reuse the notebook's data_path instead of repeating the literal.
    definition=f"CALL create_header_table_sp('{data_path}')",
    schedule=TASK_SCHEDULE,
)
tasks = root.databases[session.get_current_database()].schemas[session.get_current_schema()].tasks
task_run = tasks.create(my_task, mode=CreateMode.or_replace)

# new tasks are created suspended, so resume it to start the schedule
task_run.resume()

In [None]:
-- Confirm the task exists and inspect its state after resume().
SHOW TASKS LIKE 'task_create_header_table';

In [None]:
-- Syntax for calling the procedure manually, outside the task schedule.
CALL create_header_table_sp('SNOWFLAKE_SAMPLE_DATA.TPCH_SF1');