## Python Version

* Checking Python Version

In [1]:
from platform import python_version
print(python_version())

3.8.16


## Working with Sessions - Multiple Ways

*  Section delineates connecting to a snowflake session
* Explains session connection through default and private key methdology

In [1]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
import configparser
import os

# Default Method
connection_parameters = {
    "account": "<SNOWFLAKE_ACCOUNT>",
    "user": "<SNOWFLAKE_USER>",
    "password": "<SNOWFLAKE_PASSWORD>",
    "role": "<SNOWFLAKE_ROLE>",
    "warehouse": "<SNOWFLAKE_WAREHOUSE>",  # optional
    "database": "<SNOWFLAKE_DATABASE>",  # optional
    "schema": "<SNOWFLAKE_SCHEMA>" # optional
  }


# Loading Credential From Config File
snowflake_credentials_file = '../snowflake_creds.config'
config = configparser.ConfigParser()
config.read(snowflake_credentials_file)
connection_parameters = dict(config['default'])


# Loading Credentials From Environmental Variables
connection_parameters = {
    "account": os.getenv('SNOWFLAKE_ACCOUNT'),
    "user": os.getenv('SNOWFLAKE_USER'),
    "password": os.getenv('SNOWFLAKE_PASSWORD'),
    "role": os.getenv('SNOWFLAKE_ROLE'),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),  # optional
    "database": os.getenv("SNOWFLAKE_DATABASE"),  # optional
    "schema": os.getenv("SNOWFLAKE_SCHEMA") # optional
  }


session = Session.builder.configs(connection_parameters).create()

### Key Pair Authentication (Optional)

* Explains session connection through Private key setup

In [None]:
## Import packages with which to parse the private key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

## Define our plain-text private key and passphrase
## which in reality would have been ingested from somewhere
private_key_plain_text = '''-----BEGIN PRIVATE KEY-----
< your private key >
-----END PRIVATE KEY-----'''

private_key_passphrase = '<your private key passphase>'

def load_private_key(private_key_plain_text, private_key_passphrase):
    ## Encode private key
    private_key_encoded = private_key_plain_text.encode()

    ## Encode private key passphrase
    private_key_passphrase_encoded = private_key_passphrase.encode()

    ## Load the private key, leveraging passphrase if needed
    private_key_loaded = serialization.load_pem_private_key(
        private_key_encoded,
        password = private_key_passphrase_encoded,
        backend = default_backend()
    )

    ## Serialize loaded private key
    private_key_serialized = private_key_loaded.private_bytes(
        encoding = serialization.Encoding.DER,
        format = serialization.PrivateFormat.PKCS8,
        encryption_algorithm = serialization.NoEncryption()
    )

    return private_key_serialized

## Define connection parameters
connection_parameters = {
    "account": "<your snowflake account identifier>",
    "user": "<your snowflake username>",
    "private_key": "<load_private_key(private_key_plain_text, private_key_passphrase)>",
    "warehouse": "<your snowflake warehouse>",  # optional
    "database": "<your snowflake database>",  # optional
    "schema": "<your snowflake schema>"  # optional
}

### Session Parameters

* Printsout Necessary Session Parameters

In [None]:
session.sql("CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH WITH WAREHOUSE_SIZE='X-SMALL'").collect()
session.sql("CREATE DATABASE IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA").collect()
session.sql("CREATE STAGE IF NOT EXISTS SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA.MY_STAGE").collect()

In [4]:
print("Session Current Account:",session.get_current_account())
print("Session Current Database:",session.get_current_database())
print("Session Current Role:",session.get_current_role())
print("Session Current Schema:",session.get_current_schema())
print("Session Current Warehouse:",session.get_current_warehouse())
print("Session Current Fully Qualified Schema:",session.get_fully_qualified_current_schema())
print("Session query history:",session.query_history())
print("Session imports:",session.get_imports())
print("Session packages:",session.get_packages())
print("Session stage details:",session.get_session_stage())

Session Current Account: "ft66776"
Session Current Database: "SNOWPARK_DEFINITIVE_GUIDE"
Session Current Role: "ACCOUNTADMIN"
Session Current Schema: "MY_SCHEMA"
Session Current Warehouse: "COMPUTE_WH"
Session Current Fully Qualified Schema: "SNOWPARK_DEFINITIVE_GUIDE"."MY_SCHEMA"
Session query history: <snowflake.snowpark.query_history.QueryHistory object at 0x000001E33A756D30>
Session imports: []
Session packages: {}
Session stage details: @"SNOWPARK_DEFINITIVE_GUIDE"."MY_SCHEMA".SNOWPARK_TEMP_STAGE_TH1Z389Y2F


### Importing Custom Library using External Stage

* Delineates How Files Can Be Uploaded To Snowflake Stage

