Connect to Snowflake

In [1]:
import toml
import snowflake.connector
from snowflake.snowpark import Session

config = toml.load("../../.snowflake/connections.toml")
#print(config)
sf_config = config["connections"]["test_conn"]
#print("test",sf_config)

#sf_config = config["connections.test_conn"]

import snowflake.connector

conn = snowflake.connector.connect(
    user=sf_config["user"],
    password=sf_config["password"],
    account=sf_config["account"],
    warehouse=sf_config["warehouse"],
    database=sf_config["database"],
    schema=sf_config["schema"]
)

connection_parameters = {
        "account": sf_config["account"],
        "user": sf_config["user"],
        "password": sf_config["password"],
        "warehouse": sf_config["warehouse"],
        "database": sf_config["database"],
        "schema": sf_config["schema"]
    }
session = Session.builder.configs(connection_parameters).create()
#print("Session ")
#print("Snowpark Session Test:", session.get_current_schema())

try:
    cursor = conn.cursor()
    #cursor.execute("USE SCHEMA SNOWFLAKE_SAMPLE_DATA.TPCDS_SF100TCL;")
    #cursor.execute("SELECT * from CUSTOMER LIMIT 10")
    #df = cursor.fetchall()
    #print(df)
except Exception as e:
    print(f"this failed due to {e.args}, {e}")

session.close()


In [2]:

from snowflake.snowpark.functions import sproc, udf, udtf, call_udf, col, lit
from snowflake.snowpark.types import IntegerType, StructType, StructField
from snowflake.snowpark import Session
#from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
connection_parameters = {
        "account": sf_config["account"],
        "user": sf_config["user"],
        "password": sf_config["password"],
        "warehouse": sf_config["warehouse"],
        "database": sf_config["database"],
        "schema": sf_config["schema"]
    }
#print(connection_parameters)
session = None
if not session:
    session = Session.builder.configs(connection_parameters).create()

#below doesn;t work as it is in private preview
#session = Session.builder.configs(SnowflakeLoginOptions("test_conn")).create()
session.query_tag = "func-gen"

#print(session.get_current_database())
# Execute the query



Anonymous Stored Procedure (with lambda)

In [3]:
from snowflake.snowpark.functions import sproc
add_one = sproc(
  lambda session, x: session.sql(f"select {x} + 1").collect()[0][0],
  input_types=[IntegerType()], return_type=IntegerType(),
  packages=["snowflake-snowpark-python==1.13.0"])


ret = add_one(1)
print(f"add_one: {ret}")

add_one: 2


Named stored procedure (with lambda)

In [4]:
add_two = sproc(
  lambda session, x: session.sql(f"select {x} + 2").collect()[0][0],
  name="add_two_proc", replace=True,
  input_types=[IntegerType()], return_type=IntegerType(),
  packages=["snowflake-snowpark-python"])

ret = session.call("add_two_proc", 1)
print(f"add_two: {ret}")

add_two: 3


Registered stored procedure

In [5]:
@sproc(
  name="add_three", replace=True,
  is_permanent=True, stage_location="@int_stage",
  packages=["snowflake-snowpark-python"])
def add_three(session: Session, x: int) -> int:
  return session.sql(f"select {x} + 3").collect()[0][0]

# alternative
# session.sproc.register(
#   func=add_three, name="add_three", replace=True,
#   is_permanent=True, stage_location="@int_stage",
#   packages=["snowflake-snowpark-python"])

ret = session.sql("call add_three(1)").collect()[0][0]
print(f"add_three: {ret}")

SnowparkSQLException: (1304): 01b9f8f2-0000-cd9d-004c-e0030006100a: 002003 (02000): SQL compilation error:
Stage 'TASTY_BYTES_SAMPLE_DATA.PUBLIC.INT_STAGE' does not exist or not authorized.

Anonymous UDF (with lambda)

In [6]:
add_five = udf(lambda x: x+5,
  input_types=[IntegerType()], return_type=IntegerType())

df = session.create_dataframe([[1]]).to_df("a")
ret = df.select(add_five(col("a"))).collect()[0][0]
print(f"add_five: {ret}")

add_five: 6


Named UDF (with lambda)

In [7]:
add_six = udf(lambda x: x+6,
  name="add_six_proc", replace=True,
  input_types=[IntegerType()], return_type=IntegerType())

ret = session.sql("select add_six_proc(1)").collect()[0][0]
print(f"add_six: {ret}")

add_six: 7


Registered UDF

In [8]:
@udf(
    name="add_seven", replace=True,
    is_permanent=True, stage_location="@int_stage")
def add_seven(x: int) -> int:
    return x+7

df = session.create_dataframe([[1]], schema=["a"])
ret = df.select(call_udf("add_seven", col("a"))).collect()[0][0]
print(f"add_seven: {ret}")

SnowparkSQLException: (1304): 01b9f8c9-0000-cd95-004c-e0030005a0c2: 002003 (02000): SQL compilation error:
Stage 'TASTY_BYTES_SAMPLE_DATA.PUBLIC.INT_STAGE' does not exist or not authorized.

Registered UDTF

In [None]:
class GetTwo:
  def process(self, n):
    yield(1, )
    yield(n, )

get_two = udtf(GetTwo, 
  output_schema=StructType([StructField("number", IntegerType())]),
  input_types=[IntegerType()])

# SELECT * FROM ( TABLE ("TEST"."PUBLIC".SNOWPARK_TEMP_TABLE_FUNCTION_2PR5R5RI4E(3 :: INT) ))
ret = session.table_function(get_two(lit(3))).collect()
print(f"get_two: {ret}")