
# Part 1 - Data Engineering



In [1]:
from snowflake.snowpark.session import Session
from snowflake.core import Root

from snowflake.snowpark.version import VERSION
import pandas as pd



## Make a connection to Snowflake via Snowpark

In [2]:
from config import connection_config 


session = Session.builder.configs(connection_config).create()
root = Root(session)
snowflake_environment = session.sql('select current_user(), current_version()').collect()
snowpark_version = VERSION

# Current Environment Details, printed as aligned "label : value" lines
environment_details = [
    ('User', snowflake_environment[0][0]),
    ('Role', session.get_current_role()),
    ('Database', session.get_current_database()),
    ('Schema', session.get_current_schema()),
    ('Warehouse', session.get_current_warehouse()),
    ('Snowflake version', snowflake_environment[0][1]),
    ('Snowpark for Python version', '.'.join(str(part) for part in snowpark_version[:3])),
]
for label, value in environment_details:
    print(f'{label:<28}: {value}')

User                        : ADMIN
Role                        : "ACCOUNTADMIN"
Database                    : None
Schema                      : None
Warehouse                   : None
Snowflake version           : 8.13.3
Snowpark for Python version : 1.13.0


### Challenge 1: Setup your environment with the Session and Root object

Let's create a new database, schema, and warehouse for our work.

The Snowpark session is no different than a regular Snowflake connection for this.
 

In [3]:
# Names of the workshop's database, schema and warehouse
db_name, schema_name, wh_name = 'SNOWPARK_DB', 'HOL_SCHEMA', 'HOL_WH'

# Write code to create a Database, Schema, and Warehouse (the database first)
session.sql(f'CREATE OR REPLACE DATABASE {db_name}').collect()

[Row(status='Database SNOWPARK_DB successfully created.')]

In [4]:
# Create the schema and warehouse, then point the session at all three objects.
# Statements run in order; the result of the last one is displayed.
setup_statements = [
    f'CREATE OR REPLACE SCHEMA {schema_name}',
    f'CREATE OR REPLACE WAREHOUSE {wh_name}',
    f'USE DATABASE {db_name}',
    f'USE SCHEMA {schema_name}',
    f'USE WAREHOUSE {wh_name}',
]
for statement in setup_statements:
    last_result = session.sql(statement).collect()
last_result

[Row(status='Statement executed successfully.')]

In [5]:
# Confirm which database the session now points at
current_database = session.get_current_database()
current_database

'"SNOWPARK_DB"'

#### The Snowflake Core Python library lets you do the same steps without SQL, in an object-oriented manner

[Docs: Snowflake Python API](https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-overview)

In [6]:
from snowflake.core.database import Database
from snowflake.core.schema import Schema 
from snowflake.core.warehouse import Warehouse

# Use the Python API to create the same objects as above.
# mode='orreplace' is the object-API counterpart of CREATE OR REPLACE.
my_db = Database(name=db_name)
database_collection = root.databases
hol_db = database_collection.create(my_db, mode='orreplace')

In [7]:
# Create a Schema. Schemas live under a database, so go through that database's collection.
# mode='orreplace' matches the database cell above and the CREATE OR REPLACE SCHEMA SQL.
# Without it the default create mode fails when the schema already exists, so this cell
# could not be re-run.
my_schema = Schema(schema_name)
my_schema = root.databases[db_name].schemas.create(my_schema, mode='orreplace')



In [8]:
# The database was just replaced through the Python API, so set the full session
# context explicitly before the stage/table cells that follow rely on it.
session.use_database(db_name)
session.use_schema(schema_name)
session.use_warehouse(wh_name)
# Last expression, so the notebook displays the active warehouse
session.get_current_warehouse()

### Challenge 2: To connect Snowflake to external cloud storage, like S3, you need to create a Stage.

At the moment, **anything other than Databases, Schemas, Tables, Tasks, and Warehouses (and compute pools) is SQL-only** until the Python API reaches general availability (GA) later this year.