In [17]:
stage_info = session.sql("create or replace stage MY_STAGE")
stage_info.collect()

[Row(status='MY_STAGE already exists, statement succeeded.')]

In [20]:
put_result = session.file.put("./worksheets/worksheet_codes/last_name_finder_stage.py", "@MY_STAGE/")
put_result[0].status

'UPLOADED'

## Working with Dataframes

* This section explains  basic Dataframe I/O operations
* Covers Loading data into Table, read and write operations

### Creating Table

In [5]:
session.sql('CREATE OR REPLACE TABLE SAMPLE_EMPLOYEE_DATA (id INT,name VARCHAR, age INT, email VARCHAR, city VARCHAR,country VARCHAR)').collect()

[Row(status='Table SAMPLE_EMPLOYEE_DATA successfully created.')]

### Loading Data To Table 

In [6]:
session.sql("""
    INSERT INTO SAMPLE_EMPLOYEE_DATA VALUES
    (1,'John Doe',25,'johndoe@example.com','New York','USA'),
    (2, 'Jane Smith',30,'janesmith@example.com','Los Angeles','USA'),
    (3, 'Michael Johnson',35,'michaeljohnson@example.com','London','UK'),
    (4, 'Sarah Williams',28,'sarahwilliams@example.com','Leeds','UK'),
    (5,'David Brown',32,'davidbrown@example.com','Tokyo','Japan'),
    (6,'Emily Davis',29,'emilydavis@example.com','Sydney','Australia'),
    (7,'James Miller',27,'jamesmiller@example.com','Dallas','USA'),
    (8,'Emma Wilson',33,'emmawilson@example.com','Berlin','Germany'),
    (9,'Alexander Taylor',31,'alexandertaylor@example.com','Rome','Italy'),
    (10,'Olivia Anderson',26,'oliviaanderson@example.com','Melbourne','Australia')
    """).collect()

[Row(number of rows inserted=10)]

In [7]:
session.sql("SELECT count(*) FROM SAMPLE_EMPLOYEE_DATA").collect()

[Row(COUNT(*)=10)]

### Reading Data from Table

In [10]:
df_subset_row = session.table("SAMPLE_EMPLOYEE_DATA").filter(col("id") == 1) ##  Filters col ID = 1

In [13]:
df_subset_row.show()

------------------------------------------------------------------------
|"ID"  |"NAME"    |"AGE"  |"EMAIL"              |"CITY"    |"COUNTRY"  |
------------------------------------------------------------------------
|1     |John Doe  |25     |johndoe@example.com  |New York  |USA        |
------------------------------------------------------------------------



### Writing Data to Table

In [14]:

snowpark_df = session.write_pandas(df_subset_row.to_pandas(), "SAMPLE_EMPLOYEE_DATA_SUBSET", auto_create_table=True)
snowpark_df.to_pandas()

Unnamed: 0,ID,NAME,AGE,EMAIL,CITY,COUNTRY
0,1,John Doe,25,johndoe@example.com,New York,USA


## Working with UDF

* Section provides code templates for Snowpark UDF
* Includes UDF template, example and execution of UDF

### UDF Template

In [None]:
"""
##################################################################
## Define the function for the UDF
def <main Python function name>(<arguments>):
  return <function output>

# Imports required packages
from snowflake.snowpark.types import <specific Snowpark DataType object>

# Optional: Import additional packages or files
snowpark_session.add_packages('List of native packages in Anaconda Channel')
snowpark_session.add_import('Path to Local File')

##################################################################
## Register UDF in Snowflake
snowpark_session.udf.register(
    func = <Main Function Name>
  , return_type = <Return Type of Snowpark DataType object >
  , input_types = <[Input Types of Snowflake DataType object]>
  , is_permanent = True
  , name = '<UDF name>'
  , replace = True
  , stage_location = '@<UDF stage name>'
)

"""

In [116]:
##################################################################
## Define the function for the UDF

def last_name_finder(input_name:str):
  last_name = input_name.split()[1]
  return last_name

### Add packages and data types
from snowflake.snowpark.types import StringType,IntegerType,ArrayType


##################################################################
## Register UDF in Snowflake
test = session.udf.register(
    func = last_name_finder
  , return_type = StringType()
  , input_types = [StringType()]
  , is_permanent = True
  , name = 'LAST_NAME_FINDER'
  , replace = True
  , stage_location = '@MY_STAGE'
)


### Executing UDF

In [136]:

session.sql('''SELECT
    NAME
  , LAST_NAME_FINDER(NAME) AS LAST_NAME
FROM SAMPLE_EMPLOYEE_DATA
''').show()

