# Connect to Snowflake

In [10]:
from dotenv import load_dotenv
# Must run before the connection cell below, which reads the SNOWFLAKE_*
# settings from the environment with os.getenv.
load_dotenv()     # loads keys into os.environ so the rest of your code sees them

True

In [11]:
# authenticate into Snowflake
from snowflake.snowpark import Session
import os

# All connection settings come from SNOWFLAKE_* environment variables so that
# no credentials are hard-coded in the notebook.
connection_parameters = {
    "account": os.getenv('SNOWFLAKE_ACCOUNT'),
    "user": os.getenv('SNOWFLAKE_USER'),
    "password": os.getenv('SNOWFLAKE_PASSWORD'),
    "role": os.getenv('SNOWFLAKE_ROLE'),
    "warehouse": os.getenv('SNOWFLAKE_WAREHOUSE'),
    "database": os.getenv('SNOWFLAKE_DATABASE'),
    "schema": os.getenv('SNOWFLAKE_SCHEMA')
}

# os.getenv returns None for an unset variable. Fail fast with a readable
# message for the credentials this cell cannot connect without, instead of
# letting the connection attempt fail with a less obvious error.
missing_settings = [
    "SNOWFLAKE_" + key.upper()
    for key in ("account", "user", "password")
    if not connection_parameters[key]
]
if missing_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(missing_settings)
    )

session = Session.builder.configs(connection_parameters).create()

In [12]:
# check connection has been successful by asking the session which account
# it is connected to
print("Session Current Account:", session.get_current_account())

Session Current Account: "WEVIRIP-NA38028"


In [13]:
# Point the session at the database and schema that hold the source tables.
# (The original first line ended in a stray comma, which turned the statement
# into a discarded one-element tuple.)
session.use_database("SNOWPARK_DEFINITIVE_GUIDE")
session.use_schema("MY_SCHEMA")

# Table references for the four source tables used by the pipeline.
purchase_history = session.table("PURCHASE_HISTORY")
campaign_info = session.table("CAMPAIGN_INFO")
complain_info = session.table("COMPLAINT_INFO")
marketing_additional = session.table("MARKETING_ADDITIONAL")

# Data Engineering Pipelines

## Step 1 - Joining Purchase History Table with Campaign Info

In [14]:
# join the purchase history with the campaign information
def combine_campaign_table(purchase_history, campaign_info):
    """Join purchase history with campaign info on their ID columns.

    The join is given ``_left`` / ``_right`` suffixes for column names the
    two inputs share, and the ``ID_RIGHT`` column is dropped afterwards, so
    the result keeps the identifier as ``ID_LEFT``.

    Args:
        purchase_history: DataFrame exposing an ``ID`` column.
        campaign_info: DataFrame exposing an ``ID`` column.

    Returns:
        The joined DataFrame without the ``ID_RIGHT`` column.
    """
    same_id = purchase_history.ID == campaign_info.ID
    joined = purchase_history.join(
        campaign_info,
        same_id,
        lsuffix='_left',
        rsuffix='_right',
    )
    return joined.drop('ID_RIGHT')

## Step 2 - Joining Purchase Campaign with Complaint Info Table

In [15]:
# join the purchase campaign with the complaint information 
# using the same ID column and then create a purchase_campaign_complain DataFrame
def combine_complain_table(purchase_campaign, complain_info):
    """Join the purchase/campaign data with the complaint information.

    Rows are matched on ``purchase_campaign['ID_LEFT']`` against
    ``complain_info.ID``; the ``ID_LEFT`` column is dropped from the result.

    Args:
        purchase_campaign: DataFrame carrying an ``ID_LEFT`` column (as
            produced by ``combine_campaign_table``).
        complain_info: DataFrame exposing an ``ID`` column.

    Returns:
        The joined DataFrame without the ``ID_LEFT`` column.
    """
    id_matches = purchase_campaign['ID_LEFT'] == complain_info.ID
    with_complaints = purchase_campaign.join(complain_info, id_matches)
    return with_complaints.drop('ID_LEFT')

## Step 3 - Union Additional Marketing Table with Purchase Campaign Complaint Data

In [16]:
# the final marketing table is the union of the purchase/campaign/complaint data and the additional marketing table
def union_marketing_additional_table(purchase_campaign_complain, marketing_additional):
    """Combine the joined purchase/campaign/complaint data with extra marketing rows.

    Args:
        purchase_campaign_complain: DataFrame produced by the two join steps.
        marketing_additional: DataFrame holding the additional marketing data.

    Returns:
        The result of ``purchase_campaign_complain.union_by_name(marketing_additional)``.
    """
    return purchase_campaign_complain.union_by_name(marketing_additional)

## Stored Procedure - Data Preparation

In [18]:
from snowflake.snowpark.functions import sproc
import snowflake

