In [1]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import month, year, col, sum
from snowflake.snowpark.version import VERSION

import json
import logging
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

In [5]:
connection_parameters = json.load(open("C:\\Users\\argupta\\Snowflake\\Snowpark\\auth.json"))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('select current_user(), current_version()').collect()
snowpark_version = VERSION

In [6]:
session.use_database('demo_db')
session.use_schema('public')

In [7]:
spend_df = session.table('campaign_spend')
spend_df.queries

{'queries': ['SELECT  *  FROM (campaign_spend)'], 'post_actions': []}

In [8]:
with session.query_history() as history:
    spend_df.show(20)
history.queries

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

[QueryRecord(query_id='01adbf89-0001-3544-0002-1e3200089462', sql_text='SELECT  *  FROM campaign_spend LIMIT 20')]

In [11]:
# Total Spend per Year and Month For all Channels

spend_df_per_channel = spend_df.group_by(year('DATE'), month('DATE'), 'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).with_column_renamed('"YEAR(DATE)"', "YEAR").with_column_renamed('"MONTH(DATE)"', "MONTH").sort('YEAR', 'MONTH')

spend_df_per_channel.show(10)

---------------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
---------------------------------------------------
|2012    |5        |search_engine  |516431        |
|2012    |5        |video          |516729        |
|2012    |5        |email          |517208        |
|2012    |5        |social_media   |517618        |
|2012    |6        |video          |501098        |
|2012    |6        |search_engine  |506497        |
|2012    |6        |social_media   |504679        |
|2012    |6        |email          |501947        |
|2012    |7        |search_engine  |522780        |
|2012    |7        |email          |518405        |
---------------------------------------------------



In [12]:
# Pivot on Channel: Total Spend Across All Channels

spend_df_per_month = spend_df_per_channel.pivot('CHANNEL', ['search_engine', 'social_media', 'video', 'email']).sum('TOTAL_COST')
spend_df_per_month = spend_df_per_month.select(
    col('YEAR'),
    col('MONTH'),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
spend_df_per_month.show()

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |
|2012    |6        |506497           |504679          |501098   |501947   |
|2012    |7        |522780           |521395          |522762   |518405   |
|2012    |8        |519959           |520537          |520685   |521584   |
|2012    |9        |507211           |507404          |511364   |507363   |
|2012    |11       |505715           |505221          |505292   |503748   |
|2013    |1        |522151           |518635          |520583   |521167   |
|2013    |2        |467736           |474679          |469856   |469784   |
|2013    |3        |518044           |523408          |523688   |519430   |
|2013    |5        |521339           |521528          |519625   |521698   |
------------

In [13]:
# Save Transformed data into Snowflake Table

spend_df_per_month.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

In [15]:
# Automation: Run Campaign Spend Data Transformation as Snowflake Task

def campaign_spend_data_pipeline(session: Session) -> str:
    # Data Transformation
    # Perform the following actions to transform the data

    # Load the campaign spend data
    spend_df_spend_t = session.table('campaign_spend')

    # Transform the data
    spend_df_per_channel_t = spend_df_spend_t.group_by(year('DATE'), month('DATE'), 'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).with_column_renamed('"YEAR(DATE)"', "YEAR").with_column_renamed('"MONTH(DATE)"', "MONTH").sort('YEAR', 'MONTH')

    spend_df_per_month_t = spend_df_per_channel_t.pivot('CHANNEL', ['search_engine', 'social_media', 'video', 'email']).sum('TOTAL_COST')

    spend_df_per_month_t = spend_df_per_month_t.select(
        col('YEAR'),
        col('MONTH'),
        col("'search_engine'").as_("SEARCH_ENGINE"),
        col("'social_media'").as_("SOCIAL_MEDIA"),
        col("'video'").as_("VIDEO"),
        col("'email'").as_("EMAIL")
    )

    # Save transformed data
    spend_df_per_month_t.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')

# Register data pipelining function as a Stored Procedure so it can be run as a task
session.sproc.register(
    func=campaign_spend_data_pipeline,
    name="campaign_spend_data_pipeline",
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location="@camp_sprocs",
    replace = True)

campaign_spend_data_pipeline_task = """
CREATE OR REPLACE TASK campaign_spend_data_pipeline_task
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '3 MINUTE'
AS
    CALL campaign_spend_data_pipeline()
"""

session.sql(campaign_spend_data_pipeline_task).collect()

[Row(status='Task CAMPAIGN_SPEND_DATA_PIPELINE_TASK successfully created.')]

In [18]:
# Total Revenue per Year and Month
df_revenue = session.table('monthly_revenue')
df_revenue_per_month = df_revenue.group_by('YEAR', 'MONTH').agg(sum('REVENUE')).sort('YEAR', 'MONTH').with_column_renamed('SUM(REVENUE)', "REVENUE")
df_revenue_per_month.show()

---------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"   |
---------------------------------
|2012    |5        |3264300.11  |
|2012    |6        |3208482.33  |
|2012    |7        |3311966.98  |
|2012    |8        |3311752.81  |
|2012    |9        |3208563.06  |
|2012    |10       |3334028.46  |
|2012    |11       |3185894.64  |
|2012    |12       |3334570.96  |
|2013    |1        |3316455.44  |
|2013    |2        |2995042.21  |
---------------------------------



In [20]:
# Join Total Spend and Total Revenue per Year and Month Across All Channels

df_spend_and_revenue_per_month = spend_df_per_month.join(df_revenue_per_month, ["YEAR", "MONTH"])
df_spend_and_revenue_per_month.show()

----------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
----------------------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |3264300.11  |
|2012    |6        |506497           |504679          |501098   |501947   |3208482.33  |
|2012    |7        |522780           |521395          |522762   |518405   |3311966.98  |
|2012    |8        |519959           |520537          |520685   |521584   |3311752.81  |
|2012    |9        |507211           |507404          |511364   |507363   |3208563.06  |
|2012    |10       |518942           |520863          |522768   |519950   |3334028.46  |
|2012    |11       |505715           |505221          |505292   |503748   |3185894.64  |
|2012    |12       |520148           |520711          |521427   |520724   |3334570.96  |
|2013    |1        |5

In [21]:
# Examine Snowpark DF Query and Execution Plan
df_spend_and_revenue_per_month.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM (( SELECT "YEAR" AS "YEAR", "MONTH" AS "MONTH", "SEARCH_ENGINE" AS "SEARCH_ENGINE", "SOCIAL_MEDIA" AS "SOCIAL_MEDIA", "VIDEO" AS "VIDEO", "EMAIL" AS "EMAIL" FROM ( SELECT "YEAR", "MONTH", "'search_engine'" AS "SEARCH_ENGINE", "'social_media'" AS "SOCIAL_MEDIA", "'video'" AS "VIDEO", "'email'" AS "EMAIL" FROM ( SELECT  *  FROM ( SELECT  *  FROM ( SELECT "YEAR(DATE)" AS "YEAR", "MONTH(DATE)" AS "MONTH", "CHANNEL", "TOTAL_COST" FROM ( SELECT year("DATE") AS "YEAR(DATE)", month("DATE") AS "MONTH(DATE)", "CHANNEL", sum("TOTAL_COST") AS "TOTAL_COST" FROM ( SELECT  *  FROM campaign_spend) GROUP BY year("DATE"), month("DATE"), "CHANNEL")) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST) PIVOT (sum("TOTAL_COST") FOR "CHANNEL" IN ('search_engine', 'social_media', 'video', 'email'))))) AS SNOWPARK_LEFT INNER JOIN ( SELECT "YEAR" AS "YEAR", "MONTH" AS "MONTH", "REVENUE" AS "REVENUE" FROM ( SELECT "YEAR", "MONTH", "

In [22]:
# Save Transformed Data into Snowflake Table
df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

In [23]:
# Automation: Run Monthly Revenue Data Transformations as a Snowflake Task
def monthly_revenue_data_pipeline(session: Session) -> str:
    # Load spend table and revenue tables and transform revenue table into revenue per month
    spend_df_per_month_t = session.table('spend_per_month')
    df_revenue_t = session.table('monthly_revenue')
    df_revenue_per_month_t = df_revenue_t.group_by('YEAR', 'MONTH').agg(sum('REVENUE')).sort('YEAR', 'MONTH').with_column_renamed('SUM(REVENUE)', "REVENUE")

    # join revenue per month and spend per month
    df_spend_and_revenue_per_month_t = spend_df_per_month_t.join(df_revenue_per_month_t, ["YEAR", "MONTH"])

    # Save in a new table for next task
    df_spend_and_revenue_per_month_t.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

# Register data pipelining function as stored procedure so it can be run as a task
session.sproc.register(
    func=monthly_revenue_data_pipeline,
    name="monthly_revenue_data_pipeline",
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location="camp_sprocs",
    replace=True
)

monthly_revenue_data_pipeline_task = """
CREATE OR REPLACE TASK monthly_revenue_data_pipeline_task
    WAREHOUSE = 'COMPUTE_WH'
    AFTER campaign_spend_data_pipeline_task
AS
    CALL monthly_revenue_data_pipeline()
"""
session.sql(monthly_revenue_data_pipeline_task).collect()

[Row(status='Task MONTHLY_REVENUE_DATA_PIPELINE_TASK successfully created.')]

In [24]:
# Resume Tasks
session.sql("alter task monthly_revenue_data_pipeline_task resume").collect()
session.sql("alter task campaign_spend_data_pipeline_task resume").collect()

[Row(status='Statement executed successfully.')]