----------------------------------
|"NAME"            |"LAST_NAME"  |
----------------------------------
|John Doe          |Doe          |
|Jane Smith        |Smith        |
|Michael Johnson   |Johnson      |
|Sarah Williams    |Williams     |
|David Brown       |Brown        |
|Emily Davis       |Davis        |
|James Miller      |Miller       |
|Emma Wilson       |Wilson       |
|Alexander Taylor  |Taylor       |
|Olivia Anderson   |Anderson     |
----------------------------------



## Working with UDTF

* Section provides code templates for Snowpark UDTF
* Includes UDTF template, example and execution of UDTF

### UDTF Template

In [None]:
"""

##################################################################
## Define the class for the UDTF
class <name of main Python class> :
  
  def __init__(self) :
    # Python code at the partition level
  
  def process(self, <arguments>) :
    # Python code at the row level
    yield (<col_1_val_1>, <col_2_val_1>, ...)
    yield (<col_1_val_2>, <col_2_val_2>, ...)

    # or

    return [
        (<col_1_val_1>, <col_2_val_1>, ...)
      , (<col_1_val_2>, <col_2_val_2>, ...)
    ]
  
  
  def end_partition(self) :
    # Python code at the partition level
 
    yield (<col_1_val_1>, <col_2_val_1>, ...)
    yield (<col_1_val_2>, <col_2_val_2>, ...)
    # or 
    return [
        (<col_1_val_1>, <col_2_val_1>, ...)
      , (<col_1_val_2>, <col_2_val_2>, ...)
    ]

# Import data types for defining the tabular output structure
from snowflake.snowpark.types import StructType, StructField

snowpark_session.add_packages('List of native packages in Anaconda Channel')
snowpark_session.add_import('Path to Local File')

##################################################################
## Register UDTF in Snowflake
snowpark_session.udtf.register(
    handler = <Python Class Name>
  , output_schema = StructType(<list of field name and Snowpark DataType objects - StructField objects>)
  , input_types = <[Input Types of Snowflake DataType object]>
  , is_permanent = True
  , name = '<UDTF name>'
  , replace = True
  , stage_location = '@<UDTF stage name>'
)

"""

In [158]:
##################################################################
## Define the class for the UDTF

# Define handler class
class CalculateAverage:
  def __init__(self) :
    self._values = []

  def process(self, input_measure: int) :
    self._values.append(input_measure)

  def end_partition(self) :
    values_list = self._values
    average = sum(values_list) / len(values_list)
    yield(average ,)

### Add packages and data types
from snowflake.snowpark.types import StructType, StructField
from snowflake.snowpark.types import FloatType,IntegerType,StringType

### Define output schema
output_schema = StructType([
      StructField("Avg_Age", FloatType())
  ])

##################################################################
## Register UDTF in Snowflake
session.udtf.register(
    handler = CalculateAverage
  , output_schema = output_schema
  , input_types = [IntegerType()]
  , is_permanent = True
  , name = 'AVERAGE_AGE'
  , replace = True
  , stage_location = '@MY_STAGE'
)

<snowflake.snowpark.udtf.UserDefinedTableFunction at 0x2bb83956a90>

### Executing UDTF

In [159]:

session.sql('''SELECT
COUNTRY,Avg_Age
FROM SAMPLE_EMPLOYEE_DATA
,table(AVERAGE_AGE(AGE) over (partition by COUNTRY))
''').show()

# select SAMPLE_EMPLOYEE_DATA.AGE
#   from SAMPLE_EMPLOYEE_DATA, table(SNOWPARK_UDF_TOTAL_SPEND(AGE) over (partition by COUNTRY));

----------------------------------
|"COUNTRY"  |"AVG_AGE"           |
----------------------------------
|Germany    |33.0                |
|Italy      |31.0                |
|UK         |31.5                |
|Australia  |27.5                |
|Japan      |32.0                |
|USA        |27.333333333333332  |
----------------------------------



## Working with Vectorized UDF

* Section provides code templates for Snowpark Vectorized UDF
* Includes vectorized UDF template, example and execution of vectorized UDF

In [None]:
import pandas as pd
from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark.types import IntegerType, PandasSeriesType,StringType

@pandas_udf(
name='column_adder'
,stage_location = '@MY_STAGE'
,input_types=[PandasSeriesType(StringType()), PandasSeriesType(StringType())]
,return_type=PandasSeriesType(StringType())
,is_permanent=True
,replace=True)
def column_adder(column1: pd.Series, column2: pd.Series) -> pd.Series:
  return column1 + "," + column2

### Executing Vectorized UDF

In [5]:
df = session.table("SAMPLE_EMPLOYEE_DATA")
df.withColumn('City_Country', column_adder(col('CITY'), col('COUNTRY'))).show()

