Connect to Snowflake

In [1]:
import pandas as pd
from snowflake.snowpark.functions import sproc, udf, udtf, call_udf, pandas_udf, col, lit
from snowflake.snowpark.types import IntegerType, StructType, StructField, PandasSeriesType
from snowflake.snowpark import Session
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions

# Build the Snowpark session from the "test_conn" login options.
connection_options = SnowflakeLoginOptions("test_conn")
session = Session.builder.configs(connection_options).create()

# Sanity check: print the warehouse / database / schema the session is using.
context_rows = session.sql('select current_warehouse(), current_database(), current_schema()').collect()
print(context_rows)

[Row(CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='TEST', CURRENT_SCHEMA()='PUBLIC')]


Anonymous stored procedure (with lambda)

In [2]:
# Create a stored procedure directly from a lambda, without giving it a name.
# The lambda receives the session as its first argument and uses it to run
# a one-line query whose single value becomes the procedure's result.
add_one = sproc(
  lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
  input_types=[IntegerType()],
  return_type=IntegerType(),
  packages=["snowflake-snowpark-python"],
)

# No name was given, so invoke it through the returned Python handle.
ret = add_one(1)
print(f"add_one: {ret}")

add_one: 2


Named stored procedure (with lambda)

In [3]:
# Same lambda-based pattern, but registered under an explicit name
# ("add_two_proc"); replace=True overwrites any earlier registration.
add_two = sproc(
  lambda session, x: session.sql(f"select {x} + 2").collect()[0][0],
  name="add_two_proc",
  replace=True,
  input_types=[IntegerType()],
  return_type=IntegerType(),
  packages=["snowflake-snowpark-python"],
)

# Invoke the procedure by its registered name instead of the Python handle.
ret = session.call("add_two_proc", 1)
print(f"add_two: {ret}")

add_two: 3


Registered stored procedure

In [4]:
@sproc(
  name="add_three",
  replace=True,
  is_permanent=True,
  stage_location="@mystage",
  packages=["snowflake-snowpark-python"])
def add_three(session: Session, x: int) -> int:
  """Return ``x + 3``, computed by a query run on the given session.

  Input and return types are taken from the type hints, so the decorator
  does not pass ``input_types`` / ``return_type`` explicitly.
  """
  rows = session.sql(f"select {x} + 3").collect()
  # Single row, single column: the computed sum.
  return rows[0][0]

# Alternative to the decorator form: register the function explicitly
# through the session (use one approach or the other).
# session.sproc.register(
#   func=add_three, name="add_three", replace=True,
#   is_permanent=True, stage_location="@mystage",
#   packages=["snowflake-snowpark-python"])

# Invoke the procedure by name with a plain SQL CALL statement.
call_rows = session.sql("call add_three(1)").collect()
ret = call_rows[0][0]
print(f"add_three: {ret}")

add_three: 4


Anonymous UDF (with lambda)

In [5]:
# Wrap a lambda as a UDF without giving it a name; it is only reachable
# through the returned `add_five` handle.
add_five = udf(
  lambda x: x+5,
  input_types=[IntegerType()],
  return_type=IntegerType(),
)

# One-row frame whose only column is renamed to "a".
df = session.create_dataframe([[1]]).to_df("a")

# Apply the UDF to column "a" and pull out the single resulting value.
five_rows = df.select(add_five(col("a"))).collect()
ret = five_rows[0][0]
print(f"add_five: {ret}")

add_five: 6


Named UDF (with lambda)

In [6]:
# Lambda-based UDF registered under an explicit name so SQL can call it.
# NOTE: despite the "_proc" suffix, this registers a UDF, not a procedure.
add_six = udf(
  lambda x: x+6,
  name="add_six_proc",
  replace=True,
  input_types=[IntegerType()],
  return_type=IntegerType(),
)

# Call the UDF by name from SQL and read the single value returned.
six_rows = session.sql("select add_six_proc(1)").collect()
ret = six_rows[0][0]
print(f"add_six: {ret}")

add_six: 7


Registered UDF

In [7]:
@udf(
    name="add_seven",
    replace=True,
    is_permanent=True,
    stage_location="@mystage")
def add_seven(x: int) -> int:
    """Return ``x`` increased by seven.

    Input and return types come from the type hints, so the decorator does
    not pass ``input_types`` / ``return_type``.
    """
    increment = 7
    return x + increment

# Single-row input frame with one column named "a".
df = session.create_dataframe([[1]], schema=["a"])

# Call the UDF by its registered name rather than through a Python handle.
seven_rows = df.select(call_udf("add_seven", col("a"))).collect()
ret = seven_rows[0][0]
print(f"add_seven: {ret}")

add_seven: 8


Registered UDTF

In [4]:
class GetTwo:
  """Table-function handler that produces two one-column rows per call."""

  def process(self, n):
    """Yield ``(1,)`` and then ``(n,)``, each as a single-element tuple."""
    for value in (1, n):
      yield (value,)

# Turn the handler class into a table function: one integer input and a
# single integer output column called "number".
get_two = udtf(
  GetTwo,
  output_schema=StructType([StructField("number", IntegerType())]),
  input_types=[IntegerType()],
)

# Call the table function with the literal 3 and fetch every row it emits.
ret = session.table_function(get_two(lit(3))).collect()
print(f"get_two: {ret}")

get_two: [Row(NUMBER=1), Row(NUMBER=3)]


Vectorized UDFs

In [3]:
# replace=True keeps this cell re-runnable: without it, registering
# "add_eight" a second time in the same session fails because the name
# already exists. It also matches the other named registrations here.
@pandas_udf(
  name="add_eight", replace=True,
  input_types=[PandasSeriesType(IntegerType()), PandasSeriesType(IntegerType())],
  return_type=PandasSeriesType(IntegerType()))
def add_eight(col1: pd.Series, col2: pd.Series) -> pd.Series:
    """Vectorized UDF: element-wise ``col1 + col2 + 8``.

    Args:
        col1: First operand, passed as a pandas Series of values.
        col2: Second operand, passed as a pandas Series of values.

    Returns:
        A pandas Series holding the sum of both inputs plus eight.
    """
    return col1 + col2 + 8

# Single-row input frame with two columns, "a" and "b".
df = session.create_dataframe([[1, 2]], schema=["a", "b"])

# Call the vectorized UDF by its registered name on both columns.
eight_rows = df.select(call_udf("add_eight", col("a"), col("b"))).collect()
ret = eight_rows[0][0]
print(f"add_eight: {ret}")

add_eight: 11
