## Lab 2 - Vectorized UDFs for Batching

### Setup

Below are the imports needed for this lab.  They have been included in the instantiation of the conda environment.  

In [None]:
# Importing snowpark libraries
import snowflake.snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import avg, stddev, udf
from snowflake.snowpark.types import FloatType, StringType, PandasSeries

# Others
import json
import re
import pandas as pd
from datetime import datetime, timedelta

Connect to Snowflake by opening the `credentials.json` file and creating a session.

In [None]:
# Creating a session into Snowflake. This is the same as in previous sections of this lab
with open('credentials.json') as f:
    connection_parameters = json.load(f)
    
session = Session.builder.configs(connection_parameters).create()

# Testing the session
session.sql("SELECT current_warehouse(), current_database(), current_schema()").show()

Before continuing, we will disable caching to ensure proper performance comparisons.


In [None]:
session.sql("ALTER SESSION SET USE_CACHED_RESULT = FALSE").collect()

### Create Dataframes

As mentioned earlier, our data (TPCH dataset) is available from Snowflake via an inbound data share. We are taking 4 datasets into consideration, namely, Customers & Orders with 2 different sizes.

Let's create all of these datasets as dataframes.

In [None]:
df_customer_100 = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer")
df_customer_1000 = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer")
df_order_100 = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.orders")
df_order_1000 = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.orders")
df_customer_100.limit(2).show()
df_order_100.limit(2).show()

Next, let's check on the data volumes we will be working with.


In [None]:
print(f"df_customer_100 = {str(df_customer_100.count()/1000000)} M, df_customer_1000 = {str(df_customer_1000.count()/1000000)} M,\ndf_order_100 = {str(df_order_100.count()/1000000)} M, df_order_1000 = {str(df_order_1000.count()/1000000)} M")


### Setup Warehouses
As mentioned earlier, we will use two warehouse sizes for each dataset:

- For the `*_100` datasets, we will use the Small and Medium warehouse sizes
- For the `*_1000` datasets, we will use the Medium and Large warehouse sizes
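
The pairing of dataset and warehouse size can be written down as a small lookup, which is handy when generating the resize statements in a loop. This is only a sketch: the names `warehouse_plan` and `resize_statement` are hypothetical, and the code builds the SQL text without sending anything to Snowflake.

```python
# Hypothetical mapping of dataset scale to the warehouse sizes used in this lab
warehouse_plan = {
    "100": ["Small", "Medium"],
    "1000": ["Medium", "Large"],
}

def resize_statement(size, warehouse="compute_wh"):
    # Build the ALTER WAREHOUSE statement text for a given size
    return f"ALTER WAREHOUSE {warehouse} SET warehouse_size='{size}'"

# One (dataset, statement) pair per benchmark run
statements = [
    (dataset, resize_statement(size))
    for dataset, sizes in warehouse_plan.items()
    for size in sizes
]

for dataset, statement in statements:
    print(dataset, "->", statement)
```

Each statement string could then be passed to `session.sql(...).collect()` before the corresponding timing run.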

In [None]:
# Let's change the warehouse size as required
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()

# Let's check the current size of the warehouse
session.sql("SHOW WAREHOUSES LIKE '%COMPUTE_WH%'").collect()
session.sql('SELECT "name", "size" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))').show()

### Use Case 1: Numeric Computation
For this use case, we will make use of the Customer datasets. Let's first compute the mean and standard deviation for Account Balance in Customer datasets.

In [None]:
## df_customer_100
# .iloc[0, 0] extracts the single aggregate value as a scalar before converting to float
df_customer_100_mean = float(df_customer_100.agg(avg("C_ACCTBAL")).to_pandas().iloc[0, 0])
df_customer_100_stddev = float(df_customer_100.agg(stddev("C_ACCTBAL")).to_pandas().iloc[0, 0])

## df_customer_1000
df_customer_1000_mean = float(df_customer_1000.agg(avg("C_ACCTBAL")).to_pandas().iloc[0, 0])
df_customer_1000_stddev = float(df_customer_1000.agg(stddev("C_ACCTBAL")).to_pandas().iloc[0, 0])

Let's create two sets of UDFs, one for each dataset, because the means and standard deviations differ.

- As mentioned earlier, these are hypothetical use cases.
- In this UDF, we take the Customer Account Balance as input, subtract the mean, add the standard deviation, and finally multiply by 10,000. Notice that we create both a normal and a vectorised UDF.
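
To make the arithmetic concrete, here is a small local sketch of the same formula applied once per value and once to a whole pandas Series. The mean, standard deviation, and balances are made-up illustration values, not figures from the TPCH data, and the function names are hypothetical. Nothing here runs in Snowflake.

```python
import pandas as pd

# Hypothetical statistics, for illustration only (not computed from TPCH)
example_mean = 4500.0
example_stddev = 3000.0

def compute_row(balance):
    # Scalar form: called once per value
    return (balance - example_mean + example_stddev) * 10000.0

def compute_batch(balances: pd.Series) -> pd.Series:
    # Vectorised form: a single call handles the whole batch
    return (balances - example_mean + example_stddev) * 10000.0

balances = pd.Series([1000.0, 4500.0, 9000.0])
row_results = [compute_row(b) for b in balances]
batch_results = compute_batch(balances).tolist()

print(row_results)
print(batch_results)
```

Both forms produce the same numbers; the difference is how many Python function calls are needed to get there.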

In [None]:
## df_customer_100
def basic_compute_100(inp):
    return (inp - df_customer_100_mean + df_customer_100_stddev) * 10000.0

# Let's create a UDF on top of that
udf_bc_100 = udf(basic_compute_100, return_type=FloatType(), input_types=[FloatType()])

# Let's vectorise the UDF
@udf()
def vect_udf_bc_100(inp: PandasSeries[float]) -> PandasSeries[float]:
    return (inp - df_customer_100_mean + df_customer_100_stddev) * 10000.0

The output above shows the `basic_compute_100` function created as a temporary UDF within Snowflake. Creating the UDF takes a few seconds the first time; subsequent runs take less time.

- You'll notice runtime parameters such as the Python version (3.8), the `cloudpickle==2.0.0` package, and so on
- More importantly, towards the end, you can see the actual function definition

Using `@udf` with a pandas Series or DataFrame as input automatically makes use of the Python UDF Batch API, also known as vectorised UDFs.
The definition of a vectorised UDF looks much the same as that of a normal, non-vectorised one. However, apart from our definition of `vect_udf_bc_100`, the key differences are:

- Importing the pandas library: this can be seen towards the end of the Vectorised UDF definition
- The Vectorised input argument: `compute._sf_vectorized_input = pandas.DataFrame`

In [None]:
## df_customer_1000
def basic_compute_1000(inp):
    return (inp - df_customer_1000_mean + df_customer_1000_stddev) * 10000.0

# Let's create a UDF on top of that
udf_bc_1000 = udf(basic_compute_1000, return_type=FloatType(), input_types=[FloatType()])

# Let's vectorise the UDF
@udf()
def vect_udf_bc_1000(inp: PandasSeries[float]) -> PandasSeries[float]:
    return (inp - df_customer_1000_mean + df_customer_1000_stddev) * 10000.0

Let's quickly execute these UDFs against our account balance datasets 100 & 1000. In our methodology, we discussed using different Virtual Warehouse sizes:

- Small and Medium for the 100 dataset
- Medium and Large for the 1000 dataset

We will re-run the UDFs accordingly.
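
The timing cells in this lab use the notebook magic `%timeit -r 1 -n 1`, which runs a statement exactly once. Outside a notebook, a single timed run can be approximated with `time.perf_counter`. The helper name `time_once` is hypothetical, and the lambda is only a stand-in for a Snowpark action.

```python
import time

def time_once(label, fn):
    """Run fn a single time and report the wall-clock duration in seconds."""
    start = time.perf_counter()
    result = fn()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.3f} s")
    return result, elapsed

# Stand-in workload; in the lab this would be a DataFrame action such as .show()
result, elapsed = time_once("Example workload", lambda: sum(range(100_000)))
```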

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()
print("Use case 1.1: Numeric Computation with Small WH and 100x dataset")
print("Scalar UDF: ") 
%timeit -r 1 -n 1 df_customer_100.select(udf_bc_100("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()
print("Vectorized UDF: ") 
%timeit -r 1 -n 1 df_customer_100.select(vect_udf_bc_100("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 1.2: Numeric Computation with Medium WH and 100x dataset")
print("Scalar UDF: ") 
%timeit -r 1 -n 1 df_customer_100.select(udf_bc_100("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()
print("Vectorized UDF: ") 
%timeit -r 1 -n 1 df_customer_100.select(vect_udf_bc_100("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 1.3: Numeric Computation with Medium WH and 1000x dataset")
print("Scalar UDF: ") 
%timeit -r 1 -n 1 df_customer_1000.select(udf_bc_1000("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()
print("Vectorized UDF: ") 
%timeit -r 1 -n 1 df_customer_1000.select(vect_udf_bc_1000("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Large'").collect()
print("Use case 1.4: Numeric Computation with Large WH and 1000x dataset")
print("Scalar UDF: ") 
%timeit -r 1 -n 1 df_customer_1000.select(udf_bc_1000("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()
print("Vectorized UDF: ") 
%timeit -r 1 -n 1 df_customer_1000.select(vect_udf_bc_1000("C_ACCTBAL").alias("bal_from_mean")).agg(avg("bal_from_mean")).show()

### Use Case 2: String Manipulation
To test a workload based on string manipulation, let's create a single set of UDFs for the Customer100 & Customer1000 datasets. Here, we take the Customer Name as input and split it on the '#' character. As before, we will create both a normal and a vectorised UDF.
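
As a quick local illustration of what the split returns, the sketch below applies the same logic to one sample name. The sample value follows the usual TPCH `C_NAME` pattern but is an assumption, not a row read from the share.

```python
# Illustrative value in the TPCH C_NAME style; the exact value is an assumption
sample_name = "Customer#000000001"

def str_manipulate(inp):
    # Return the segment that follows the '#' separator
    return inp.split('#')[1]

customer_id = str_manipulate(sample_name)
print(customer_id)  # 000000001
```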

In [None]:
def str_manipulate(inp):
    return inp.split('#')[1]

# Let's create a UDF on top of that
udf_sm = udf(str_manipulate, return_type=StringType(), input_types=[StringType()])

# Let's vectorise the same UDF
@udf()
def vect_udf_sm(inp: PandasSeries[str]) -> PandasSeries[str]:
    return inp.str.split('#', expand=True)[1]

As before, let's quickly execute these UDFs against our datasets 100 & 1000. We will re-run these UDFs based on our dataset and Virtual Warehouse sizing.

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()
print("Use case 2.1: String Manipulation with Small WH and 100x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(vect_udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 2.2: String Manipulation with Medium WH and 100x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(vect_udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 2.3: String Manipulation with Medium WH and 1000x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(vect_udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Large'").collect()
print("Use case 2.4: String Manipulation with Large WH and 1000x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(vect_udf_sm("C_NAME").alias("CustIDs")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.CustIDs")

### Use Case 3: Regex Masking
Another common use case is Regex Masking. To set this up, let's create a single set of UDFs for the Customer100 & Customer1000 datasets. Here, we take the Customer Phone Number as input and mask the last 4 digits. As before, we will create both a normal and a vectorised UDF.
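
The masking rule can be tried locally on a single value before wrapping it in a UDF. The phone number below is an illustrative value in the TPCH `C_PHONE` style, not a row from the dataset.

```python
import re

# Illustrative phone number; the exact value is an assumption
sample_phone = "25-989-741-2988"

def mask_data(inp):
    # Replace the final four digits of the string with asterisks
    return re.sub(r'\d{4}$', '****', inp)

masked = mask_data(sample_phone)
print(masked)  # 25-989-741-****
```

The `$` anchor restricts the match to the end of the string, so earlier digit groups are left untouched.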

In [None]:
# Let's create a slightly complex function using native python regex for string replacement/masking
def mask_data(inp):
    # Raw string avoids an invalid-escape warning for \d
    return re.sub(r'\d{4}$', '****', inp)

# Let's create a UDF on top of that
udf_md = udf(mask_data, return_type=StringType(), input_types=[StringType()])

# Let's vectorise the same UDF
@udf()
def vect_udf_md(inp: PandasSeries[str]) -> PandasSeries[str]:
    return inp.apply(lambda x: mask_data(x))

As before, let's quickly execute these UDFs against our datasets 100 & 1000. We will re-run these UDFs based on our dataset and Virtual Warehouse sizing.

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()
print("Use case 3.1: Regex Masking with Small WH and 100x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(vect_udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 3.2: Regex Masking with Medium WH and 100x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(vect_udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 3.3: Regex Masking with Medium WH and 1000x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(vect_udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Large'").collect()
print("Use case 3.4: Regex Masking with Large WH and 1000x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(vect_udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")

To try another approach, let's vectorise the original UDF again, this time using the pandas `replace` method. After creating it, check the performance against the two datasets.
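
Here is a small local sketch of how pandas applies a regular expression across a whole Series in one call, in contrast to calling a Python function per element with `apply`. The phone numbers are illustrative values, not rows from the dataset.

```python
import pandas as pd

# Illustrative phone numbers; the exact values are assumptions
phones = pd.Series(["25-989-741-2988", "13-761-547-5974"])

# With regex=True, to_replace is treated as a pattern and matches are substituted
masked = phones.replace(to_replace=r'\d{4}$', value='****', regex=True)

print(masked.tolist())
```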


In [None]:
@udf()
def vect_udf_md(inp: PandasSeries[str]) -> PandasSeries[str]:
    return inp.replace(to_replace=r'\d{4}$', value='****', regex=True)

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 3.5: Regex Masking with Pandas replace, using Medium WH and 100x then 1000x dataset")
print("\rVectorized UDF on 100x: ", end="") 
%timeit -r 1 -n 1 df_customer_100.select(vect_udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")
print("\rVectorized UDF on 1000x: ", end="") 
%timeit -r 1 -n 1 df_customer_1000.select(vect_udf_md("C_PHONE").alias("masked_phone_nums")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.masked_phone_data")

### Use Case 4: Timestamp Manipulation
The final example is changing a timestamp field. Let's create a single set of UDFs for the Order100 & Order1000 datasets. Here, we take the Order Date as input and apply some basic timestamp operations. As before, we will create both a normal and a vectorised UDF.
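
The timestamp operations can be traced locally on a single value: parse the date string, round-trip it through a second format, and shift it forward by 10 days. The sample date is an illustrative value, and the variable names are hypothetical.

```python
from datetime import datetime, timedelta

# Illustrative order date; the exact value is an assumption
sample_date = "1996-01-02"

# Parse the ISO-style string into a datetime
parsed = datetime.strptime(sample_date, '%Y-%m-%d')
# Format as MM/DD/YYYY and parse again; the underlying value is unchanged
round_tripped = datetime.strptime(parsed.strftime('%m/%d/%Y'), '%m/%d/%Y')
# Move the date forward by 10 days
shifted = round_tripped + timedelta(days=10)

print(shifted)  # 1996-01-12 00:00:00
```

Because each value goes through two `strptime` calls and one `strftime` call, this is noticeably more work per row than the earlier use cases.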

In [None]:
# Let's try some timestamp manipulation
def change_format(inp):
    # Parse the incoming YYYY-MM-DD string
    parsed = datetime.strptime(inp, '%Y-%m-%d')
    # Round-trip through the MM/DD/YYYY format
    reformatted = datetime.strptime(parsed.strftime('%m/%d/%Y'), '%m/%d/%Y')
    # Shift the date forward by 10 days
    dt_add10_days = reformatted + timedelta(days=10)
    return dt_add10_days

# Let's create a UDF on top of that
udf_cf = udf(change_format, return_type=StringType(), input_types=[StringType()])

# Let's vectorise the UDF - still using native python
@udf()
def vect_udf_cf(inp: PandasSeries[str]) -> PandasSeries[str]:
    return inp.apply(lambda x: change_format(x))

As before, let's execute these UDFs against our datasets 100 & 1000. We will re-run these UDFs based on our Dataset and VW sizing.  

**NOTE:**  **This use case is the most computationally intensive and will take the longest compared to the previous use cases.**
The 1000 datasets take ~30 minutes to run each, so please be cognizant of credit usage and feel free to skip ahead to the analysis if desired. 

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()
print("Use case 4.1: Timestamp Manipulation with Small WH and 100x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_order_100.select(udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_order_100.select(vect_udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")


In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 4.2: Timestamp Manipulation with Medium WH and 100x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_order_100.select(udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_order_100.select(vect_udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
print("Use case 4.3: Timestamp Manipulation with Medium WH and 1000x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_order_1000.select(udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_order_1000.select(vect_udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Large'").collect()
print("Use case 4.4: Timestamp Manipulation with Large WH and 1000x dataset")
print("\rScalar UDF: ", end="") 
%timeit -r 1 -n 1 df_order_1000.select(udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")
print("\rVectorized UDF: ", end="") 
%timeit -r 1 -n 1 df_order_1000.select(vect_udf_cf("O_ORDERDATE").alias("NY_OrderDate")).write.mode("overwrite").save_as_table("SNOWPARK_BEST_PRACTICES_LABS.PUBLIC.new_order_dates")

### Cleanup

The following code block cleans up what was executed in this lab by:
 - Resetting the warehouse to SMALL
 - Dropping and recreating the database to quickly remove all tables created in this lab
 - Closing the session

In [None]:
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()
# session.sql() is lazy; .collect() is needed for the statements to actually run
session.sql("DROP DATABASE SNOWPARK_BEST_PRACTICES_LABS").collect()
session.sql("CREATE DATABASE SNOWPARK_BEST_PRACTICES_LABS").collect()

if session:
    session.close()