# Data Engineering Pipelines with Snowpark Python


"Data engineers are focused primarily on building and maintaining data pipelines that transport data through different steps and put it into a usable state ... The data engineering process encompasses the overall effort required to create **data pipelines** that automate the transfer of data from place to place and transform that data into a specific format for a certain type of analysis. In that sense, data engineering isn’t something you do once. It’s an ongoing practice that involves collecting, preparing, transforming, and delivering data. A data pipeline helps automate these tasks so they can be reliably repeated. It’s a practice more than a specific technology." (From Cloud Data Engineering for Dummies, Snowflake Special Edition)

Are you interested in unleashing the power of Snowpark Python to build data engineering pipelines? Well then, this Quickstart is for you! The focus here will be on building data engineering pipelines with Python, and not on data science. For examples of doing data science with Snowpark Python please check out our [Machine Learning with Snowpark Python: - Credit Card Approval Prediction](https://quickstarts.snowflake.com/guide/getting_started_snowpark_machine_learning/index.html?index=..%2F..index#0) Quickstart.

[This Quickstart](https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_python/index.html) will cover a lot of ground, and by the end you will have built a robust data engineering pipeline using Snowpark Python stored procedures. That pipeline will process data incrementally, be orchestrated with Snowflake tasks, and be deployed via a CI/CD pipeline. You'll also learn how to use Snowflake's new developer CLI tool and Visual Studio Code extension! Here's a quick visual overview:

In [None]:
import streamlit as st
st.image("https://raw.githubusercontent.com/Snowflake-Labs/sfguide-data-engineering-with-snowpark-python/main/images/demo_overview.png",width=800)

### Prerequisites
* Familiarity with Python
* Familiarity with the DataFrame API
* Familiarity with Snowflake

### What You’ll Learn
You will learn about the following Snowflake features during this Quickstart:

* Snowflake's Table Format
* Data ingestion with COPY
* Schema inference
* Data sharing/marketplace (instead of ETL)
* Streams for incremental processing (CDC)
* Streams on views
* Python UDFs (with third-party packages)
* Python Stored Procedures
* Snowpark DataFrame API
* Snowpark Python programmability
* Warehouse elasticity (dynamic scaling)
* SnowCLI (PuPr)
* Tasks (with Stream triggers)
* Task Observability

### What You’ll Need
You will need the following things before beginning:

* Snowflake account
    * **A Snowflake Account**
    * **A Snowflake user created with ACCOUNTADMIN permissions**. This user will be used to get things setup in Snowflake.
    * **Anaconda Terms & Conditions accepted**. See Getting Started section in [Third-Party Packages](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-packages.html#getting-started).

### What You’ll Build
During this Quickstart you will accomplish the following things:

* Load Parquet data to Snowflake using schema inference
* Setup access to Snowflake Marketplace data
* Create a Python UDF to convert temperature
* Create a data engineering pipeline with Python stored procedures to incrementally process data
* Orchestrate the pipelines with tasks
* Monitor the pipelines with Snowsight
* Deploy the Snowpark Python stored procedures via a CI/CD pipeline


## Setup Snowflake

You can run SQL queries against Snowflake in many different ways (through the Snowsight UI, SnowSQL, etc.) but for this Quickstart we'll be using the Snowflake Notebooks.

### Run the Script
To set up all the objects we'll need in Snowflake for this Quickstart you'll need to run following steps.

In [None]:
-- ----------------------------------------------------------------------------
-- Step #1: Accept Anaconda Terms & Conditions
-- ----------------------------------------------------------------------------

-- See Getting Started section in Third-Party Packages (https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-packages.html#getting-started)


-- ----------------------------------------------------------------------------
-- Step #2: Create the account level objects
-- ----------------------------------------------------------------------------
USE ROLE ACCOUNTADMIN;

-- Roles
SET MY_USER = CURRENT_USER(); 
CREATE OR REPLACE ROLE HOL_ROLE;
GRANT ROLE HOL_ROLE TO ROLE SYSADMIN;
GRANT ROLE HOL_ROLE TO USER IDENTIFIER($MY_USER);

CREATE STAGE IF NOT EXISTS SCRIPTS 
	DIRECTORY = ( ENABLE = true );

GRANT EXECUTE TASK ON ACCOUNT TO ROLE HOL_ROLE;
GRANT MONITOR EXECUTION ON ACCOUNT TO ROLE HOL_ROLE;
GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE HOL_ROLE;

-- Databases
CREATE OR REPLACE DATABASE NB_HOL_DB;
GRANT OWNERSHIP ON DATABASE NB_HOL_DB TO ROLE HOL_ROLE;

-- Warehouses
CREATE OR REPLACE WAREHOUSE HOL_WH WAREHOUSE_SIZE = XSMALL, AUTO_SUSPEND = 300, AUTO_RESUME= TRUE;
GRANT OWNERSHIP ON WAREHOUSE HOL_WH TO ROLE HOL_ROLE;

In [None]:
-- ----------------------------------------------------------------------------
-- Step #3: Create the database level objects
-- ----------------------------------------------------------------------------
USE ROLE HOL_ROLE;
USE WAREHOUSE HOL_WH;
USE DATABASE NB_HOL_DB;

-- Schemas
CREATE OR REPLACE SCHEMA EXTERNAL;
CREATE OR REPLACE SCHEMA RAW_POS;
CREATE OR REPLACE SCHEMA RAW_CUSTOMER;
CREATE OR REPLACE SCHEMA HARMONIZED;
CREATE OR REPLACE SCHEMA ANALYTICS;

-- External Frostbyte objects
USE SCHEMA EXTERNAL;
CREATE OR REPLACE FILE FORMAT PARQUET_FORMAT
    TYPE = PARQUET
    COMPRESSION = SNAPPY;
    
CREATE OR REPLACE STAGE FROSTBYTE_RAW_STAGE
    URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/';

-- ANALYTICS objects
USE SCHEMA ANALYTICS;

CREATE OR REPLACE FUNCTION ANALYTICS.INCH_TO_MILLIMETER_UDF(INCH NUMBER(35,4))
RETURNS NUMBER(35,4)
    AS
$$
    inch * 25.4
$$;

### Load Raw 

During this step we will be loading the raw Tasty Bytes POS and Customer loyalty data from raw Parquet files in `s3://sfquickstarts/data-engineering-with-snowpark-python/` to our `RAW_POS` and `RAW_CUSTOMER` schemas in Snowflake. And you are going to be orchestrating this process from your laptop in Python using the Snowpark Python API. To put this in context, we are on step **#2** in our data flow overview:


In [None]:
import streamlit as st
st.image("https://raw.githubusercontent.com/Snowflake-Labs/sfguide-data-engineering-with-snowpark-python/main/images/demo_overview.png",width=800)

To load the raw data, execute the following:

In [None]:
import time
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
#import snowflake.snowpark.functions as F


POS_TABLES = ['country', 'franchise', 'location', 'menu', 'truck', 'order_header', 'order_detail']
CUSTOMER_TABLES = ['customer_loyalty']
TABLE_DICT = {
    "pos": {"schema": "RAW_POS", "tables": POS_TABLES},
    "customer": {"schema": "RAW_CUSTOMER", "tables": CUSTOMER_TABLES}
}

# SNOWFLAKE ADVANTAGE: Schema detection
# SNOWFLAKE ADVANTAGE: Data ingestion with COPY
# SNOWFLAKE ADVANTAGE: Snowflake Tables (not file-based)

def load_raw_table(session, tname=None, s3dir=None, year=None, schema=None):
    session.use_schema(schema)
    if year is None:
        location = "@external.frostbyte_raw_stage/{}/{}".format(s3dir, tname)
    else:
        print('\tLoading year {}'.format(year)) 
        location = "@external.frostbyte_raw_stage/{}/{}/year={}".format(s3dir, tname, year)
    
    # we can infer schema using the parquet read option
    df = session.read.option("compression", "snappy") \
                            .parquet(location)
    df.copy_into_table("{}".format(tname))

# SNOWFLAKE ADVANTAGE: Warehouse elasticity (dynamic scaling)

def load_all_raw_tables(session):
    _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

    for s3dir, data in TABLE_DICT.items():
        tnames = data['tables']
        schema = data['schema']
        for tname in tnames:
            print("Loading {}".format(tname))
            # Only load the first 3 years of data for the order tables at this point
            # We will load the 2022 data later in the lab
            if tname in ['order_header', 'order_detail']:
                # For testing, use only records from 2019. Uncomment to add more years of data.
                for year in ['2019']:#, '2020', '2021']: 
                    load_raw_table(session, tname=tname, s3dir=s3dir, year=year, schema=schema)
            else:
                load_raw_table(session, tname=tname, s3dir=s3dir, schema=schema)

    _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()

def validate_raw_tables(session):
    # check column names from the inferred schema
    for tname in POS_TABLES:
        print('{}: \n\t{}\n'.format(tname, session.table('RAW_POS.{}'.format(tname)).columns))

    for tname in CUSTOMER_TABLES:
        print('{}: \n\t{}\n'.format(tname, session.table('RAW_CUSTOMER.{}'.format(tname)).columns))

In [None]:
# Add the utils package to our path and import the snowpark_utils function
import os, sys
current_dir = os.getcwd()
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)

In [None]:
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
load_all_raw_tables(session)

In [None]:
validate_raw_tables(session)

### Viewing What Happened in Snowflake
The [Query History](https://docs.snowflake.com/en/user-guide/ui-snowsight-activity.html#query-history) in Snowflake is a very power feature, that logs every query run against your Snowflake account, no matter which tool or process initiated it. And this is especially helpful when working with client tools and APIs.

The Python script you just ran did a small amount of work locally, basically just orchestrating the process by looping through each table and issuing the command to Snowflake to load the data. But all of the heavy lifting ran inside Snowflake! This push-down is a hallmark of the Snowpark API and allows you to leverage the scalability and compute power of Snowflake!

Log in to your Snowflake account and take a quick look at the SQL that was generated by the Snowpark API. This will help you better understand what the API is doing and will help you debug any issues you may run into.


### Schema Inference
One very helpful feature in Snowflake is the ability to infer the schema of files in a stage that you wish to work with. This is accomplished in SQL with the [`INFER_SCHEMA()`](https://docs.snowflake.com/en/sql-reference/functions/infer_schema.html) function. The Snowpark Python API does this for you automatically when you call the `session.read()` method. Here is the code snippet:

```python
    # we can infer schema using the parquet read option
    df = session.read.option("compression", "snappy") \
                            .parquet(location)
```

### Data Ingestion with COPY
In order to load the data into a Snowflake table we will use the `copy_into_table()` method on a DataFrame. This method will create the target table in Snowflake using the inferred schema (if it doesn't exist), and then call the highly optimized Snowflake [`COPY INTO &lt;table&gt;` Command](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html). Here is the code snippet:

```python
    df.copy_into_table("{}".format(tname))
```

### Snowflake's Table Format
One of the major advantages of Snowflake is being able to eliminate the need to manage a file-based data lake. And Snowflake was designed for this purpose from the beginning. In the step we are loading the raw data into a structured Snowflake managed table. But Snowflake tables can natively support structured and semi-structured data, and are stored in Snowflake's mature cloud table format (which predates Hudi, Delta or Iceberg).

Once loaded into Snowflake the data will be securely stored and managed, without the need to worry about securing and managing raw files. Additionally the data, whether raw or structured, can be transformed and queried in Snowflake using SQL or the language of your choice, without needing to manage separate compute services like Spark.

This is a huge advantage for Snowflake customers.


### Warehouse Elasticity (Dynamic Scaling)
With Snowflake there is only one type of user defined compute cluster, the [Virtual Warehouse](https://docs.snowflake.com/en/user-guide/warehouses.html), regardless of the language you use to process that data (SQL, Python, Java, Scala, Javascript, etc.). This makes working with data much simpler in Snowflake. And governance of the data is completely separated from the compute cluster, in other words there is no way to get around Snowflake governance regardless of the warehouse settings or language being used.

And these virtual warehouses can be dynamically scaled, in under a second for most sized warehouses! This means that in your code you can dynamically resize the compute environment to increase the amount of capacity to run a section of code in a fraction of the time, and then dynamically resized again to reduce the amount of capacity. And because of our per-second billing (with a sixty second minimum) you won't pay any extra to run that section of code in a fraction of the time!

Let's see how easy that is done. Here is the code snippet:

```python
_ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

# Some data processing code

_ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()
```

Please also note that we included the `WAIT_FOR_COMPLETION` parameter in the first `ALTER WAREHOUSE` statement. Setting this parameter to `TRUE` will block the return of the `ALTER WAREHOUSE` command until the resize has finished provisioning all its compute resources. This way we make sure that the full cluster is available before processing any data with it.

We will use this pattern a few more times during this Quickstart, so it's important to understand.


## Load Weather


During this step we will be "loading" the raw weather data to Snowflake. But "loading" is the really the wrong word here. Because we're using Snowflake's unique data sharing capability we don't actually need to copy the data to our Snowflake account with a custom ETL process. Instead we can directly access the weather data shared by Weather Source in the Snowflake Data Marketplace. To put this in context, we are on step **#3** in our data flow overview. 


### Snowflake Data Marketplace
Weather Source is a leading provider of global weather and climate data and their OnPoint Product Suite provides businesses with the necessary weather and climate data to quickly generate meaningful and actionable insights for a wide range of use cases across industries. Let's connect to the `Weather Source LLC: frostbyte` feed from Weather Source in the Snowflake Data Marketplace by following these steps:

* Login to Snowsight
* Click on the `Marketplace` link in the left navigation bar
* Enter "Weather Source LLC: frostbyte" in the search box and click return
* Click on the "Weather Source LLC: frostbyte" listing tile
* Click the blue "Get" button
    * Expand the "Options" dialog
    * Change the "Database name" to read "FROSTBYTE_WEATHERSOURCE" (all capital letters)
    * Select the "HOL_ROLE" role to have access to the new database
* Click on the blue "Get" button

That's it... we don't have to do anything from here to keep this data updated. The provider will do that for us and data sharing means we are always seeing whatever they have published. How amazing is that? Just think of all the things you didn't have do here to get access to an always up-to-date, third-party dataset!

In [None]:
-- Make sure you grant the table privileges to HOL_ROLE for this to run
GRANT IMPORTED PRIVILEGES ON DATABASE IDENTIFIER('"FROSTBYTE_WEATHERSOURCE"') TO ROLE IDENTIFIER('"HOL_ROLE"');

-- Preview Weathersource data
SELECT * FROM FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES LIMIT 100; 

## Create POS View

During this step we will be creating a view to simplify the raw POS schema by joining together 6 different tables and picking only the columns we need. But what's really cool is that we're going to define that view with the Snowpark DataFrame API! Then we're going to create a Snowflake stream on that view so that we can incrementally process changes to any of the POS tables. To put this in context, we are on step **#4** in our data flow overview.

### Run the Script
To create the view and stream, execute the following code.

In [None]:
# SNOWFLAKE ADVANTAGE: Snowpark DataFrame API
# SNOWFLAKE ADVANTAGE: Streams for incremental processing (CDC)
# SNOWFLAKE ADVANTAGE: Streams on views


from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def create_pos_view(session):
    session.use_schema('HARMONIZED')
    order_detail = session.table("RAW_POS.ORDER_DETAIL").select(F.col("ORDER_DETAIL_ID"), \
                                                                F.col("LINE_NUMBER"), \
                                                                F.col("MENU_ITEM_ID"), \
                                                                F.col("QUANTITY"), \
                                                                F.col("UNIT_PRICE"), \
                                                                F.col("PRICE"), \
                                                                F.col("ORDER_ID"))
    order_header = session.table("RAW_POS.ORDER_HEADER").select(F.col("ORDER_ID"), \
                                                                F.col("TRUCK_ID"), \
                                                                F.col("ORDER_TS"), \
                                                                F.to_date(F.col("ORDER_TS")).alias("ORDER_TS_DATE"), \
                                                                F.col("ORDER_AMOUNT"), \
                                                                F.col("ORDER_TAX_AMOUNT"), \
                                                                F.col("ORDER_DISCOUNT_AMOUNT"), \
                                                                F.col("LOCATION_ID"), \
                                                                F.col("ORDER_TOTAL"))
    truck = session.table("RAW_POS.TRUCK").select(F.col("TRUCK_ID"), \
                                                F.col("PRIMARY_CITY"), \
                                                F.col("REGION"), \
                                                F.col("COUNTRY"), \
                                                F.col("FRANCHISE_FLAG"), \
                                                F.col("FRANCHISE_ID"))
    menu = session.table("RAW_POS.MENU").select(F.col("MENU_ITEM_ID"), \
                                                F.col("TRUCK_BRAND_NAME"), \
                                                F.col("MENU_TYPE"), \
                                                F.col("MENU_ITEM_NAME"))
    franchise = session.table("RAW_POS.FRANCHISE").select(F.col("FRANCHISE_ID"), \
                                                        F.col("FIRST_NAME").alias("FRANCHISEE_FIRST_NAME"), \
                                                        F.col("LAST_NAME").alias("FRANCHISEE_LAST_NAME"))
    location = session.table("RAW_POS.LOCATION").select(F.col("LOCATION_ID"))

    
    '''
    We can do this one of two ways: either select before the join so it is more explicit, or just join on the full tables.
    The end result is the same, it's mostly a readibility question.
    '''
    # order_detail = session.table("RAW_POS.ORDER_DETAIL")
    # order_header = session.table("RAW_POS.ORDER_HEADER")
    # truck = session.table("RAW_POS.TRUCK")
    # menu = session.table("RAW_POS.MENU")
    # franchise = session.table("RAW_POS.FRANCHISE")
    # location = session.table("RAW_POS.LOCATION")

    t_with_f = truck.join(franchise, truck['FRANCHISE_ID'] == franchise['FRANCHISE_ID'], rsuffix='_f')
    oh_w_t_and_l = order_header.join(t_with_f, order_header['TRUCK_ID'] == t_with_f['TRUCK_ID'], rsuffix='_t') \
                                .join(location, order_header['LOCATION_ID'] == location['LOCATION_ID'], rsuffix='_l')
    final_df = order_detail.join(oh_w_t_and_l, order_detail['ORDER_ID'] == oh_w_t_and_l['ORDER_ID'], rsuffix='_oh') \
                            .join(menu, order_detail['MENU_ITEM_ID'] == menu['MENU_ITEM_ID'], rsuffix='_m')
    final_df = final_df.select(F.col("ORDER_ID"), \
                            F.col("TRUCK_ID"), \
                            F.col("ORDER_TS"), \
                            F.col('ORDER_TS_DATE'), \
                            F.col("ORDER_DETAIL_ID"), \
                            F.col("LINE_NUMBER"), \
                            F.col("TRUCK_BRAND_NAME"), \
                            F.col("MENU_TYPE"), \
                            F.col("PRIMARY_CITY"), \
                            F.col("REGION"), \
                            F.col("COUNTRY"), \
                            F.col("FRANCHISE_FLAG"), \
                            F.col("FRANCHISE_ID"), \
                            F.col("FRANCHISEE_FIRST_NAME"), \
                            F.col("FRANCHISEE_LAST_NAME"), \
                            F.col("LOCATION_ID"), \
                            F.col("MENU_ITEM_ID"), \
                            F.col("MENU_ITEM_NAME"), \
                            F.col("QUANTITY"), \
                            F.col("UNIT_PRICE"), \
                            F.col("PRICE"), \
                            F.col("ORDER_AMOUNT"), \
                            F.col("ORDER_TAX_AMOUNT"), \
                            F.col("ORDER_DISCOUNT_AMOUNT"), \
                            F.col("ORDER_TOTAL"))
    final_df.create_or_replace_view('POS_FLATTENED_V')

def create_pos_view_stream(session):
    session.use_schema('HARMONIZED')
    _ = session.sql('CREATE OR REPLACE STREAM POS_FLATTENED_V_STREAM \
                        ON VIEW POS_FLATTENED_V \
                        SHOW_INITIAL_ROWS = TRUE').collect()

def test_pos_view(session):
    session.use_schema('HARMONIZED')
    tv = session.table('POS_FLATTENED_V')
    tv.limit(5).show()

In [None]:
create_pos_view(session)

In [None]:
create_pos_view_stream(session)

### Snowpark DataFrame API
The first thing you'll notice in the `create_pos_view()` function is that we define the Snowflake view using the Snowpark DataFrame API. After defining the final DataFrame, which captures all the logic we want in the view, we can simply call the Snowpark `create_or_replace_view()` method. Here's the final line from the `create_pos_view()` function:

```python
    final_df.create_or_replace_view('POS_FLATTENED_V')
```

For more details about the Snowpark Python DataFrame API, please check out our [Working with DataFrames in Snowpark Python](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes.html) page.

### Streams for Incremental Processing (CDC)
Snowflake makes processing data incrementally very easy. Traditionally the data engineer had to keep track of a high watermark (usually a datetime column) in order to process only new records in a table. This involved tracking and persisting that watermark somewhere and then using it in any query against the source table. But with Snowflake streams all the heavy lifting is done for you by Snowflake. For more details please check out our [Change Tracking Using Table Streams](https://docs.snowflake.com/en/user-guide/streams.html) user guide.

All you need to do is create a [`STREAM`](https://docs.snowflake.com/en/sql-reference/sql/create-stream.html) object in Snowflake against your base table or view, then query that stream just like any table in Snowflake. The stream will return only the changed records since the last DML option your performed. To help you work with the changed records, Snowflake streams will supply the following metadata columns along with the base table or view columns:

* METADATA$ACTION
* METADATA$ISUPDATE
* METADATA$ROW_ID

For more details about these stream metadata columns please check out the [Stream Columns](https://docs.snowflake.com/en/user-guide/streams-intro.html#stream-columns) section in our documentation.

### Streams on views
What's really cool about Snowflake's incremental/CDC stream capability is the ability to create a stream on a view! In this example we are creating a stream on a view which joins together 6 of the raw POS tables. Here is the code to do that:

```python
def create_pos_view_stream(session):
    session.use_schema('HARMONIZED')
    _ = session.sql('CREATE OR REPLACE STREAM POS_FLATTENED_V_STREAM \
                        ON VIEW POS_FLATTENED_V \
                        SHOW_INITIAL_ROWS = TRUE').collect()
```

Now when we query the `POS_FLATTENED_V_STREAM` stream to find changed records, Snowflake is actually looking for changed records in any of the 6 tables included in the view. For those who have tried to build incremental/CDC processes around denormalized schemas like this, you will appreciate the incredibly powerful feature that Snowflake provides here.

For more details please check out the [Streams on Views](https://docs.snowflake.com/en/user-guide/streams-intro.html#streams-on-views) section in our documentation.


## Fahrenheit to Celsius UDF

During this step we will be creating and deploying our first Snowpark Python object to Snowflake, a user-defined function (or UDF). To begin with the UDF will be very basic, but in a future step we'll update it to include a third-party Python package. Also in this step you will be introduced to the new SnowCLI, a new developer command line tool. SnowCLI makes building and deploying Snowpark Python objects to Snowflake a consistent experience for the developer. More details below on SnowCLI. To put this in context, we are on step **#5** in our data flow overview.

In [None]:
CREATE OR REPLACE FUNCTION NB_HOL_DB.ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF("temp_f" FLOAT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
HANDLER = 'main'
AS '
def main(temp_f: float) -> float:
    return (float(temp_f) - 32) * (5/9)
';

### Running the UDF in Snowflake
In order to run the UDF in Snowflake you have a few options. Any UDF in Snowflake can be invoked through SQL as follows:

In [None]:
SELECT NB_HOL_DB.ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF(35);

## Orders Update Sproc

During this step we will be creating and deploying our first Snowpark Python stored procedure (or sproc) to Snowflake. This sproc will merge changes from the `HARMONIZED.POS_FLATTENED_V_STREAM` stream into our target `HARMONIZED.ORDERS` table. To put this in context, we are on step **#6** in our data flow overview.

### Running the Sproc Locally
To test the procedure locally, you will execute the following script.

In [None]:
# SNOWFLAKE ADVANTAGE: Python Stored Procedures

import time
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_orders_table(session):
    _ = session.sql("CREATE TABLE HARMONIZED.ORDERS LIKE HARMONIZED.POS_FLATTENED_V").collect()
    _ = session.sql("ALTER TABLE HARMONIZED.ORDERS ADD COLUMN META_UPDATED_AT TIMESTAMP").collect()

def create_orders_stream(session):
    _ = session.sql("CREATE STREAM HARMONIZED.ORDERS_STREAM ON TABLE HARMONIZED.ORDERS").collect()

def merge_order_updates(session):
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    source = session.table('HARMONIZED.POS_FLATTENED_V_STREAM')
    target = session.table('HARMONIZED.ORDERS')

    # TODO: Is the if clause supposed to be based on "META_UPDATED_AT"?
    cols_to_update = {c: source[c] for c in source.schema.names if "METADATA" not in c}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    # merge into DIM_CUSTOMER
    target.merge(source, target['ORDER_DETAIL_ID'] == source['ORDER_DETAIL_ID'], \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the ORDERS table and ORDERS_STREAM stream if they don't exist
    if not table_exists(session, schema='HARMONIZED', name='ORDERS'):
        create_orders_table(session)
        create_orders_stream(session)

    # Process data incrementally
    merge_order_updates(session)
#    session.table('HARMONIZED.ORDERS').limit(5).show()

    return f"Successfully processed ORDERS"

In [None]:
main(session)

In [None]:
script = '''
import time
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_orders_table(session):
    _ = session.sql("CREATE TABLE HARMONIZED.ORDERS LIKE HARMONIZED.POS_FLATTENED_V").collect()
    _ = session.sql("ALTER TABLE HARMONIZED.ORDERS ADD COLUMN META_UPDATED_AT TIMESTAMP").collect()

def create_orders_stream(session):
    _ = session.sql("CREATE STREAM HARMONIZED.ORDERS_STREAM ON TABLE HARMONIZED.ORDERS").collect()

def merge_order_updates(session):
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    source = session.table('HARMONIZED.POS_FLATTENED_V_STREAM')
    target = session.table('HARMONIZED.ORDERS')

    # TODO: Is the if clause supposed to be based on "META_UPDATED_AT"?
    cols_to_update = {c: source[c] for c in source.schema.names if "METADATA" not in c}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    # merge into DIM_CUSTOMER
    target.merge(source, target['ORDER_DETAIL_ID'] == source['ORDER_DETAIL_ID'], \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the ORDERS table and ORDERS_STREAM stream if they don't exist
    if not table_exists(session, schema='HARMONIZED', name='ORDERS'):
        create_orders_table(session)
        create_orders_stream(session)

    # Process data incrementally
    merge_order_updates(session)
#    session.table('HARMONIZED.ORDERS').limit(5).show()

    return f"Successfully processed ORDERS"
'''

Here is the SQL query to deploy the procedure:

In [None]:
CREATE OR REPLACE PROCEDURE orders_update_sp()
 RETURNS string
 LANGUAGE PYTHON
 RUNTIME_VERSION=3.8
 PACKAGES=('snowflake-snowpark-python','toml')
 HANDLER = 'main'
 AS $$
 {{script}}
 $$;

In [None]:
CALL ORDERS_UPDATE_SP();

### More on the Snowpark API
In this step we're starting to really use the Snowpark DataFrame API for data transformations. The Snowpark API provides the same functionality as the [Spark SQL API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html). To begin with you need to create a Snowpark session object. Like PySpark, this is accomplished with the `Session.builder.configs().create()` methods. When running locally, we use the `Session.builder.getOrCreate()` method to create the session object for us. But when deployed to Snowflake, the session object is provisioned for you automatically by Snowflake. And when building a Snowpark Python sproc the contract is that the first argument to the entry point (or handler) function is a Snowpark session.

The first thing you'll notice in the script is that we have some functions which use SQL to create objects in Snowflake and to check object status. To issue a SQL statement to Snowflake with the Snowpark API you use the `session.sql()` function, like you'd expect. Here's one example:

```python
def create_orders_stream(session):
    _ = session.sql("CREATE STREAM IF NOT EXISTS HARMONIZED.ORDERS_STREAM ON TABLE HARMONIZED.ORDERS \
                    SHOW_INITIAL_ROWS = TRUE;").collect()
```

The second thing to point out is how we're using DataFrames to merge changes from the source view to the target table. The Snowpark DataFrame API provides a `merge()` method which will ultimately generate a `MERGE` command in Snowflake.

```python
    source = session.table('HARMONIZED.POS_FLATTENED_V_STREAM')
    target = session.table('HARMONIZED.ORDERS')

    # TODO: Is the if clause supposed to be based on "META_UPDATED_AT"?
    cols_to_update = {c: source[c] for c in source.schema.names if "METADATA" not in c}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    # merge into DIM_CUSTOMER
    target.merge(source, target['ORDER_DETAIL_ID'] == source['ORDER_DETAIL_ID'], \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])
```

Again, for more details about the Snowpark Python DataFrame API, please check out our [Working with DataFrames in Snowpark Python](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes.html) page.


## Daily City Metrics Update Sproc
During this step we will be creating and deploying our second Snowpark Python sproc to Snowflake. This sproc will join the `HARMONIZED.ORDERS` data with the Weather Source data to create a final, aggregated table for analysis named `ANALYTICS.DAILY_CITY_METRICS`. We will process the data incrementally from the `HARMONIZED.ORDERS` table using another Snowflake Stream. And we will again use the Snowpark DataFrame `merge()` method to merge/upsert the data. To put this in context, we are on step **#7** in our data flow overview.

In [None]:
import time
from snowflake.snowpark import Session
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_daily_city_metrics_table(session):
    SHARED_COLUMNS= [T.StructField("DATE", T.DateType()),
                                        T.StructField("CITY_NAME", T.StringType()),
                                        T.StructField("COUNTRY_DESC", T.StringType()),
                                        T.StructField("DAILY_SALES", T.StringType()),
                                        T.StructField("AVG_TEMPERATURE_FAHRENHEIT", T.DecimalType()),
                                        T.StructField("AVG_TEMPERATURE_CELSIUS", T.DecimalType()),
                                        T.StructField("AVG_PRECIPITATION_INCHES", T.DecimalType()),
                                        T.StructField("AVG_PRECIPITATION_MILLIMETERS", T.DecimalType()),
                                        T.StructField("MAX_WIND_SPEED_100M_MPH", T.DecimalType()),
                                    ]
    DAILY_CITY_METRICS_COLUMNS = [*SHARED_COLUMNS, T.StructField("META_UPDATED_AT", T.TimestampType())]
    DAILY_CITY_METRICS_SCHEMA = T.StructType(DAILY_CITY_METRICS_COLUMNS)

    dcm = session.create_dataframe([[None]*len(DAILY_CITY_METRICS_SCHEMA.names)], schema=DAILY_CITY_METRICS_SCHEMA) \
                        .na.drop() \
                        .write.mode('overwrite').save_as_table('ANALYTICS.DAILY_CITY_METRICS')
    dcm = session.table('ANALYTICS.DAILY_CITY_METRICS')


def merge_daily_city_metrics(session):
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    print("{} records in stream".format(session.table('HARMONIZED.ORDERS_STREAM').count()))
    orders_stream_dates = session.table('HARMONIZED.ORDERS_STREAM').select(F.col("ORDER_TS_DATE").alias("DATE")).distinct()
    orders_stream_dates.limit(5).show()

    orders = session.table("HARMONIZED.ORDERS_STREAM").group_by(F.col('ORDER_TS_DATE'), F.col('PRIMARY_CITY'), F.col('COUNTRY')) \
                                        .agg(F.sum(F.col("PRICE")).as_("price_nulls")) \
                                        .with_column("DAILY_SALES", F.call_builtin("ZEROIFNULL", F.col("price_nulls"))) \
                                        .select(F.col('ORDER_TS_DATE').alias("DATE"), F.col("PRIMARY_CITY").alias("CITY_NAME"), \
                                        F.col("COUNTRY").alias("COUNTRY_DESC"), F.col("DAILY_SALES"))
#    orders.limit(5).show()

    weather_pc = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES")
    countries = session.table("RAW_POS.COUNTRY")
    weather = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
    weather = weather.join(weather_pc, (weather['POSTAL_CODE'] == weather_pc['POSTAL_CODE']) & (weather['COUNTRY'] == weather_pc['COUNTRY']), rsuffix='_pc')
    weather = weather.join(countries, (weather['COUNTRY'] == countries['ISO_COUNTRY']) & (weather['CITY_NAME'] == countries['CITY']), rsuffix='_c')
    weather = weather.join(orders_stream_dates, weather['DATE_VALID_STD'] == orders_stream_dates['DATE'])

    weather_agg = weather.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('COUNTRY_C')) \
                        .agg( \
                            F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"), \
                            F.avg(F.call_udf("ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF", F.col("AVG_TEMPERATURE_AIR_2M_F"))).alias("AVG_TEMPERATURE_C"), \
                            F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"), \
                            F.avg(F.call_udf("ANALYTICS.INCH_TO_MILLIMETER_UDF", F.col("TOT_PRECIPITATION_IN"))).alias("AVG_PRECIPITATION_MM"), \
                            F.max(F.col("MAX_WIND_SPEED_100M_MPH")).alias("MAX_WIND_SPEED_100M_MPH") \
                        ) \
                        .select(F.col("DATE_VALID_STD").alias("DATE"), F.col("CITY_NAME"), F.col("COUNTRY_C").alias("COUNTRY_DESC"), \
                            F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"), \
                            F.round(F.col("AVG_TEMPERATURE_C"), 2).alias("AVG_TEMPERATURE_CELSIUS"), \
                            F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"), \
                            F.round(F.col("AVG_PRECIPITATION_MM"), 2).alias("AVG_PRECIPITATION_MILLIMETERS"), \
                            F.col("MAX_WIND_SPEED_100M_MPH")
                            )
#    weather_agg.limit(5).show()

    daily_city_metrics_stg = orders.join(weather_agg, (orders['DATE'] == weather_agg['DATE']) & (orders['CITY_NAME'] == weather_agg['CITY_NAME']) & (orders['COUNTRY_DESC'] == weather_agg['COUNTRY_DESC']), \
                        how='left', rsuffix='_w') \
                    .select("DATE", "CITY_NAME", "COUNTRY_DESC", "DAILY_SALES", \
                        "AVG_TEMPERATURE_FAHRENHEIT", "AVG_TEMPERATURE_CELSIUS", \
                        "AVG_PRECIPITATION_INCHES", "AVG_PRECIPITATION_MILLIMETERS", \
                        "MAX_WIND_SPEED_100M_MPH")
#    daily_city_metrics_stg.limit(5).show()

    cols_to_update = {c: daily_city_metrics_stg[c] for c in daily_city_metrics_stg.schema.names}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    dcm = session.table('ANALYTICS.DAILY_CITY_METRICS')
    dcm.merge(daily_city_metrics_stg, (dcm['DATE'] == daily_city_metrics_stg['DATE']) & (dcm['CITY_NAME'] == daily_city_metrics_stg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == daily_city_metrics_stg['COUNTRY_DESC']), \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the DAILY_CITY_METRICS table if it doesn't exist
    if not table_exists(session, schema='ANALYTICS', name='DAILY_CITY_METRICS'):
        create_daily_city_metrics_table(session)
    
    merge_daily_city_metrics(session)
#    session.table('ANALYTICS.DAILY_CITY_METRICS').limit(5).show()

    return f"Successfully processed DAILY_CITY_METRICS"

In [None]:
main(session)

### Deploying the Sproc to Snowflake
To deploy your sproc to Snowflake by running the following commands:

In [None]:
script = '''
import time
from snowflake.snowpark import Session
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_daily_city_metrics_table(session):
    SHARED_COLUMNS= [T.StructField("DATE", T.DateType()),
                                        T.StructField("CITY_NAME", T.StringType()),
                                        T.StructField("COUNTRY_DESC", T.StringType()),
                                        T.StructField("DAILY_SALES", T.StringType()),
                                        T.StructField("AVG_TEMPERATURE_FAHRENHEIT", T.DecimalType()),
                                        T.StructField("AVG_TEMPERATURE_CELSIUS", T.DecimalType()),
                                        T.StructField("AVG_PRECIPITATION_INCHES", T.DecimalType()),
                                        T.StructField("AVG_PRECIPITATION_MILLIMETERS", T.DecimalType()),
                                        T.StructField("MAX_WIND_SPEED_100M_MPH", T.DecimalType()),
                                    ]
    DAILY_CITY_METRICS_COLUMNS = [*SHARED_COLUMNS, T.StructField("META_UPDATED_AT", T.TimestampType())]
    DAILY_CITY_METRICS_SCHEMA = T.StructType(DAILY_CITY_METRICS_COLUMNS)

    dcm = session.create_dataframe([[None]*len(DAILY_CITY_METRICS_SCHEMA.names)], schema=DAILY_CITY_METRICS_SCHEMA) \
                        .na.drop() \
                        .write.mode('overwrite').save_as_table('ANALYTICS.DAILY_CITY_METRICS')
    dcm = session.table('ANALYTICS.DAILY_CITY_METRICS')


def merge_daily_city_metrics(session):
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    print("{} records in stream".format(session.table('HARMONIZED.ORDERS_STREAM').count()))
    orders_stream_dates = session.table('HARMONIZED.ORDERS_STREAM').select(F.col("ORDER_TS_DATE").alias("DATE")).distinct()
    orders_stream_dates.limit(5).show()

    orders = session.table("HARMONIZED.ORDERS_STREAM").group_by(F.col('ORDER_TS_DATE'), F.col('PRIMARY_CITY'), F.col('COUNTRY')) \
                                        .agg(F.sum(F.col("PRICE")).as_("price_nulls")) \
                                        .with_column("DAILY_SALES", F.call_builtin("ZEROIFNULL", F.col("price_nulls"))) \
                                        .select(F.col('ORDER_TS_DATE').alias("DATE"), F.col("PRIMARY_CITY").alias("CITY_NAME"), \
                                        F.col("COUNTRY").alias("COUNTRY_DESC"), F.col("DAILY_SALES"))
#    orders.limit(5).show()

    weather_pc = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES")
    countries = session.table("RAW_POS.COUNTRY")
    weather = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
    weather = weather.join(weather_pc, (weather['POSTAL_CODE'] == weather_pc['POSTAL_CODE']) & (weather['COUNTRY'] == weather_pc['COUNTRY']), rsuffix='_pc')
    weather = weather.join(countries, (weather['COUNTRY'] == countries['ISO_COUNTRY']) & (weather['CITY_NAME'] == countries['CITY']), rsuffix='_c')
    weather = weather.join(orders_stream_dates, weather['DATE_VALID_STD'] == orders_stream_dates['DATE'])

    weather_agg = weather.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('COUNTRY_C')) \
                        .agg( \
                            F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"), \
                            F.avg(F.call_udf("ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF", F.col("AVG_TEMPERATURE_AIR_2M_F"))).alias("AVG_TEMPERATURE_C"), \
                            F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"), \
                            F.avg(F.call_udf("ANALYTICS.INCH_TO_MILLIMETER_UDF", F.col("TOT_PRECIPITATION_IN"))).alias("AVG_PRECIPITATION_MM"), \
                            F.max(F.col("MAX_WIND_SPEED_100M_MPH")).alias("MAX_WIND_SPEED_100M_MPH") \
                        ) \
                        .select(F.col("DATE_VALID_STD").alias("DATE"), F.col("CITY_NAME"), F.col("COUNTRY_C").alias("COUNTRY_DESC"), \
                            F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"), \
                            F.round(F.col("AVG_TEMPERATURE_C"), 2).alias("AVG_TEMPERATURE_CELSIUS"), \
                            F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"), \
                            F.round(F.col("AVG_PRECIPITATION_MM"), 2).alias("AVG_PRECIPITATION_MILLIMETERS"), \
                            F.col("MAX_WIND_SPEED_100M_MPH")
                            )
#    weather_agg.limit(5).show()

    daily_city_metrics_stg = orders.join(weather_agg, (orders['DATE'] == weather_agg['DATE']) & (orders['CITY_NAME'] == weather_agg['CITY_NAME']) & (orders['COUNTRY_DESC'] == weather_agg['COUNTRY_DESC']), \
                        how='left', rsuffix='_w') \
                    .select("DATE", "CITY_NAME", "COUNTRY_DESC", "DAILY_SALES", \
                        "AVG_TEMPERATURE_FAHRENHEIT", "AVG_TEMPERATURE_CELSIUS", \
                        "AVG_PRECIPITATION_INCHES", "AVG_PRECIPITATION_MILLIMETERS", \
                        "MAX_WIND_SPEED_100M_MPH")
#    daily_city_metrics_stg.limit(5).show()

    cols_to_update = {c: daily_city_metrics_stg[c] for c in daily_city_metrics_stg.schema.names}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    dcm = session.table('ANALYTICS.DAILY_CITY_METRICS')
    dcm.merge(daily_city_metrics_stg, (dcm['DATE'] == daily_city_metrics_stg['DATE']) & (dcm['CITY_NAME'] == daily_city_metrics_stg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == daily_city_metrics_stg['COUNTRY_DESC']), \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the DAILY_CITY_METRICS table if it doesn't exist
    if not table_exists(session, schema='ANALYTICS', name='DAILY_CITY_METRICS'):
        create_daily_city_metrics_table(session)
    
    merge_daily_city_metrics(session)
#    session.table('ANALYTICS.DAILY_CITY_METRICS').limit(5).show()

    return f"Successfully processed DAILY_CITY_METRICS"
'''

In [None]:
CREATE OR REPLACE PROCEDURE DAILY_CITY_METRICS_UPDATE_SP()
 RETURNS string
 LANGUAGE PYTHON
 RUNTIME_VERSION=3.8
 PACKAGES=('snowflake-snowpark-python','toml')
 HANDLER = 'main'
 AS $$
 {{script}}
 $$;

### Running the Sproc in Snowflake
In order to run the sproc in Snowflake you have a few options. Any sproc in Snowflake can be invoked through SQL as follows:


In [None]:
CALL DAILY_CITY_METRICS_UPDATE_SP();

### Data Modeling Best Practice
When modeling data for analysis a best practice has been to clearly define and manage the schema of the table. In step 2, when we loaded raw data from Parquet we took advantage of Snowflake's schema detection feature to create a table with the same schema as the Parquet files. In this step we are explicitly defining the schema in DataFrame syntax and using that to create the table.

```python
def create_daily_city_metrics_table(session):
    SHARED_COLUMNS= [T.StructField("DATE", T.DateType()),
                                        T.StructField("CITY_NAME", T.StringType()),
                                        T.StructField("COUNTRY_DESC", T.StringType()),
                                        T.StructField("DAILY_SALES", T.StringType()),
                                        T.StructField("AVG_TEMPERATURE_FAHRENHEIT", T.DecimalType()),
                                        T.StructField("AVG_TEMPERATURE_CELSIUS", T.DecimalType()),
                                        T.StructField("AVG_PRECIPITATION_INCHES", T.DecimalType()),
                                        T.StructField("AVG_PRECIPITATION_MILLIMETERS", T.DecimalType()),
                                        T.StructField("MAX_WIND_SPEED_100M_MPH", T.DecimalType()),
                                    ]
    DAILY_CITY_METRICS_COLUMNS = [*SHARED_COLUMNS, T.StructField("META_UPDATED_AT", T.TimestampType())]
    DAILY_CITY_METRICS_SCHEMA = T.StructType(DAILY_CITY_METRICS_COLUMNS)

    dcm = session.create_dataframe([[None]*len(DAILY_CITY_METRICS_SCHEMA.names)], schema=DAILY_CITY_METRICS_SCHEMA) \
                        .na.drop() \
                        .write.mode('overwrite').save_as_table('ANALYTICS.DAILY_CITY_METRICS')
    dcm = session.table('ANALYTICS.DAILY_CITY_METRICS')
```

### Complex Aggregation Query
The `merge_daily_city_metrics()` function contains a complex aggregation query which is used to join together and aggregate the data from our POS and Weather Source. Take a look at the series of complex series of joins and aggregations that are expressed, and how we're even leveraging the Snowpark UDF we created in step #5!

The complex aggregation query is then merged into the final analytics table using the Snowpark `merge()` method. If you haven't already, check out your Snowflake Query history and see which queries were generated by the Snowpark API. In this case you will see that the Snowpark API took all the complex logic, including the merge and created a single Snowflake query to execute!


# Orchestrate Jobs

During this step we will be orchestrating our new Snowpark pipelines with Snowflake's native orchestration feature named Tasks. We will create two tasks, one for each stored procedure, and chain them together. We will then run the tasks. To put this in context, we are on step **#8** in our data flow overview.


### Run the Script

In [None]:
-- ----------------------------------------------------------------------------
-- Step #1: Create the tasks to call our Python stored procedures
-- ----------------------------------------------------------------------------

CREATE OR REPLACE TASK ORDERS_UPDATE_TASK
WAREHOUSE = HOL_WH
WHEN
  SYSTEM$STREAM_HAS_DATA('POS_FLATTENED_V_STREAM')
AS
CALL HARMONIZED.ORDERS_UPDATE_SP();

CREATE OR REPLACE TASK DAILY_CITY_METRICS_UPDATE_TASK
WAREHOUSE = HOL_WH
AFTER ORDERS_UPDATE_TASK
WHEN
  SYSTEM$STREAM_HAS_DATA('ORDERS_STREAM')
AS
CALL ANALYTICS.DAILY_CITY_METRICS_UPDATE_SP();

-- ----------------------------------------------------------------------------
-- Step #2: Execute the tasks
-- ----------------------------------------------------------------------------

ALTER TASK DAILY_CITY_METRICS_UPDATE_TASK RESUME;

EXECUTE TASK ORDERS_UPDATE_TASK;

### Running the Tasks
In this step we did not create a schedule for our task DAG, so it will not run on its own at this point. So in this script you will notice that we manually execute the DAG, like this:

```sql
EXECUTE TASK ORDERS_UPDATE_TASK;
```

To see what happened when you ran this task just now, highlight and run (using CMD/CTRL+Enter) this commented query in the script:

```sql
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START=>DATEADD('DAY',-1,CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 100))
ORDER BY SCHEDULED_TIME DESC
;
```

You will notice in the task history output that it skipped our task `ORDERS_UPDATE_TASK`. This is correct, because our `HARMONIZED.POS_FLATTENED_V_STREAM` stream doesn't have any data. We'll add some new data and run them again in the next step.

### More on Tasks
Tasks are Snowflake's native scheduling/orchestration feature. With a task you can execute any one of the following types of SQL code:

* Single SQL statement
* Call to a stored procedure
* Procedural logic using Snowflake Scripting Developer Guide

For this Quickstart we'll call our Snowpark stored procedures. Here is the SQL DDL code to create the second task:

```sql
CREATE OR REPLACE TASK DAILY_CITY_METRICS_UPDATE_TASK
WAREHOUSE = HOL_WH
AFTER ORDERS_UPDATE_TASK
WHEN
  SYSTEM$STREAM_HAS_DATA('ORDERS_STREAM')
AS
CALL ANALYTICS.DAILY_CITY_METRICS_UPDATE_SP();
```

A few things to point out. First you specify which Snowflake virtual warehouse to use when running the task with the `WAREHOUSE` clause. The `AFTER` clause lets you define the relationship between tasks, and the structure of this relationship is a Directed Acyclic Graph (or DAG) like most orchestration tools provide. The `AS` clause let's you define what the task should do when it runs, in this case to call our stored procedure.

The `WHEN` clause is really cool. We've already seen how streams work in Snowflake by allowing you to incrementally process data. We've even seen how you can create a stream on a view (which joins many tables together) and create a stream on that view to process its data incrementally! Here in the `WHEN` clause we're calling a system function `SYSTEM$STREAM_HAS_DATA()` which returns true if the specified stream has new data. With the `WHEN` clause in place the virtual warehouse will only be started up when the stream has new data. So if there's no new data when the task runs then your warehouse won't be started up and you won't be charged. You will only be charged when there's new data to process. Pretty cool, huh?

As mentioned above we did not define a `SCHEDULE` for the root task, so this DAG will not run on its own. That's fine for this Quickstart, but in a real situation you would define a schedule. See [CREATE TASK](https://docs.snowflake.com/en/sql-reference/sql/create-task.html) for the details.

And for more details on Tasks see [Introduction to Tasks](https://docs.snowflake.com/en/user-guide/tasks-intro.html).

### Task Metadata
Snowflake keeps metadata for almost everything you do, and makes that metadata available for you to query (and to create any type of process around). Tasks are no different, Snowflake maintains rich metadata to help you monitor your task runs. Here are a few sample SQL queries you can use to monitor your tasks runs:

```sql
-- Get a list of tasks
SHOW TASKS;

-- Task execution history in the past day
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START=>DATEADD('DAY',-1,CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 100))
ORDER BY SCHEDULED_TIME DESC
;

-- Scheduled task runs
SELECT
    TIMESTAMPDIFF(SECOND, CURRENT_TIMESTAMP, SCHEDULED_TIME) NEXT_RUN,
    SCHEDULED_TIME,
    NAME,
    STATE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE STATE = 'SCHEDULED'
ORDER BY COMPLETED_TIME DESC;
```

### Monitoring Tasks
So while you're free to create any operational or monitoring process you wish, Snowflake provides some rich task observability features in our Snowsight UI. Try it out for yourself by following these steps:

1. In the Snowsight navigation menu, click **Data** » **Databases**.
1. In the right pane, using the object explorer, navigate to a database and schema.
1. For the selected schema, select and expand **Tasks**.
1. Select a task. Task information is displayed, including **Task Details**, **Graph**, and **Run History** sub-tabs.
1. Select the **Graph** tab. The task graph appears, displaying a hierarchy of child tasks.
1. Select a task to view its details.

# Process Incrementally

During this step we will be adding new data to our POS order tables and then running our entire end-to-end pipeline to process the new data. And this entire pipeline will be processing data incrementally thanks to Snowflake's advanced stream/CDC capabilities. To put this in context, we are on step **#9** in our data flow overview.


In [None]:
-- ----------------------------------------------------------------------------
-- Step #1: Add new/remaining order data
-- ----------------------------------------------------------------------------

USE SCHEMA RAW_POS;

ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE;

LS @external.frostbyte_raw_stage/pos/order_header/year=2022

In [None]:
COPY INTO ORDER_HEADER FROM @external.frostbyte_raw_stage/pos/order_header/year=2022
FILE_FORMAT = (FORMAT_NAME = EXTERNAL.PARQUET_FORMAT)
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;

In [None]:
COPY INTO ORDER_DETAIL FROM @external.frostbyte_raw_stage/pos/order_detail/year=2022
FILE_FORMAT = (FORMAT_NAME = EXTERNAL.PARQUET_FORMAT)
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;

In [None]:
ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL;

### Viewing the Task History
Like the in the previous step, to see what happened when you ran this task DAG, highlight and run (using CMD/CTRL+Enter) this commented query in the script:

```sql
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START=>DATEADD('DAY',-1,CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 100))
ORDER BY SCHEDULED_TIME DESC
;
```

This time you will notice that the `ORDERS_UPDATE_TASK` task will not be skipped, since the `HARMONIZED.POS_FLATTENED_V_STREAM` stream has new data. In a few minutes you should see that both the `ORDERS_UPDATE_TASK` task and the `DAILY_CITY_METRICS_UPDATE_TASK` task completed successfully.

### Query History for Tasks
One important thing to understand about tasks, is that the queries which get executed by the task won't show up with the default Query History UI settings. In order to see the queries that just ran you need to do the following:

* Remove filters at the top of this table, including your username, as later scheduled tasks will run as "System":

![](assets/query_history_remove_filter1.png)

* Click "Filter", and add filter option 'Queries executed by user tasks' and click "Apply Filters":

![](assets/query_history_remove_filter2.png)

You should now see all the queries run by your tasks! Take a look at each of the MERGE commands in the Query History to see how many records were processed by each task. And don't forget to notice that we processed the whole pipeline just now, and did so incrementally!


## Teardown
Once you're finished with the Quickstart and want to clean things up, you can simply run the following commands.


In [None]:
DROP DATABASE NB_HOL_DB;
DROP WAREHOUSE HOL_WH;
DROP ROLE HOL_ROLE;
DROP DATABASE FROSTBYTE_WEATHERSOURCE;

### What we've covered
We've covered a ton in this Quickstart, and here are the highlights:

* Snowflake's Table Format
* Data ingestion with COPY
* Schema inference
* Data sharing/marketplace (instead of ETL)
* Streams for incremental processing (CDC)
* Streams on views
* Python UDFs (with third-party packages)
* Python Stored Procedures
* Snowpark DataFrame API
* Snowpark Python programmability
* Warehouse elasticity (dynamic scaling)
* Visual Studio Code Snowflake native extension (PuPr, Git integration)
* SnowCLI (PuPr)
* Tasks (with Stream triggers)
* Task Observability
* GitHub Actions (CI/CD) integration

### Related Resources
And finally, here's a quick recap of related resources:

* [Full Demo on Snowflake Demo Hub](https://developers.snowflake.com/demos/data-engineering-pipelines/)
* [Source Code on GitHub](https://github.com/Snowflake-Labs/sfguide-data-engineering-with-snowpark-python)
* [Snowpark Developer Guide for Python](https://docs.snowflake.com/en/developer-guide/snowpark/python/index.html)
    * [Writing Python UDFs](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python.html)
    * [Writing Stored Procedures in Snowpark (Python)](https://docs.snowflake.com/en/sql-reference/stored-procedures-python.html)
    * [Working with DataFrames in Snowpark Python](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes.html)
* Related Tools
    * [Snowflake Visual Studio Code Extension](https://marketplace.visualstudio.com/items?itemName=snowflake.snowflake-vsc)
    * [SnowCLI Tool](https://github.com/Snowflake-Labs/snowcli)