[Docs: CREATE STAGE (SQL)](https://docs.snowflake.com/en/sql-reference/sql/create-stage)

In [9]:

# Public S3 prefix shared by both quickstart datasets; each stage points at one sub-folder
_QUICKSTART_S3_ROOT = 's3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/'

# stage name -> S3 URL the stage should reference
stage_info = {
    'campaign_data_stage': _QUICKSTART_S3_ROOT + 'campaign_spend/',
    'monthly_revenue_data_stage': _QUICKSTART_S3_ROOT + 'monthly_revenue/',
}


#Write a function below to create a stage using the Session objects SQL cursor()


In [11]:
#Run your function for all data sources


[Row(status='Stage area CAMPAIGN_DATA_STAGE successfully created.')]
[Row(status='Stage area MONTHLY_REVENUE_DATA_STAGE successfully created.')]


## Let's take a look at what's in the S3 bucket

In [12]:
# Use the list command to check out the files in each stage
camp_stag = '@campaign_data_stage'
rev_stag = '@monthly_revenue_data_stage'
for stage_ref in (camp_stag, rev_stag):
    print(session.sql(f'list {stage_ref}').collect())

[Row(name='s3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/campaign_spend/campaign_spend.csv', size=13684943, md5='1d87f70421662a7666d3918b16b81daa', last_modified='Tue, 14 Feb 2023 21:11:11 GMT')]
[Row(name='s3://sfquickstarts/ad-spend-roi-snowpark-python-scikit-learn-streamlit/monthly_revenue/monthly_revenue.csv', size=2227, md5='64f1607c7838ef77b88b6a2b738aebab', last_modified='Tue, 14 Feb 2023 21:11:14 GMT')]


### Challenge 3: Time for our first Snowpark Dataframe! 

Snowpark can load staged files into a DataFrame using the DataFrameReader. The reader accepts options similar to those of a SQL COPY command.

[Docs: Snowpark DataFrameReader](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/api/snowflake.snowpark.DataFrameReader)

In [13]:
#Read the monthly revenue data to a DataFrame and show it

#revenue_data = 
#revenue_data.show()

---------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"   |
---------------------------------
|2012    |5        |3264300.11  |
|2012    |6        |3208482.33  |
|2012    |7        |3311966.98  |
|2012    |8        |3311752.81  |
|2012    |9        |3208563.06  |
|2012    |10       |3334028.46  |
|2012    |11       |3185894.64  |
|2012    |12       |3334570.96  |
|2013    |1        |3316455.44  |
|2013    |2        |2995042.21  |
---------------------------------



In [14]:
#read the campaign spend data to a DataFrame


#campaign_data.show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

### Challenge 4: In reality, we need a Pivot Table to break out different marketing channels

In [56]:
#get your pivot list without hardcoding



SnowparkSQLException: (1304): 01b39044-0105-1033-004d-eb87008ead02: 000606 (57P03): No active warehouse selected in the current session.  Select an active warehouse with the 'use warehouse' command.


#### Let's use some handy Snowpark functions to perform the aggregation and clean up the column names afterwards

[Docs: Snowpark Dataframe Agg](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/1.7.0/api/snowflake.snowpark.DataFrame.agg)

In [16]:
from snowflake.snowpark.functions import month,year,col, sum


#Do the agg
#spend_per_month =  campaign_data.group_by(


#Do the pivot

-----------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"'video'"  |"'search_engine'"  |"'social_media'"  |"'email'"  |
-----------------------------------------------------------------------------------
|2012    |5        |516729     |516431             |517618            |517208     |
|2012    |6        |501098     |506497             |504679            |501947     |
|2012    |7        |522762     |522780             |521395            |518405     |
|2012    |8        |520685     |519959             |520537            |521584     |
|2012    |9        |511364     |507211             |507404            |507363     |
|2012    |10       |522768     |518942             |520863            |519950     |
|2012    |11       |505292     |505715             |505221            |503748     |
|2012    |12       |521427     |520148             |520711            |520724     |
|2013    |1        |520583     |522151             |518635            |52116

In [17]:
# Rename the quoted pivot columns (e.g. "'video'") to plain identifiers, then join revenue
pivot_aliases = {
    "'search_engine'": "SEARCH_ENGINE",
    "'social_media'": "SOCIAL_MEDIA",
    "'video'": "VIDEO",
    "'email'": "EMAIL",
}
renamed_channels = [col(raw_name).as_(clean_name) for raw_name, clean_name in pivot_aliases.items()]

(
    spend_per_channel
    .select(col("YEAR"), col("MONTH"), *renamed_channels)
    .join(revenue_data, ['YEAR', 'MONTH'])
    .show()
)

----------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
----------------------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |3264300.11  |
|2012    |6        |506497           |504679          |501098   |501947   |3208482.33  |
|2012    |7        |522780           |521395          |522762   |518405   |3311966.98  |
|2012    |8        |519959           |520537          |520685   |521584   |3311752.81  |
|2012    |9        |507211           |507404          |511364   |507363   |3208563.06  |
|2012    |10       |518942           |520863          |522768   |519950   |3334028.46  |
|2012    |11       |505715           |505221          |505292   |503748   |3185894.64  |
|2012    |12       |520148           |520711          |521427   |520724   |3334570.96  |
|2013    |1        |5

### Final Part 1 Challenge: Automation: Run Campaign Spend Data Transformations As a Snowflake Task


In [21]:
from snowflake.snowpark.functions import sproc
# Compute for the registration below, plus the package the procedure needs server-side
session.use_warehouse(wh_name)
session.add_packages(['snowflake-snowpark-python'])

def campaign_spend_data_pipeline(session: Session) -> bool:
    """Build SPEND_AND_REVENUE_PER_MONTH from the staged campaign-spend and revenue CSVs.

    Reads both stages, totals campaign cost per year/month/channel, pivots the
    channels into one column each, joins monthly revenue and overwrites the
    SPEND_AND_REVENUE_PER_MONTH table that the ML section reads.
    Registered as a stored procedure and scheduled as a task in later cells.

    Args:
        session: Snowpark session (supplied by the stored-procedure runtime).

    Returns:
        True once the table has been written.
    """
    # Imported here so the procedure does not depend on names imported in other
    # notebook cells, and so Snowpark's `sum` does not shadow the builtin globally.
    from snowflake.snowpark.functions import col, month, sum as sum_, year

    csv_options = {
        'file_format': 'csv',
        'infer_schema': True,
        'parse_header': True,
    }

    # Load data from stage
    campaign_spend_data = session.read.options(csv_options).csv('@campaign_data_stage')
    revenue_data = session.read.options(csv_options).csv('@monthly_revenue_data_stage')

    # Dynamically get list of channels to pivot
    channel_pivot_list = list(campaign_spend_data.select('channel').distinct().to_pandas().CHANNEL)

    # Total cost per (year, month, channel); rename the generated "YEAR(DATE)"/"MONTH(DATE)" columns
    spend_per_month = (
        campaign_spend_data
        .group_by(year('DATE'), month('DATE'), 'CHANNEL')
        .agg(sum_('TOTAL_COST').as_('TOTAL_COST'))
        .with_column_renamed('"YEAR(DATE)"', "YEAR")
        .with_column_renamed('"MONTH(DATE)"', "MONTH")
        .sort('YEAR', 'MONTH')
    )

    # One column per channel. Pivot output columns are named after the quoted
    # values (e.g. "'video'"), so alias them to plain identifiers.
    spend_per_channel = (
        spend_per_month
        .pivot('CHANNEL', channel_pivot_list)
        .sum('TOTAL_COST')
        .sort('YEAR', 'MONTH')
        .select(
            col("YEAR"),
            col("MONTH"),
            col("'search_engine'").as_("SEARCH_ENGINE"),
            col("'social_media'").as_("SOCIAL_MEDIA"),
            col("'video'").as_("VIDEO"),
            col("'email'").as_("EMAIL"),
        )
    )

    spend_and_revenue = spend_per_channel.join(revenue_data, ['YEAR', 'MONTH'])

    # Write to Snowflake. Previously the result was computed and then discarded,
    # so the downstream table was never produced.
    spend_and_revenue.write.mode('overwrite').save_as_table('SPEND_AND_REVENUE_PER_MONTH')

    return True

[Docs: Python API - Tasks](https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-managing-tasks) 


In [22]:
# Register the procedure (and a SQL task that calls it) inside the workshop schema
session.use_database(db_name)
session.use_schema(schema_name)

session.sproc.register(
    campaign_spend_data_pipeline,
    name='campaign_spend_pipeline',
    replace=True,
)
session.sql("create or replace task my_task as call campaign_spend_pipeline();").collect()

[Row(status='Task MY_TASK successfully created.')]

In [23]:
# Create  Stored Procedure and Task from my python function

from snowflake.core.task import Task, StoredProcedureCall, Cron
from datetime import timedelta

# Internal stage named snowpark_stage, used as the task's stage_location below
session.sql('create or replace stage snowpark_stage').collect()

# Define your Task: call the pipeline function as a stored procedure, on HOL_WH,
# at 00:00 UTC on day 2 of every month (cron: minute hour day-of-month month day-of-week).
pipeline_call = StoredProcedureCall(
    campaign_spend_data_pipeline,
    stage_location='@snowpark_stage',
    packages=["snowflake-snowpark-python"],
)
run_campaign_task_d = Task(
    'campaign_spend_pipeline',
    pipeline_call,
    schedule=Cron('0 0 2 * *', timezone='UTC'),
    warehouse=wh_name,
)


In [24]:
# Tasks are schema-level objects: resolve the schema's task collection, then create the task
hol_tasks = root.databases[db_name].schemas[schema_name].tasks
t = hol_tasks.create(run_campaign_task_d, mode='orreplace')


#### Final Checkpoint for Part 1. Do you have a clean table ready for ML training that looks like the below?

In [35]:
# Read and preview the table the campaign spend pipeline is expected to produce.
# NOTE(review): SPEND_AND_REVENUE_PER_MONTH only exists once the pipeline has run and
# written it; in this saved notebook `t.execute()` (In [31]) ran before this cell
# (In [35]). On a fresh kernel, run the task first.
snow_df_spend_and_revenue_per_month = session.table('SPEND_AND_REVENUE_PER_MONTH')
snow_df_spend_and_revenue_per_month.show()

----------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
----------------------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |3264300.11  |
|2012    |6        |506497           |504679          |501098   |501947   |3208482.33  |
|2012    |7        |522780           |521395          |522762   |518405   |3311966.98  |
|2012    |8        |519959           |520537          |520685   |521584   |3311752.81  |
|2012    |9        |507211           |507404          |511364   |507363   |3208563.06  |
|2012    |10       |518942           |520863          |522768   |519950   |3334028.46  |
|2012    |11       |505715           |505221          |505292   |503748   |3185894.64  |
|2012    |12       |520148           |520711          |521427   |520724   |3334570.96  |
|2013    |1        |5

In [31]:
# Trigger an immediate run of the pipeline task instead of waiting for its cron schedule.
# NOTE(review): the checkpoint cell above reads SPEND_AND_REVENUE_PER_MONTH, which this run
# is meant to produce, so execute this first on a fresh kernel. Whether execute() waits
# for the run to finish is not visible here; confirm before reading the table right after.
t.execute()

# Part 2 - Data Science and MLOps with Snowpark ML


In [39]:
from snowflake.ml.modeling.compose import ColumnTransformer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import PolynomialFeatures, StandardScaler
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import Registry



### Preprocess your table to get to a Feature table.

##### Quick Cleaning

In [36]:
# Quick cleaning: drop rows with missing values and the calendar columns the model does not use
snow_df_spend_and_revenue_per_month = (
    session.table('spend_and_revenue_per_month')
    .dropna()
    .drop(['YEAR', 'MONTH'])
)

# Persist the features as MARKETING_BUDGETS_FEATURES for the training step
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')

In [38]:
# Peek at the persisted feature table
marketing_budgets_features = session.table("MARKETING_BUDGETS_FEATURES")
marketing_budgets_features.show()

---------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
---------------------------------------------------------------------
|516431           |517618          |516729   |517208   |3264300.11  |
|506497           |504679          |501098   |501947   |3208482.33  |
|522780           |521395          |522762   |518405   |3311966.98  |
|519959           |520537          |520685   |521584   |3311752.81  |
|507211           |507404          |511364   |507363   |3208563.06  |
|518942           |520863          |522768   |519950   |3334028.46  |
|505715           |505221          |505292   |503748   |3185894.64  |
|520148           |520711          |521427   |520724   |3334570.96  |
|522151           |518635          |520583   |521167   |3316455.44  |
|467736           |474679          |469856   |469784   |2995042.21  |
---------------------------------------------------------------------



##### Feature Preprocessing
[Docs: Snowpark ML - Preprocessing](https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-modeling)

In [None]:
CROSS_VALIDATION_FOLDS = 10
POLYNOMIAL_FEATURES_DEGREE = 2
TRAINING_WH = 'TRAINING_WH'

# Dedicated, larger warehouse for training. The Warehouse model field is
# `warehouse_size` (not `size`). Creating the warehouse alone does not make the
# session use it, so switch to it explicitly.
root.warehouses.create(Warehouse(name=TRAINING_WH, warehouse_size='MEDIUM'), mode='orreplace')
session.use_warehouse(TRAINING_WH)

# Create train and test Snowpark DataFrames (fixed seed for a reproducible split)
train_df, test_df = session.table("MARKETING_BUDGETS_FEATURES").random_split(weights=[0.8, 0.2], seed=0)

# Preprocess the numeric columns with PolynomialFeatures followed by StandardScaler.
# NOTE: High degrees can cause overfitting.
numeric_features = ['SEARCH_ENGINE', 'SOCIAL_MEDIA', 'VIDEO', 'EMAIL']
numeric_transformer = Pipeline(steps=[
    ('poly', PolynomialFeatures(degree=POLYNOMIAL_FEATURES_DEGREE)),
    ('scaler', StandardScaler()),
])

# Combine the preprocessing steps with the Column Transformer module
preprocessor = ColumnTransformer(
    transformers=[('num', numeric_transformer, numeric_features)]
)

# Integrate the preprocessed features with the learning algorithm.
# LinearRegression predicts a continuous target, so the step is named 'regressor'.
pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('regressor', LinearRegression())])
parameters = {}

