Examining the performance benefits of the Cachetools library


In [None]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf, avg, col,lit,call_udf,min,call_builtin,call_function,call_udf
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType
import pandas as pd
import numpy as np
import json

# Connection settings are read from a local JSON file rather than being
# written into the notebook.
with open('credentials.json') as credentials_file:
    connection_parameters = json.load(credentials_file)

# Build the Snowpark session used by the rest of the notebook.
session_builder = Session.builder.configs(connection_parameters)
session = session_builder.create()

# Print the warehouse / database / schema the session is currently using.
context_rows = session.sql(
    'select current_warehouse(), current_database(), current_schema()'
).collect()
print(context_rows)

### Create the Pickle File
Run the following code to build the sample pickle file.

In [None]:
import pickle

# Weekday names in order; numbering starts at 1 for 'monday' and ends at 7 for 'sunday'.
_DAY_NAMES = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday')
day_dict = {number: name for number, name in enumerate(_DAY_NAMES, start=1)}
print(day_dict)

# Serialize the mapping to alldays.pkl in the current working directory.
with open('alldays.pkl', 'wb') as pickle_out:
    pickle.dump(day_dict, pickle_out)

Next, let's test the pickle file output.

In [None]:
# Local helper to check that the pickle file can be loaded back
def getname():
    """Deserialize ``alldays.pkl`` from the working directory and return the loaded object."""
    with open('alldays.pkl', 'rb') as pickle_in:
        return pickle.load(pickle_in)

def getday(key):
    """Return the entry stored under ``key`` in the object loaded by ``getname()``.

    The lookup is a plain subscript, so a missing key raises whatever the
    loaded object raises (``KeyError`` for a dict).
    """
    return getname()[key]

# Quick check: look up day number 3 in the freshly written pickle and print it.
r = getday(key=3)
print(r)

### Creating a Named Stage and Uploading the Pickle File

Next, we will create a stage in Snowflake to upload the pickle file to.

In [None]:
# (Re)create the named stage that will hold the pickle file.
session.sql("create or replace stage pythonstage").collect()

# Upload the pickle as-is (no compression), replacing any earlier copy.
session.file.put(
    "alldays.pkl",
    "pythonstage",
    overwrite=True,
    auto_compress=False,
)

# List the stage contents to confirm the upload.
session.sql("ls @pythonstage").collect()

With the pickle file successfully loaded, we are now ready to A/B test our UDF with and without Cachetools in the next steps.

### Creating Python UDF Without Cachetools

The block of code below creates a Python UDF without any use of the Cachetools library.

In [None]:
import sys
import os
import cachetools
from snowflake.snowpark.types import StringType,IntegerType

# We are importing the pickle file in the session.
#
# extract_name() -> deserializes the dictionary from the pickle and returns
#                   a dictionary object to the caller
# getvalue()     -> takes a day number and returns the name
staged_pickle = "@pythonstage/alldays.pkl"
session.add_import(staged_pickle)
def extract_name()->dict:
    """Load the weekday dictionary from the imported ``alldays.pkl`` file.

    The directory holding the file is read from ``sys._xoptions`` under the
    ``snowflake_import_directory`` key. This version has no caching, so the
    file is opened and deserialized on every call.

    Returns:
        dict: the object deserialized from ``alldays.pkl``.

    Raises:
        KeyError: if ``snowflake_import_directory`` is not present in
            ``sys._xoptions``.
        OSError: if the pickle file cannot be opened.
    """
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    # os.path.join yields the same path as plain concatenation when the
    # directory already ends with a separator, and still works when it does not.
    file_path = os.path.join(import_dir, "alldays.pkl")

    # NOTE: pickle.load can execute arbitrary code during deserialization;
    # only stage pickle files that come from a trusted source.
    with open(file_path,'rb') as file:
        return pickle.load(file)

def getvalue(key:int)->str:
    """Return the weekday name stored under ``key``.

    Calls ``extract_name()`` on each invocation and indexes the returned
    mapping directly, so an unknown key raises ``KeyError`` for a dict.
    """
    return extract_name()[key]
    
# Register getvalue as the permanent UDF "udf_nocachetools" (int -> string),
# replacing any existing UDF of that name.
udf_nocache = session.udf.register(
    func=getvalue,
    name="udf_nocachetools",
    input_types=[IntegerType()],
    return_type=StringType(),
    is_permanent=True,
    stage_location='pythonstage',
    replace=True,
)

### Creating Sample data
Next, we will create 2 million rows of sample data for the UDF to run against.

In [None]:
# Build 2,000,000 rows with three random integer columns.
# np.random.randint treats `high` as exclusive, so high=8 is required for the
# values to span 1..7 and include weekday 7 ('sunday').
arr_random = np.random.randint(low=1, high=8, size=(2000000,3))
df = pd.DataFrame(arr_random, columns=['invoice_num','trx_num','weekday'])