def data_prep(session: Session) -> str:
    """Build the FINAL_MARKETING_DATA table from the four source tables.

    Runs the three pipeline steps defined earlier in the notebook
    (campaign join, complaint join, union with the additional marketing
    data) and writes the result to ``FINAL_MARKETING_DATA``.

    Args:
        session: Snowpark session used to read the source tables and write
            the output table.

    Returns:
        A fixed status message once the table has been written.
    """
    #### Loading Required Tables
    purchase_history = session.table("PURCHASE_HISTORY")
    campaign_info = session.table("CAMPAIGN_INFO")
    complain_info = session.table("COMPLAINT_INFO")
    marketing_additional = session.table("MARKETING_ADDITIONAL")

    #### Calling Step 1
    purchase_campaign = combine_campaign_table(purchase_history, campaign_info)

    #### Calling Step 2
    purchase_campaign_complain = combine_complain_table(purchase_campaign, complain_info)

    #### Calling Step 3
    final_marketing_data = union_marketing_additional_table(purchase_campaign_complain, marketing_additional)

    #### Writing Combined Data to New Table
    # Overwrite so the procedure can be re-run; without an explicit mode the
    # write fails when FINAL_MARKETING_DATA already exists.
    final_marketing_data.write.save_as_table('FINAL_MARKETING_DATA', mode='overwrite')
    return 'LOADED FINAL MARKETING DATA TABLE'

# Create an instance of StoredProcedure using the sproc() function
from snowflake.snowpark.types import IntegerType, StringType
# Wrap data_prep with sproc(); the resulting object is invoked in the
# "Calling Stored Procedure" section of this notebook.
data_prep_sproc = sproc(
    func=data_prep,
    replace=True,
    return_type=StringType(),
    stage_location='@my_stage',
    packages=['snowflake-snowpark-python'],
)

## Stored Procedure - Data Transformation

In [19]:
def data_transform(session: Session) -> str:
    """Pivot income by education level and store it as MARKETING_PIVOT.

    Reads ``FINAL_MARKETING_DATA``, keeps the EDUCATION, MARITAL_STATUS and
    INCOME columns, pivots the listed EDUCATION values into columns holding
    the summed INCOME, and writes the result to ``MARKETING_PIVOT``.

    Args:
        session: Snowpark session used to read the input table and write
            the output table.

    Returns:
        A fixed status message once the table has been written.
    """
    #### Loading Required Tables
    marketing_final = session.table("FINAL_MARKETING_DATA")
    market_subset = marketing_final.select('EDUCATION', 'MARITAL_STATUS', 'INCOME')
    # MARITAL_STATUS is the only non-pivoted, non-aggregated column left in
    # the subset — presumably the result has one row per marital status.
    market_pivot = market_subset.pivot("EDUCATION",["Graduation","PhD","Master","Basic","2n Cycle"]).sum("INCOME")

    #### Writing Transformed Data to New Table
    # Overwrite so the procedure can be re-run; without an explicit mode the
    # write fails when MARKETING_PIVOT already exists.
    market_pivot.write.save_as_table('MARKETING_PIVOT', mode='overwrite')
    return 'CREATED MARKETING PIVOT TABLE'

# Wrap data_transform with sproc(); the resulting object is invoked in the
# "Calling Stored Procedure" section of this notebook.
data_transform_sproc = sproc(
    func=data_transform,
    replace=True,
    return_type=StringType(),
    stage_location='@MY_STAGE',
    packages=['snowflake-snowpark-python'],
)

## Stored Procedure - Data Cleanup

In [20]:
def data_cleanup(session: Session) -> str:
    """Drop sparsely populated rows from MARKETING_PIVOT and store the result.

    Reads ``MARKETING_PIVOT``, applies ``dropna(thresh=5)`` and writes the
    remaining rows to ``MARKET_PIVOT_CLEANED``.

    Args:
        session: Snowpark session used to read the input table and write
            the output table.

    Returns:
        A fixed status message once the table has been written.
    """
    #### Loading Required Tables
    market_pivot = session.table("MARKETING_PIVOT")

    # thresh=5: per the Snowpark dropna API, a row is kept only when it has
    # at least 5 non-null values.
    market_drop_null = market_pivot.dropna(thresh=5)

    #### Writing Cleaned Data To New Table
    # Overwrite so the procedure can be re-run; without an explicit mode the
    # write fails when MARKET_PIVOT_CLEANED already exists.
    market_drop_null.write.save_as_table("MARKET_PIVOT_CLEANED", mode="overwrite")
    return "CREATED CLEANED TABLE"


# Wrap data_cleanup with sproc(); the resulting object is invoked in the
# "Calling Stored Procedure" section of this notebook.
data_cleanup_sproc = sproc(
    func=data_cleanup,
    replace=True,
    return_type=StringType(),
    stage_location="@my_stage",
    packages=["snowflake-snowpark-python"],
)

In [None]:
# Optional cleanup: uncomment to drop the tables written by the three stored procedures above.
# session.sql("drop table FINAL_MARKETING_DATA;").show()
# session.sql("drop table MARKETING_PIVOT;").show()
# session.sql("drop table MARKET_PIVOT_CLEANED;").show()

## Calling Stored Procedure

In [21]:
# Order matters: data_transform reads FINAL_MARKETING_DATA, which data_prep
# writes, and data_cleanup reads MARKETING_PIVOT, which data_transform writes.
# Only the last call's return value is displayed as the cell output.

#### Calling Data Preparation Stored Procedure
data_prep_sproc()

#### Calling Data Transformation Stored Procedure
data_transform_sproc()

#### Calling Data Cleanup Stored Procedure
data_cleanup_sproc()

'CREATED CLEANED TABLE'

# Close Snowflake Session

In [22]:
# always close a session
# (keep this as the final cell: earlier cells use `session`)
session.close()