# Use GridSearch with CROSS_VALIDATION_FOLDS folds to find the best fitting model
model = GridSearchCV(
    estimator=pipeline,
    param_grid=parameters,
    cv=CROSS_VALIDATION_FOLDS,
    label_cols=["REVENUE"],
    output_cols=["PREDICTED_REVENUE"],
    verbose=2,
)

# Fit and Score
model.fit(train_df)
train_r2_score = model.score(train_df)
test_r2_score = model.score(test_df)

# Return to the workshop warehouse so later cells do not keep running on the MEDIUM one
session.use_warehouse(wh_name)

# R2 score on train and test datasets
print(f"R2 score on Train : {train_r2_score}")
print(f"R2 score on Test  : {test_r2_score}")

#### Challenge 2: Model Training and MLOps

##### Use the Model Registry

[Docs: Snowpark ML Model Registry](https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-mlops-model-registry#opening-the-snowpark-model-registry)


In [45]:
# Registry is imported in the Snowpark ML imports cell above.
# Pin the registry to the workshop database/schema. The Streamlit app opens
# Registry(session, database_name='SNOWPARK_DB', schema_name='HOL_SCHEMA'), so the
# model must be logged there rather than in whatever schema happens to be current.
registry = Registry(session, database_name=db_name, schema_name=schema_name)
MODEL_NAME = "PREDICT_ROI"
# Log the model and create a reference.


  return next(self.gen)


#### Create fake data to test your model against


In [50]:
# Mock monthly budget allocations (one row per month) to run inference against
CREATE_BUDGETS_SQL = """CREATE or REPLACE TABLE BUDGET_ALLOCATIONS_AND_ROI (
  MONTH varchar(30),
  SEARCH_ENGINE integer,
  SOCIAL_MEDIA integer,
  VIDEO integer,
  EMAIL integer,
  ROI float);"""

INSERT_BUDGETS_SQL = """
INSERT INTO BUDGET_ALLOCATIONS_AND_ROI (MONTH, SEARCH_ENGINE, SOCIAL_MEDIA, VIDEO, EMAIL, ROI)
VALUES
('January',35,50,35,85,8.22),
('February',75,50,35,85,13.90),
('March',15,50,35,15,7.34),
('April',25,80,40,90,13.23),
('May',95,95,10,95,6.246),
('June',35,50,35,85,8.22);"""

for statement in (CREATE_BUDGETS_SQL, INSERT_BUDGETS_SQL):
    session.sql(statement).collect()

budgets = session.table('BUDGET_ALLOCATIONS_AND_ROI')
budgets.show()

---------------------------------------------------------------------------
|"MONTH"   |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"ROI"  |
---------------------------------------------------------------------------
|January   |35               |50              |35       |85       |8.22   |
|February  |75               |50              |35       |85       |13.9   |
|March     |15               |50              |35       |15       |7.34   |
|April     |25               |80              |40       |90       |13.23  |
|May       |95               |95              |10       |95       |6.246  |
|June      |35               |50              |35       |85       |8.22   |
---------------------------------------------------------------------------



In [53]:
#Run the inference on your mock data


-------------------------------------------------------------------------------------------------
|"MONTH"   |"ROI"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_REVENUE"  |
-------------------------------------------------------------------------------------------------
|January   |8.22   |35               |50              |35       |85       |-210886.4949279744   |
|February  |13.9   |75               |50              |35       |85       |-210806.24371288903  |
|March     |7.34   |15               |50              |35       |15       |-211751.07379658613  |
|April     |13.23  |25               |80              |40       |90       |-210132.100637245    |
|May       |6.246  |95               |95              |10       |95       |-208393.69002735848  |
|June      |8.22   |35               |50              |35       |85       |-210886.4949279744   |
-------------------------------------------------------------------------------------------------



## Part 3: Quickly build a data app with Streamlit

In [54]:
#### Copy and paste the cell below into Streamlit in Snowflake (SiS)


In [None]:
import calendar 
import altair as alt
import streamlit as st
import pandas as pd
from snowflake.snowpark.functions import col
from snowflake.ml.registry import Registry
from snowflake.snowpark.context import get_active_session


#Create a header
st.header("SkiGear Co Ad Spend Optimizer")
forcast_month = 'July'
#Get your session
session = get_active_session()

# Unpivoted view of the budget table (one row per month and channel),
# excluding the month being forecast
channels_upper = ['SEARCH_ENGINE', 'SOCIAL_MEDIA', 'VIDEO', 'EMAIL']

data = session.table('SNOWPARK_DB.HOL_SCHEMA.BUDGET_ALLOCATIONS_AND_ROI').\
                unpivot("Budget", "Channel",
                       channels_upper).filter(col("MONTH") != forcast_month)

# Query results have no ORDER BY, so put June's allocation in channels_upper order.
# The sliders, the model input columns and the July chart rows are all built from this order.
last_allocation = (
    data.filter(col("MONTH") == "June").to_pandas()
    .sort_values("CHANNEL", key=lambda channels: channels.map(channels_upper.index))
    .reset_index(drop=True)
)

# One ROI per month, in calendar order, so the last row is the latest month
month_position = {name: position for position, name in enumerate(calendar.month_name)}
previous_rois = (
    data.drop(["CHANNEL", "BUDGET"]).distinct().to_pandas()
    .sort_values("MONTH", key=lambda months: months.map(month_position))
    .reset_index(drop=True)
)


#st.dataframe(data)
st.subheader("Expected Advertising budgets")

# Two columns of sliders, seeded with June's allocation.
# The loop variable is `slider_col` so the Snowpark `col` function imported above is not shadowed.
col1, _, col2 = st.columns([4, 1, 4])
budgets = []
for alloc, slider_col in zip(last_allocation.itertuples(), [col1, col1, col2, col2]):
  budgets.append(slider_col.slider(alloc.CHANNEL, 0, 100, alloc.BUDGET, 5))

# Single-row frame whose columns are named after the channel each slider controls,
# so the model receives features by name rather than by position
budgets_df = pd.DataFrame([budgets], columns=list(last_allocation.CHANNEL))

#st.dataframe(budgets_df)

# Open the registry holding the ROI model and take the last entry of its versions list
ml_registry = Registry(session, database_name='SNOWPARK_DB', schema_name='HOL_SCHEMA')
roi_model_versions = ml_registry.get_model(model_name="PREDICT_ROI").versions()
# NOTE(review): "latest" assumes versions() is in creation order. Confirm that,
# or select an explicit version name.
ROI_MODEL = roi_model_versions[-1]

def predict(budgets_from_widget, model=None, rois=None):
    """Predict revenue for a set of channel budgets and compare it with the latest month.

    Args:
        budgets_from_widget: single-row DataFrame of slider values (0-100 per channel).
        model: object exposing ``run(df, function_name=...)`` that returns a frame with
            a ``PREDICTED_REVENUE`` column; defaults to the module-level ``ROI_MODEL``.
        rois: DataFrame with ``MONTH`` (English month names) and ``ROI`` columns;
            defaults to the module-level ``previous_rois``.

    Returns:
        ``(pred, change)``: predicted revenue divided by 1,000,000 (shown as millions),
        and the percentage change versus the ROI of the latest calendar month in
        ``rois``, rounded to one decimal.
    """
    model = ROI_MODEL if model is None else model
    rois = previous_rois if rois is None else rois

    # Scale slider values (0-100) up to the magnitude of the training spend columns
    budgets_for_inf = budgets_from_widget * 10000

    pred = model.run(budgets_for_inf, function_name='predict')
    pred = pred["PREDICTED_REVENUE"].values[0] / 1000000

    # Baseline is the latest calendar month, not whichever row happens to be last:
    # the ROI rows come from a DISTINCT query with no ordering.
    month_rank = rois["MONTH"].map({name: rank for rank, name in enumerate(calendar.month_name)})
    last_roi = rois.loc[month_rank.idxmax(), "ROI"]
    change = round(((pred / last_roi) - 1) * 100, 1)
    return pred, change


#st.dataframe(predict(budgets_df))
pred, change = predict(budgets_df)
st.metric("", f"Predicted revenue ${pred:.2f} million", f"{change:.1f} % vs last month")

# Forecast rows for the chart. `budgets` was built by iterating last_allocation, so pair
# each value with last_allocation's CHANNEL rather than assuming a fixed channel order.
july = pd.DataFrame({
    "MONTH": ["July"] * len(budgets),
    "CHANNEL": list(last_allocation.CHANNEL),
    "BUDGET": budgets,
    "ROI": [pred] * len(budgets),
})

def chart(chart_data):
    """Draw per-channel budget bars plus a revenue line with points, one x position per month.

    Months are ordered by calendar; "July" is highlighted (opaque bars, larger point),
    and bars and line use independent y axes. The result is rendered with st.altair_chart.

    Args:
        chart_data: pandas DataFrame with MONTH, CHANNEL, BUDGET and ROI columns.
    """
    is_july = alt.datum.MONTH == "July"
    muted_grey = "#808495"
    light_grid = "#e6eaf1"

    base = alt.Chart(chart_data).encode(
        alt.X("MONTH", sort=list(calendar.month_name), title=None)
    )

    bars = base.mark_bar().encode(
        y=alt.Y("BUDGET", title="Budget", scale=alt.Scale(domain=[0, 300])),
        color=alt.Color("CHANNEL", legend=alt.Legend(orient="top", title=" ")),
        opacity=alt.condition(is_july, alt.value(1), alt.value(0.3)),
    )
    lines = base.mark_line(size=3).encode(
        y=alt.Y("ROI", title="Revenue", scale=alt.Scale(domain=[0, 25])),
        color=alt.value(muted_grey),
    )
    points = base.mark_point(strokeWidth=3).encode(
        y=alt.Y("ROI"),
        stroke=alt.value(muted_grey),
        fill=alt.value("white"),
        size=alt.condition(is_july, alt.value(300), alt.value(70)),
    )

    layered = (
        alt.layer(bars, lines + points)
        .resolve_scale(y="independent")
        .configure_view(strokeWidth=0)
        .configure_axisY(domain=False)
        .configure_axis(
            labelColor=muted_grey,
            tickColor=light_grid,
            gridColor=light_grid,
            domainColor=light_grid,
            titleFontWeight=600,
            titlePadding=10,
            labelPadding=5,
            labelFontSize=14,
        )
        .configure_range(category=["#FFE08E", "#03C0F2", "#FFAAAB", "#995EFF"])
    )
    st.altair_chart(layered, use_container_width=True)

#st.dataframe(july)

# History (every month except the forecast) followed by the July forecast rows
chart_data = pd.concat([data.to_pandas(), july], ignore_index=True)
chart(chart_data)