# Push the pandas frame to Snowflake as a Snowpark DataFrame.
df_transactions=session.createDataFrame(df,schema=['invoice_num','trx_num','weekday'])

df_transactions.count() # 2 Million records

### Call the UDF
Let's call the UDF and create a new table from the result set with the code below.

In [None]:
from datetime import datetime

# Time the whole operation: apply the UDF to the weekday column and save the
# result as a table.
st = datetime.now()
weekday_as_int = df_transactions['"weekday"'].astype('int')
named_transactions = df_transactions.withColumn(
    'weekdayname', call_udf('udf_nocachetools', weekday_as_int)
)
named_transactions.write.mode('overwrite').save_as_table("NoCacheTransactionTable")
et = datetime.now()
print(f"Total duration without using Cachetools library ={(et-st).total_seconds()}")

Lastly, confirm that the UDF ran and successfully created a new table as expected by running the following:


In [None]:
# Show the first 10 rows of the table written by the no-cache run.
preview_query = "select * from NoCacheTransactionTable limit 10"
session.sql(preview_query).show()

### Creating Python User Defined Function With Cachetools

In the below cell, we will leverage the [Cachetools](https://pypi.org/project/cachetools/) library which will read the pickle file once and cache it.

In [None]:
import sys
import os
import cachetools
from snowflake.snowpark.types import StringType
import zipfile

# The cachetools decorator is applied to extract_name below. With the
# decorator, the file is read once and then cached; other UDF executions use
# the cached result and avoid reading the file from storage again.

staged_pickle = "@pythonstage/alldays.pkl"
session.add_import(staged_pickle)

@cachetools.cached(cache={})
def extract_name()->dict:
    """Load the weekday dictionary from the imported ``alldays.pkl`` file, once.

    The directory holding the file is read from ``sys._xoptions`` under the
    ``snowflake_import_directory`` key. The ``cachetools.cached`` decorator
    stores the result in the supplied dict, so the file is opened and
    deserialized on the first call only; later calls return the cached object.

    Returns:
        dict: the object deserialized from ``alldays.pkl``.

    Raises:
        KeyError: if ``snowflake_import_directory`` is not present in
            ``sys._xoptions``.
        OSError: if the pickle file cannot be opened.
    """
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    # os.path.join yields the same path as plain concatenation when the
    # directory already ends with a separator, and still works when it does not.
    file_path = os.path.join(import_dir, "alldays.pkl")

    # NOTE: pickle.load can execute arbitrary code during deserialization;
    # only stage pickle files that come from a trusted source.
    with open(file_path,'rb') as file:
        return pickle.load(file)


def getvalue(key:int)->str:
    """Return the weekday name stored under ``key``.

    Obtains the mapping from ``extract_name()`` and indexes it directly, so
    an unknown key raises ``KeyError`` for a dict.
    """
    return extract_name()[key]
    

# Add cachetools to the session's packages before registering the UDF that uses it.
session.add_packages("cachetools")

# Register getvalue as the permanent UDF "udf_withcachetools" (int -> string),
# replacing any existing UDF of that name.
udf_cache = session.udf.register(
    func=getvalue,
    name="udf_withcachetools",
    input_types=[IntegerType()],
    return_type=StringType(),
    is_permanent=True,
    stage_location='pythonstage',
    replace=True,
)

### Creating Sample data

Just like in the previous test, we will create 2 million rows of sample data for the UDF to run against.

In [None]:
# Build 2,000,000 rows with three random integer columns.
# np.random.randint treats `high` as exclusive, so high=8 is required for the
# values to span 1..7 and include weekday 7 ('sunday').
arr_random = np.random.randint(low=1, high=8, size=(2000000,3))
df = pd.DataFrame(arr_random, columns=['invoice_num','trx_num','weekday'])

# Push the pandas frame to Snowflake as a Snowpark DataFrame.
df_transactions=session.createDataFrame(df,schema=['invoice_num','trx_num','weekday'])

df_transactions.count() # 2 Million records

### Call the UDF

Let's call the UDF and create a new table from the result set with the code below.

In [None]:
from datetime import datetime

# Time the whole operation: apply the cached UDF to the weekday column and
# save the result as a table.
st = datetime.now()
weekday_as_int = df_transactions['"weekday"'].astype('int')
named_transactions = df_transactions.withColumn(
    '"weekdayname"', call_udf('udf_withcachetools', weekday_as_int)
)
named_transactions.write.mode('overwrite').save_as_table("CacheToolsTransactionTable")
et = datetime.now()
print(f"Total duration ={(et-st).total_seconds()}")

<b>Use the Cachetools library in UDFs and stored procedures wherever you read files and keep their contents in memory.</b>