# Data Transformation Using Snowpark for Python

The purpose of this script is to demonstrate simple data transformations on Snowflake objects using Snowpark for Python. We begin with a Snowflake table containing 15 years of hourly website sales data and perform the following transformations:

- Filter the data to 2017 onwards and only when the website is operating normally
- Categorise each hour based on the bounce rate for the website (i.e. the percentage of website viewers who did not put an item in the basket)
- Aggregate total sales by month and category
- Sort the data by category and month
- Store the result in a new table in Snowflake 

## Import the Snowpark package

Before we begin, we import a helper function from the InterWorks Snowpark package and use it to create a Snowpark `Session` object connected to our Snowflake environment. Alternatively, you can modify the code to establish a Snowpark `Session` through any method of your choice.

```python
# Import the InterWorks helper that builds a Snowpark session
from interworks_snowpark.snowpark_session_builder import build_snowpark_session_via_parameters_json as build_snowpark_session

# Generate the Snowpark session
snowpark_session = build_snowpark_session()
```
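If you are not using the InterWorks helper, a Snowpark session can also be created with `Session.builder.configs(...).create()`. The sketch below assumes the connection details are held in environment variables; the variable names (`SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, and so on) and both helper functions are hypothetical choices for illustration, not part of the original notebook.

```python
import os
from typing import Mapping

# Hypothetical mapping from Snowpark connection keys to environment variable names
ENV_VARS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "role": "SNOWFLAKE_ROLE",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
}
REQUIRED_KEYS = ("account", "user", "password")


def connection_parameters_from_env(environ: Mapping[str, str] = os.environ) -> dict:
    """Build a Snowpark connection dictionary, skipping unset or empty variables."""
    params = {key: environ[var] for key, var in ENV_VARS.items() if environ.get(var)}
    missing = [key for key in REQUIRED_KEYS if key not in params]
    if missing:
        raise KeyError(f"Missing required connection parameters: {missing}")
    return params


def create_session_from_env():
    """Create a Snowpark session (requires the snowflake-snowpark-python package)."""
    # Imported lazily so the parameter helper above works without Snowpark installed
    from snowflake.snowpark import Session

    return Session.builder.configs(connection_parameters_from_env()).create()
```

With the variables set, `snowpark_session = create_session_from_env()` would stand in for the helper call above.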

## Retrieve source table from staging

Our source data is contained in the table `"SALES_DB"."STAGING"."WEBSITE_SALES"`, which we read into a Snowpark DataFrame:

```python
df_source = snowpark_session.table('"SALES_DB"."STAGING"."WEBSITE_SALES"')
```

We can quickly query this DataFrame to confirm its contents. By default, `show()` prints the first 10 rows.

```python
df_source.show()
```

```
--------------------------------------------------------------------
|"HOUR_OF_OPERATION"  |"SALES"  |"WEBSITE_STATUS"  |"BOUNCE_RATE"  |
--------------------------------------------------------------------
|2007-01-01 00:00:00  |4937.5   |STANDARD          |46             |
|2007-01-01 01:00:00  |4752.1   |STANDARD          |46             |
|2007-01-01 02:00:00  |4542.6   |STANDARD          |45             |
|2007-01-01 03:00:00  |4357.7   |STANDARD          |45             |
|2007-01-01 04:00:00  |4275.5   |STANDARD          |43             |
|2007-01-01 05:00:00  |4274.7   |STANDARD          |39             |
|2007-01-01 06:00:00  |4324.9   |STANDARD          |39             |
|2007-01-01 07:00:00  |4350.0   |STANDARD          |43             |
|2007-01-01 08:00:00  |4480.9   |STANDARD          |39             |
|2007-01-01 09:00:00  |4664.2   |STANDARD          |45             |
--------------------------------------------------------------------
```



## Transformations

We are now ready to begin our transformations. First, we import a few functions from the `snowflake.snowpark.functions` module. Most notably, we import `col`, which lets us reference a specific column in a Snowpark DataFrame. We also import the equivalents of the native [Snowflake DATE_TRUNC function](https://docs.snowflake.com/en/sql-reference/functions/date_trunc.html), so that we can truncate a timestamp to the start of its month, and the native [Snowflake YEAR function](https://docs.snowflake.com/en/sql-reference/functions/year.html), so that we can quickly identify and filter by the year of a timestamp. Finally, we import Snowpark's `sum` under the alias `sf_sum` so that it does not shadow Python's built-in `sum` function.

```python
from snowflake.snowpark.functions import col
from snowflake.snowpark.functions import year
from snowflake.snowpark.functions import date_trunc
from snowflake.snowpark.functions import sum as sf_sum
```
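To make the two Snowflake functions concrete, the pure-Python sketch below mirrors what `YEAR` and `DATE_TRUNC('month', <timestamp>)` return for a single timestamp. The helper names `year_of` and `date_trunc_month` are hypothetical and run locally on `datetime` values; in the notebook, the equivalent work happens inside Snowflake.

```python
from datetime import datetime


def year_of(ts: datetime) -> int:
    """Local analogue of Snowflake's YEAR(ts)."""
    return ts.year


def date_trunc_month(ts: datetime) -> datetime:
    """Local analogue of DATE_TRUNC('month', ts): keep year and month, reset everything smaller."""
    return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)


ts = datetime(2017, 3, 14, 9, 0, 0)
print(year_of(ts))           # 2017
print(date_trunc_month(ts))  # 2017-03-01 00:00:00
```

Every hour in a given month therefore collapses to the same month-start timestamp, which is what makes the later grouping by month possible.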

### Filter the data to 2017 onwards and only when the website is operating normally

We use the `YEAR` function to extract the year of each record and keep only records from 2017 onwards. We also keep only records where the `"WEBSITE_STATUS"` field has the value `'STANDARD'`. The two conditions are combined with `&`, Snowpark's logical AND for column expressions, and each condition is wrapped in parentheses.

```python
df_filtered = df_source.filter(
    (year(col("HOUR_OF_OPERATION")) >= 2017) & (col("WEBSITE_STATUS") == 'STANDARD')
)

df_filtered.show()
```

```
--------------------------------------------------------------------
|"HOUR_OF_OPERATION"  |"SALES"  |"WEBSITE_STATUS"  |"BOUNCE_RATE"  |
--------------------------------------------------------------------
|2017-01-01 00:00:00  |4937.5   |STANDARD          |46             |
|2017-01-01 01:00:00  |4752.1   |STANDARD          |46             |
|2017-01-01 02:00:00  |4542.6   |STANDARD          |45             |
|2017-01-01 03:00:00  |4357.7   |STANDARD          |45             |
|2017-01-01 04:00:00  |4275.5   |STANDARD          |43             |
|2017-01-01 05:00:00  |4274.7   |STANDARD          |39             |
|2017-01-01 06:00:00  |4324.9   |STANDARD          |39             |
|2017-01-01 07:00:00  |4350.0   |STANDARD          |43             |
|2017-01-01 08:00:00  |4480.9   |STANDARD          |39             |
|2017-01-01 09:00:00  |4664.2   |STANDARD          |45             |
--------------------------------------------------------------------
```
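The parentheses around each condition in the filter matter. Python's `and` keyword cannot be overloaded, which is why Snowpark uses `&` for combining conditions, but `&` binds more tightly than comparison operators such as `>=` and `==`. The sketch below uses the standard-library `ast` module to show how Python parses the condition with and without parentheses; `year_col` and `status_col` are placeholder names, and nothing is executed against Snowflake.

```python
import ast

# Placeholder names standing in for the two Snowpark column expressions
without_parens = "year_col >= 2017 & status_col == 'STANDARD'"
with_parens = "(year_col >= 2017) & (status_col == 'STANDARD')"

tree_without = ast.parse(without_parens, mode="eval").body
tree_with = ast.parse(with_parens, mode="eval").body

# Without parentheses: one chained comparison whose middle operand is `2017 & status_col`
print(type(tree_without).__name__)                 # Compare
print(type(tree_without.comparators[0]).__name__)  # BinOp

# With parentheses: a `&` (BitAnd) between two separate comparisons
print(type(tree_with).__name__, type(tree_with.op).__name__)  # BinOp BitAnd
```

Without the parentheses, Python would evaluate `2017 & status_col` first and then build a chained comparison, which is not the intended filter.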



### Categorise each hour based on the bounce rate for the website (i.e. the percentage of website viewers who did not put an item in the basket)

We use simple buckets to categorise the bounce rate with the following rules:

- "LOW" when below 30%
- "MEDIUM" when 30% or above but below 50%
- "HIGH" when 50% or above

Another way of thinking about this is as follows:

"LOW" < 30% <= "MEDIUM" < 50% <= "HIGH"
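The half-open intervals in this notation map directly onto Python's standard-library `bisect` module: `bisect_right` returns how many boundaries are less than or equal to a value, which is exactly the index of its bucket. The sketch below is a local illustration of the rules only; the names `BOUNDARIES`, `LABELS`, and `bucket` are hypothetical and are not used elsewhere in the notebook.

```python
from bisect import bisect_right

BOUNDARIES = [30, 50]                # bucket edges, in percent
LABELS = ["LOW", "MEDIUM", "HIGH"]   # one more label than there are boundaries


def bucket(bounce_rate: int) -> str:
    # bisect_right counts the boundaries that are <= bounce_rate,
    # so 30 falls into MEDIUM and 50 falls into HIGH, matching the rules above
    return LABELS[bisect_right(BOUNDARIES, bounce_rate)]


for rate in (29, 30, 49, 50):
    print(rate, bucket(rate))
# 29 LOW
# 30 MEDIUM
# 49 MEDIUM
# 50 HIGH
```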

We apply this calculation by defining a Python function and registering it as a temporary User Defined Function (UDF) with `snowpark_session.udf.register`. For this, we import the `IntegerType` and `StringType` data types so that we can declare the UDF's input and output types.

```python
# Import the Snowpark data types used to declare the UDF signature
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.types import StringType

# Define the Python function that the UDF will execute
def categorise_bounce_rate(bounce_rate: int):
    if bounce_rate is None:
        return None
    elif bounce_rate < 30:
        return 'LOW'
    elif bounce_rate < 50:
        return 'MEDIUM'
    else:
        return 'HIGH'

# Register the function as a temporary UDF in Snowflake
sf_categorise_bounce_rate = snowpark_session.udf.register(
    func=categorise_bounce_rate,
    return_type=StringType(),
    input_types=[IntegerType()],
    is_permanent=False,
    name='"SALES_DB"."STAGING"."CATEGORISE_BOUNCE_RATE"',
    replace=True,
)
```
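Because the UDF body is an ordinary Python function, its logic can be checked locally before it is registered in Snowflake. The sketch below repeats the function so that it runs on its own and asserts the boundary cases from the rules above, including `None`, which is how a NULL bounce rate reaches a Python UDF.

```python
def categorise_bounce_rate(bounce_rate: int):
    if bounce_rate is None:
        return None
    elif bounce_rate < 30:
        return 'LOW'
    elif bounce_rate < 50:
        return 'MEDIUM'
    else:
        return 'HIGH'


# Boundary cases taken from the categorisation rules
expected = {None: None, 0: 'LOW', 29: 'LOW', 30: 'MEDIUM', 49: 'MEDIUM', 50: 'HIGH', 100: 'HIGH'}
for rate, category in expected.items():
    assert categorise_bounce_rate(rate) == category, (rate, category)
print("all boundary checks passed")
```

Catching a boundary mistake here is cheaper than discovering it after the UDF has been registered and applied to the full table.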

```python
df_categorised = df_filtered.select(
    col("HOUR_OF_OPERATION"),
    col("SALES"),
    sf_categorise_bounce_rate(col("BOUNCE_RATE")).alias("CATEGORY"),
)

df_categorised.show()
```

```
----------------------------------------------
|"HOUR_OF_OPERATION"  |"SALES"  |"CATEGORY"  |
----------------------------------------------
|2017-01-01 00:00:00  |4937.5   |MEDIUM      |
|2017-01-01 01:00:00  |4752.1   |MEDIUM      |
|2017-01-01 02:00:00  |4542.6   |MEDIUM      |
|2017-01-01 03:00:00  |4357.7   |MEDIUM      |
|2017-01-01 04:00:00  |4275.5   |MEDIUM      |
|2017-01-01 05:00:00  |4274.7   |MEDIUM      |
|2017-01-01 06:00:00  |4324.9   |MEDIUM      |
|2017-01-01 07:00:00  |4350.0   |MEDIUM      |
|2017-01-01 08:00:00  |4480.9   |MEDIUM      |
|2017-01-01 09:00:00  |4664.2   |MEDIUM      |
----------------------------------------------
```
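A Python UDF is not the only option. The same buckets can be expressed with Snowpark's `when` and `lit` functions, which build a native SQL `CASE` expression and avoid running Python code inside Snowflake. The sketch below is an alternative to the UDF approach, not what this notebook uses, and `bounce_rate_category` is a hypothetical helper name. Because no `otherwise` branch is given, a NULL bounce rate should match no branch and yield NULL, like the UDF.

```python
def bounce_rate_category(bounce_rate_column):
    """Return a Snowpark column expression equivalent to categorise_bounce_rate."""
    # Imported lazily so this definition does not require Snowpark until it is called
    from snowflake.snowpark.functions import lit, when

    return (
        when(bounce_rate_column < 30, lit("LOW"))
        .when(bounce_rate_column < 50, lit("MEDIUM"))
        .when(bounce_rate_column >= 50, lit("HIGH"))
        # No otherwise() branch: a NULL bounce rate matches nothing and yields NULL
    )
```

It could replace the UDF call in the `select` above, for example `bounce_rate_category(col("BOUNCE_RATE")).alias("CATEGORY")`. The trade-off is that the UDF keeps the rules in plain, locally testable Python, while the `CASE` expression stays entirely within Snowflake's SQL engine.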



### Aggregate total sales by month and category

Now that we have our categories, we are ready to group our data with the `group_by` method and total the sales in each group. Note that we use `sf_sum` (Snowpark's `sum`, imported under an alias) rather than Python's built-in `sum` function.

```python
df_grouped = (
    df_categorised
    .group_by(date_trunc('month', col("HOUR_OF_OPERATION")), col("CATEGORY"))
    .agg(sf_sum(col("SALES")))
    .select(
        col("DATE_TRUNC(MONTH, HOUR_OF_OPERATION)").alias("MONTH_OF_OPERATION"),
        col("CATEGORY"),
        col("SUM(SALES)").alias("SALES"),
    )
)

df_grouped.show()
```

```
----------------------------------------------------------
|"MONTH_OF_OPERATION"  |"CATEGORY"  |"SALES"             |
----------------------------------------------------------
|2017-01-01 00:00:00   |LOW         |972043.5            |
|2017-03-01 00:00:00   |LOW         |57151.5             |
|2017-04-01 00:00:00   |MEDIUM      |1112902.9           |
|2017-10-01 00:00:00   |MEDIUM      |196111.8            |
|2018-04-01 00:00:00   |HIGH        |2340948.6           |
|2018-09-01 00:00:00   |MEDIUM      |10138.099999999999  |
|2019-02-01 00:00:00   |HIGH        |89087.7             |
|2019-04-01 00:00:00   |HIGH        |2023355.1           |
|2020-01-01 00:00:00   |LOW         |2060825.051         |
|2020-03-01 00:00:00   |MEDIUM      |3182368.799         |
----------------------------------------------------------
```
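The `select` above relies on the column names Snowflake generates for the grouped expressions (`DATE_TRUNC(MONTH, HOUR_OF_OPERATION)` and `SUM(SALES)`), which may differ if the expression or library version changes. A sketch of an alternative, assuming the same `df_categorised` DataFrame, derives the month column with `with_column` before grouping and aliases the aggregate directly, so no generated names are referenced. `group_sales_by_month` is a hypothetical helper name.

```python
def group_sales_by_month(df_categorised):
    """Total SALES per month and CATEGORY without referencing auto-generated column names."""
    # Imported lazily so this definition does not require Snowpark until it is called
    from snowflake.snowpark.functions import col, date_trunc
    from snowflake.snowpark.functions import sum as sf_sum

    return (
        df_categorised
        # Add the truncated month as a named column before grouping
        .with_column("MONTH_OF_OPERATION", date_trunc("month", col("HOUR_OF_OPERATION")))
        .group_by(col("MONTH_OF_OPERATION"), col("CATEGORY"))
        # Alias the aggregate so the output column is simply "SALES"
        .agg(sf_sum(col("SALES")).alias("SALES"))
    )
```

Calling `df_grouped = group_sales_by_month(df_categorised)` should produce the same three columns, `MONTH_OF_OPERATION`, `CATEGORY` and `SALES`.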



### Sort the data

Using the `sort()` method, we sort the data by category and then by month, both in ascending order.

```python
df_sorted = df_grouped.sort(col("CATEGORY"), col("MONTH_OF_OPERATION"))

df_sorted.show()
```

```
-------------------------------------------------
|"MONTH_OF_OPERATION"  |"CATEGORY"  |"SALES"    |
-------------------------------------------------
|2017-01-01 00:00:00   |HIGH        |389788.9   |
|2017-02-01 00:00:00   |HIGH        |361717.2   |
|2017-03-01 00:00:00   |HIGH        |1956829.2  |
|2017-04-01 00:00:00   |HIGH        |2752738.1  |
|2017-05-01 00:00:00   |HIGH        |4338950.5  |
|2017-06-01 00:00:00   |HIGH        |4776703.4  |
|2017-07-01 00:00:00   |HIGH        |5880187.1  |
|2017-08-01 00:00:00   |HIGH        |5671478.3  |
|2017-09-01 00:00:00   |HIGH        |4602517.4  |
|2017-10-01 00:00:00   |HIGH        |3830905.8  |
-------------------------------------------------
```
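Sorting on `CATEGORY` is alphabetical, so the categories appear in the order `HIGH`, `LOW`, `MEDIUM` rather than in order of increasing bounce rate. If the natural order is wanted, one option is to sort on an explicit rank. The pure-Python sketch below uses a hypothetical `CATEGORY_RANK` mapping and a few illustrative (month, category) keys to show the difference; in Snowpark, such a rank could be derived with `when`/`lit` and passed to `sort()`.

```python
# Hypothetical rank reflecting increasing bounce rate
CATEGORY_RANK = {"LOW": 0, "MEDIUM": 1, "HIGH": 2}

# Illustrative (month, category) keys
rows = [("2017-02-01", "LOW"), ("2017-01-01", "MEDIUM"), ("2017-01-01", "HIGH"), ("2017-01-01", "LOW")]

# Alphabetical category, then month (mirrors sort(col("CATEGORY"), col("MONTH_OF_OPERATION")))
alphabetical = sorted(rows, key=lambda r: (r[1], r[0]))

# Rank-based category, then month
ranked = sorted(rows, key=lambda r: (CATEGORY_RANK[r[1]], r[0]))

print([c for _, c in alphabetical])  # ['HIGH', 'LOW', 'LOW', 'MEDIUM']
print([c for _, c in ranked])        # ['LOW', 'LOW', 'MEDIUM', 'HIGH']
```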



### Store the result in a new table in Snowflake

Finally, we write the data to a new table in Snowflake, overwriting the table if it already exists.

```python
df_sorted.write.mode("overwrite").save_as_table('"SALES_DB"."CLEAN"."WEBSITE_SALES"')
```

### Verify Results

We can read the new table directly from Snowflake to verify the results.

```python
snowpark_session.table('"SALES_DB"."CLEAN"."WEBSITE_SALES"').show()
```

```
-------------------------------------------------
|"MONTH_OF_OPERATION"  |"CATEGORY"  |"SALES"    |
-------------------------------------------------
|2017-01-01 00:00:00   |HIGH        |389788.9   |
|2017-02-01 00:00:00   |HIGH        |361717.2   |
|2017-03-01 00:00:00   |HIGH        |1956829.2  |
|2017-04-01 00:00:00   |HIGH        |2752738.1  |
|2017-05-01 00:00:00   |HIGH        |4338950.5  |
|2017-06-01 00:00:00   |HIGH        |4776703.4  |
|2017-07-01 00:00:00   |HIGH        |5880187.1  |
|2017-08-01 00:00:00   |HIGH        |5671478.3  |
|2017-09-01 00:00:00   |HIGH        |4602517.4  |
|2017-10-01 00:00:00   |HIGH        |3830905.8  |
-------------------------------------------------
```
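Rows in a Snowflake table have no guaranteed order, so the sorted appearance above is not something downstream queries should rely on; applying the sort when reading is safer. The sketch below, assuming the same session and `df_sorted` DataFrame, re-applies the sort on read and compares row counts between the DataFrame that was written and the stored table. `read_clean_table` and `row_counts_match` are hypothetical helper names.

```python
CLEAN_TABLE = '"SALES_DB"."CLEAN"."WEBSITE_SALES"'


def read_clean_table(session, table_name: str = CLEAN_TABLE):
    """Read the output table, re-applying the category/month sort at query time."""
    # Imported lazily so this definition does not require Snowpark until it is called
    from snowflake.snowpark.functions import col

    return session.table(table_name).sort(col("CATEGORY"), col("MONTH_OF_OPERATION"))


def row_counts_match(session, df_written, table_name: str = CLEAN_TABLE) -> bool:
    """Compare the row count of the written DataFrame with that of the stored table."""
    return df_written.count() == session.table(table_name).count()
```

In the notebook, this would be used as `read_clean_table(snowpark_session).show()` followed by `assert row_counts_match(snowpark_session, df_sorted)`.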