-----------------------------------------------------------------------------------------------------------------
|"ID"  |"NAME"            |"AGE"  |"EMAIL"                      |"CITY"       |"COUNTRY"  |"CITY_COUNTRY"       |
-----------------------------------------------------------------------------------------------------------------
|1     |John Doe          |25     |johndoe@example.com          |New York     |USA        |New York,USA         |
|2     |Jane Smith        |30     |janesmith@example.com        |Los Angeles  |USA        |Los Angeles,USA      |
|3     |Michael Johnson   |35     |michaeljohnson@example.com   |London       |UK         |London,UK            |
|4     |Sarah Williams    |28     |sarahwilliams@example.com    |Leeds        |UK         |Leeds,UK             |
|5     |David Brown       |32     |davidbrown@example.com       |Tokyo        |Japan      |Tokyo,Japan          |
|6     |Emily Davis       |29     |emilydavis@example.com       |Sydney       |Australia

## Working with Stored Procedure

* Section provides code templates for Snowpark Stored Procedure
* Includes Stored Procedure template, example and executio

### Stored Procedure Template

In [None]:
"""

##################################################################
## Define the function for the Stored Procedure
def <Python Function Name>(snowpark_session: snowflake.snowpark.Session, <arguments>):
  return <Output>

# Imports Required For Stored Procedure
from snowflake.snowpark.types import <specific Snowpark DataType object>

# Optional: Import additional packages or files
snowpark_session.add_packages('List of native packages in Anaconda Channel')
snowpark_session.add_import('Path to Local File')

##################################################################
## Register Stored Procedure in Snowflake
snowpark_session.sproc.register(
    func = <Function name to register>
  , return_type = <Return Type of Snowpark DataType object >
  , input_types = <[Input Types of Snowflake DataType object]>
  , is_permanent = True
  , name = '<Stored Procedure name>'
  , replace = True
  , stage_location = '@<Stored Procedure stage name>'
  <optional: , execute_as = 'CALLER'>
)

"""

In [178]:
##################################################################
## Define the function for the Stored Procedure

def subset_table(snowpark_session:Session):
  df =  snowpark_session.table('SAMPLE_EMPLOYEE_DATA').select("NAME","AGE")
  return df.collect()

##################################################################
## Register Stored Procedure in Snowflake

### Add packages and data types
from snowflake.snowpark.types import StringType
session.add_packages('snowflake-snowpark-python')

### Upload Stored Produre to Snowflake
session.sproc.register(
    func = subset_table
  , return_type = StringType()
  , input_types = []
  , is_permanent = True
  , name = 'SPROC_SUBSET_TABLE'
  , replace = True
  , stage_location = '@MY_STAGE'
)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x2bb84cebd30>

### Executing Stored Procedure

In [179]:
session.sql(''' CALL SPROC_SUBSET_TABLE()''').show()

------------------------------------------------------
|"SNOWPARK_SPROC_SUBSET_TABLE"                       |
------------------------------------------------------
|[Row(NAME='John Doe', AGE=25), Row(NAME='Jane S...  |
------------------------------------------------------



### Anonymous Stored Procedure

* Section provides code templates for Snowpark Anonymous Stored Procedure 
* Includes template, example and execution of Anonymous Strored Procedure

In [None]:
"""
WITH <name> AS PROCEDURE ( [ <arg_name> <arg_data_type> ] [ , ... ] )
  RETURNS { <result_data_type> [ [ NOT ] NULL ] | TABLE ( [ <col_name> <col_data_type> [ , ... ] ] ) }
  LANGUAGE PYTHON
  RUNTIME_VERSION = '<python_version>'
  PACKAGES = ( 'snowflake-snowpark-python[==<version>]'[, '<package_name>[==<version>]' ... ])
  [ IMPORTS = ( '<stage_path_and_file_name_to_read>' [, '<stage_path_and_file_name_to_read>' ...] ) ]
  HANDLER = '<function_name>'
  [ { CALLED ON NULL INPUT | { RETURNS NULL ON NULL INPUT | STRICT } } ]
  [ , <cte_nameN> [ ( <cte_column_list> ) ] AS ( SELECT ...  ) ]
  AS '<procedure_definition>'
CALL <name> ( [ [ <arg_name> => ] <arg> , ... ] )
  [ INTO :<snowflake_scripting_variable> ]

"""

In [None]:
"""

WITH proc AS PROCEDURE(table_name TEXT,country TEXT)
RETURNS TEXT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'main'
AS
$$

from snowflake.snowpark.functions import col
def main(session, table_name,country):
  return session.table(table_name).filter(col("country") == country).count()
$$

CALL proc('SAMPLE_EMPLOYEE_DATA','USA');

"""

# Cleanup Snowflake Objects

In [None]:
session.sql(""" CREATE OR REPLACE DATABASE SNOWPARK_DEFINITIVE_GUIDE""").collect()
session.sql(""" CREATE OR REPLACE SCHEMA SNOWPARK_DEFINITIVE_GUIDE.MY_SCHEMA""").collect()