# Snowpark Data Engineering
<br>
<a href="https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/index">Snowpark API Reference (Python)</a>
<br>
<br>
<a href="https://app.snowflake.com/">Snowflake Service</a>



In [183]:
from snowflake.snowpark.session import Session
#from snowflake.snowpark import functions as F
from snowflake.snowpark.functions import upper, col, month, year, sum, sproc,count

from snowflake.snowpark.version import VERSION

# Misc
import sys
import json
import logging 
import numpy as np
import pandas as pd
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

### Establish Secure Connection to Snowflake

In [184]:
# Create Snowflake Session object
connection_parameters = json.load(open('Creds/prod.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

### Load Aggregated Campaign Spend Data from Snowflake table into Snowpark DataFrame


In [186]:

#Conect to a Snowflake Table
CAMPAIGN_SPEND = session.table("DEMODB.DE_SCHEMA.CAMPAIGN_SPEND")
CAMPAIGN_SPEND.show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

In [187]:
CAMPAIGN_SPEND.queries

{'queries': ['SELECT  *  FROM (DEMODB.DE_SCHEMA.CAMPAIGN_SPEND)'],
 'post_actions': []}

In [188]:
CAMPAIGN_SPEND.columns

['CAMPAIGN', 'CHANNEL', 'DATE', 'TOTAL_CLICKS', 'TOTAL_COST', 'ADS_SERVED']

In [189]:
CAMPAIGN_SPEND.describe().show()

------------------------------------------------------------------------------------------------------------
|"SUMMARY"  |"CAMPAIGN"          |"CHANNEL"  |"TOTAL_CLICKS"     |"TOTAL_COST"        |"ADS_SERVED"        |
------------------------------------------------------------------------------------------------------------
|count      |293120              |293120     |293120.0           |293120.0            |293120.0            |
|mean       |NULL                |NULL       |108.004889         |839.886333          |215.97909           |
|min        |building_community  |email      |35.0               |248.0               |72.0                |
|max        |youth_on_course     |video      |295.0              |2357.0              |591.0               |
|stddev     |NULL                |NULL       |54.39864871483482  |420.97817111579553  |107.73121465480652  |
------------------------------------------------------------------------------------------------------------



In [190]:
CAMPAIGN_SPEND_PD =  CAMPAIGN_SPEND.to_pandas() 

In [194]:
CAMPAIGN_SPEND_PD.head(10)

Unnamed: 0,CAMPAIGN,CHANNEL,DATE,TOTAL_CLICKS,TOTAL_COST,ADS_SERVED
0,winter_sports,video,2012-06-03,213,1762,426
1,sports_across_cultures,video,2012-06-02,87,678,157
2,building_community,search_engine,2012-06-03,66,471,134
3,world_series,social_media,2017-12-28,72,591,149
4,winter_sports,email,2018-02-09,252,1841,473
5,spring_break,video,2017-11-14,162,1155,304
6,nba_finals,email,2017-11-22,68,480,134
7,winter_sports,social_media,2018-03-10,227,1797,454
8,spring_break,search_engine,2017-08-30,150,1226,302
9,uefa,video,2017-09-30,81,701,168


##### Memory used Snowpark DF vs Pandas DF
<br>
<br>
 
  


In [195]:
print('Size in MB of Pandas DataFrame in Memory:\n', np.round(sys.getsizeof(CAMPAIGN_SPEND_PD) / (1024.0**2), 2))
print('Size in MB of Snowpark DataFrame in Memory:\n', np.round(sys.getsizeof(CAMPAIGN_SPEND) / (1024.0**2), 2))

Size in MB of Pandas DataFrame in Memory:
 50.88
Size in MB of Snowpark DataFrame in Memory:
 0.0


In [196]:
session.sql("SELECT * FROM CAMPAIGN_SPEND LIMIT 10").show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

## FILTER

### Filter on a CAMPAIGN (UEFA)

In [197]:

CAMPAIGN_SPEND_UEFA = CAMPAIGN_SPEND.filter(col("CAMPAIGN") == "uefa")
CAMPAIGN_SPEND_UEFA.show()

------------------------------------------------------------------------------------------
|"CAMPAIGN"  |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------
|uefa        |video          |2017-09-30  |81              |701           |168           |
|uefa        |video          |2018-01-23  |73              |545           |141           |
|uefa        |social_media   |2016-09-28  |67              |467           |127           |
|uefa        |social_media   |2016-12-11  |93              |656           |161           |
|uefa        |social_media   |2016-06-08  |61              |503           |120           |
|uefa        |search_engine  |2017-08-25  |69              |592           |149           |
|uefa        |search_engine  |2017-01-27  |69              |512           |135           |
|uefa        |email          |2017-07-15  |76              |593           |134           |

### Filter on CAMPAIGN (UEFA) and CHANNEL (VIDEO)

In [198]:
#Filter on a channel
CAMPAIGN_SPEND_UEFA_VIDEO = CAMPAIGN_SPEND.filter((col("CAMPAIGN") == "uefa") & (col("channel")== "video"))
CAMPAIGN_SPEND_UEFA_VIDEO.show()

--------------------------------------------------------------------------------------
|"CAMPAIGN"  |"CHANNEL"  |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
--------------------------------------------------------------------------------------
|uefa        |video      |2017-09-30  |81              |701           |168           |
|uefa        |video      |2018-01-23  |73              |545           |141           |
|uefa        |video      |2012-06-15  |72              |566           |143           |
|uefa        |video      |2012-05-31  |70              |573           |146           |
|uefa        |video      |2012-10-23  |76              |584           |150           |
|uefa        |video      |2013-06-20  |68              |575           |138           |
|uefa        |video      |2016-03-14  |66              |612           |136           |
|uefa        |video      |2021-11-23  |82              |603           |159           |
|uefa        |video      |2018-01-22  |77  

In [199]:
session.sql("SELECT * FROM CAMPAIGN_SPEND WHERE CHANNEL = 'video' AND CAMPAIGN = 'uefa' LIMIT 10").show()

--------------------------------------------------------------------------------------
|"CAMPAIGN"  |"CHANNEL"  |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
--------------------------------------------------------------------------------------
|uefa        |video      |2017-09-30  |81              |701           |168           |
|uefa        |video      |2018-01-23  |73              |545           |141           |
|uefa        |video      |2012-06-15  |72              |566           |143           |
|uefa        |video      |2012-05-31  |70              |573           |146           |
|uefa        |video      |2012-10-23  |76              |584           |150           |
|uefa        |video      |2013-06-20  |68              |575           |138           |
|uefa        |video      |2016-03-14  |66              |612           |136           |
|uefa        |video      |2021-11-23  |82              |603           |159           |
|uefa        |video      |2018-01-22  |77  

In [200]:
#Filter on a column name
CAMPAIGN_SPEND_UEFA_VIDEO = CAMPAIGN_SPEND.filter(col("CHANNEL") == "video").filter(col("CAMPAIGN")== "uefa")
CAMPAIGN_SPEND_UEFA_VIDEO.show()

--------------------------------------------------------------------------------------
|"CAMPAIGN"  |"CHANNEL"  |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
--------------------------------------------------------------------------------------
|uefa        |video      |2017-09-30  |81              |701           |168           |
|uefa        |video      |2018-01-23  |73              |545           |141           |
|uefa        |video      |2012-06-15  |72              |566           |143           |
|uefa        |video      |2012-05-31  |70              |573           |146           |
|uefa        |video      |2012-10-23  |76              |584           |150           |
|uefa        |video      |2013-06-20  |68              |575           |138           |
|uefa        |video      |2016-03-14  |66              |612           |136           |
|uefa        |video      |2021-11-23  |82              |603           |159           |
|uefa        |video      |2018-01-22  |77  

## SORT

### Sort by DATE (Descending)

In [201]:
#Sort by Date
CAMPAIGN_SPEND_Sort = CAMPAIGN_SPEND_UEFA.sort(col("DATE").desc())
CAMPAIGN_SPEND_Sort.show()


------------------------------------------------------------------------------------------
|"CAMPAIGN"  |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------
|uefa        |social_media   |2022-05-12  |43              |354           |90            |
|uefa        |email          |2022-05-12  |53              |378           |86            |
|uefa        |search_engine  |2022-05-12  |45              |368           |92            |
|uefa        |video          |2022-05-12  |44              |339           |95            |
|uefa        |video          |2022-05-11  |65              |553           |146           |
|uefa        |search_engine  |2022-05-11  |68              |474           |132           |
|uefa        |email          |2022-05-11  |70              |529           |147           |
|uefa        |social_media   |2022-05-11  |62              |480           |128           |

## SELECT

### SELECT Columns to use in my dataframe

In [203]:
CAMPAIGN_SPEND_Select = CAMPAIGN_SPEND_Sort.select("DATE","CAMPAIGN","CHANNEL", "TOTAL_COST")
CAMPAIGN_SPEND_Select.show(10)

----------------------------------------------------------
|"DATE"      |"CAMPAIGN"  |"CHANNEL"      |"TOTAL_COST"  |
----------------------------------------------------------
|2022-05-12  |uefa        |search_engine  |368           |
|2022-05-12  |uefa        |social_media   |354           |
|2022-05-12  |uefa        |email          |378           |
|2022-05-12  |uefa        |video          |339           |
|2022-05-11  |uefa        |email          |529           |
|2022-05-11  |uefa        |social_media   |480           |
|2022-05-11  |uefa        |video          |553           |
|2022-05-11  |uefa        |search_engine  |474           |
|2022-05-10  |uefa        |social_media   |543           |
|2022-05-10  |uefa        |search_engine  |653           |
----------------------------------------------------------



### Spend per Year and Month 

In [204]:
#Select, Sort and do datepart
TOTAL_SPEND_PER_MONTH = CAMPAIGN_SPEND_UEFA\
    .select(year(col("DATE")).alias("YEAR"),month( col("DATE")).alias("MONTH"),"TOTAL_COST")\
    .group_by('YEAR', 'MONTH')\
    .agg(sum('TOTAL_COST').as_('TOTAL_COST'))
    
TOTAL_SPEND_PER_MONTH.show(10)

-----------------------------------
|"YEAR"  |"MONTH"  |"TOTAL_COST"  |
-----------------------------------
|2017    |9        |67975         |
|2018    |1        |69439         |
|2016    |9        |67095         |
|2016    |12       |68910         |
|2016    |6        |67837         |
|2017    |8        |69424         |
|2017    |1        |69430         |
|2017    |7        |69349         |
|2015    |11       |67815         |
|2015    |8        |68745         |
-----------------------------------



### Total spend per Year and Month per Channel 
##### SELECT, AGGREGATE, DATE PART (MONTH & YEAR), SORT

In [205]:
TOTAL_SPEND_PER_MONTH_PER_CHANNEL = CAMPAIGN_SPEND_UEFA.\
    select(year(col("DATE")).alias("YEAR"),month( col("DATE")).alias("MONTH"),"CHANNEL","TOTAL_COST")\
    .group_by('YEAR', 'MONTH','CHANNEL')\
    .agg(sum('TOTAL_COST').as_('TOTAL_COST'))\
    .sort("YEAR","MONTH")
    
TOTAL_SPEND_PER_MONTH_PER_CHANNEL.show(10)

---------------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
---------------------------------------------------
|2012    |5        |social_media   |17224         |
|2012    |5        |search_engine  |17212         |
|2012    |5        |video          |17167         |
|2012    |5        |email          |17622         |
|2012    |6        |email          |17150         |
|2012    |6        |social_media   |16948         |
|2012    |6        |search_engine  |16936         |
|2012    |6        |video          |16629         |
|2012    |7        |video          |17394         |
|2012    |7        |email          |17565         |
---------------------------------------------------



### Pivot on Channel: Total Spend Across All Channels

In [206]:
TOTAL_SPEND_PER_MONTH = TOTAL_SPEND_PER_MONTH_PER_CHANNEL.pivot(
    'CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
TOTAL_SPEND_PER_MONTH_PER = TOTAL_SPEND_PER_MONTH_PER.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
TOTAL_SPEND_PER_MONTH.show()

-----------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"'search_engine'"  |"'social_media'"  |"'video'"  |"'email'"  |
-----------------------------------------------------------------------------------
|2012    |5        |17212              |17224             |17167      |17622      |
|2012    |6        |16936              |16948             |16629      |17150      |
|2012    |7        |17508              |17986             |17394      |17565      |
|2012    |8        |17551              |17197             |17725      |17594      |
|2012    |9        |16970              |16682             |17273      |17006      |
|2012    |10       |16997              |17460             |17337      |17624      |
|2012    |11       |16835              |16644             |16447      |17169      |
|2012    |12       |17764              |17720             |17061      |17985      |
|2013    |1        |17661              |17238             |17406      |17202

## Save Transformed Data into Snowflake Table

In [207]:
TOTAL_SPEND_PER_MONTH.write.mode("overwrite").save_as_table("SPEND_PER_MONTH")

## Create a View into a Snowflake Table

In [208]:
TOTAL_SPEND_PER_MONTH.createOrReplaceView("SPEND_PER_MONTH_view")

[Row(status='View SPEND_PER_MONTH_VIEW successfully created.')]

### End to end transformation

In [209]:
#End to end transformation
snow_df_spend_t = session.table('DEMODB.DE_SCHEMA.CAMPAIGN_SPEND')

    # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
snow_df_spend_per_channel_t = snow_df_spend_t.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
  with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed("MONTH(DATE)","MONTH").sort('YEAR','MONTH')

# Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
snow_df_spend_per_month_t = snow_df_spend_per_channel_t.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month_t = snow_df_spend_per_month_t.select(
  col("YEAR"),
  col("MONTH"),
  col("'search_engine'").as_("SEARCH_ENGINE"),
  col("'social_media'").as_("SOCIAL_MEDIA"),
  col("'video'").as_("VIDEO"),
  col("'email'").as_("EMAIL")
)
# Save transformed data
snow_df_spend_per_month_t.show()

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |
|2012    |6        |506497           |504679          |501098   |501947   |
|2012    |7        |522780           |521395          |522762   |518405   |
|2012    |8        |519959           |520537          |520685   |521584   |
|2012    |9        |507211           |507404          |511364   |507363   |
|2012    |10       |518942           |520863          |522768   |519950   |
|2012    |11       |505715           |505221          |505292   |503748   |
|2012    |12       |520148           |520711          |521427   |520724   |
|2013    |1        |522151           |518635          |520583   |521167   |
|2013    |2        |467736           |474679          |469856   |469784   |
------------

### Export to stage
Write the table to a cvs file on a external stage.<br>
<a href="https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.DataFrameWriter.copy_into_location">Copy_into_location api

In [212]:
#Wriet to stage
export_result = TOTAL_SPEND_PER_MONTH.write.copy_into_location("@DE_EXPORT/campaign2.csv", file_format_type="csv", header=True, overwrite=True, )
export_result[0][0]

121

In [211]:
TOTAL_SPEND_PER_MONTH.count()

121

## Automate the process

### Create Python function

In [213]:
def campaign_spend_data_pipeline(session: Session) -> str:
    # DATA TRANSFORMATIONS
    # Perform the following actions to transform the data
    
    # Load the campaign spend data
    
    snow_df_spend_t = session.table('CAMPAIGN_SPEND')

    # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
    snow_df_spend_per_channel_t = snow_df_spend_t.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed("MONTH(DATE)","MONTH").sort('YEAR','MONTH')

    # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
    snow_df_spend_per_month_t = snow_df_spend_per_channel_t.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
    snow_df_spend_per_month_t = snow_df_spend_per_month_t.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
    )
    # Save transformed data
    snow_df_spend_per_month_t.write.mode('overwrite').save_as_table('SPEND_PER_MONTH')


### Register Stored Procedure

In [214]:
#Create store procedure

session.sproc.register(
  func=campaign_spend_data_pipeline,
  name='campaign_spend_data_pipeline',
  packages=['snowflake-snowpark-python'],
  is_permanent=True,
  stage_location="@DE_SPROC",
  replace=True)



<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7fd9fa43d2b0>

### Create Task that uses the SP from above and schedule it to run every 60 minutes

In [215]:
#create Task
campaign_spend_data_pipeline_task = """
CREATE OR REPLACE TASK campaign_spend_data_pipeline_task
    WAREHOUSE = 'MYWH'
    SCHEDULE  = '60 MINUTE'
AS
    CALL campaign_spend_data_pipeline()
"""
session.sql(campaign_spend_data_pipeline_task).collect()


[Row(status='Task CAMPAIGN_SPEND_DATA_PIPELINE_TASK successfully created.')]

In [None]:
#Resume tasks
session.sql("alter task campaign_spend_data_pipeline_task resume").collect()

In [None]:
#Suspend tasks
session.sql("alter task campaign_spend_data_pipeline_task suspend").collect()

In [245]:
conn.close()