# Part 2: ...

- They will need to create a stage for the permanent UDF
- Re-do modin imports

## Python UDFs (Jason)

Snowflake provides a variety of built-in functions like [`SUM()`](https://docs.snowflake.com/en/sql-reference/functions/sum), [`REGEXP_REPLACE()`](https://docs.snowflake.com/en/sql-reference/functions/regexp_replace), [`SEARCH()`](https://docs.snowflake.com/en/sql-reference/functions/search) and so on. But what if you need a function that isn't provided? That's where [User Defined Functions](https://docs.snowflake.com/en/developer-guide/udf/udf-overview) (UDFs) come in! In Snowflake, you can write your UDFs in SQL, Python, Java, Scala, and JavaScript. There are three types of UDFs: scalar, tabular, and aggregate. Let's go over the different UDF types:

| Type      | Input            | Output              | Description                                                                                                                                                                            |
|-----------|------------------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Scalar    | One row          | One row             | This is probably the type of UDF you think of intuitively. For every input row, this returns an output row (one in, one out).                                                          |
| Tabular   | One row          | One or more rows    | UDTFs can return multiple rows for a single input row. Common examples include breaking up arrays or objects into multiple rows by their attributes.                                   |
| Aggregate | One or more rows | One row (per group) | UDAFs can help you do, well, aggregations! UDAFs are similar to functions like SUM, AVG, and STDDEV: you can use them with GROUP BY clauses to calculate domain-specific aggregations. |
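
To make the three shapes in the table concrete, here is a minimal sketch in plain Python. Nothing here is registered with Snowflake, and the function names (`scalar_upper`, `tabular_split`, `aggregate_longest`) are hypothetical; the point is only the difference in what goes in and what comes out.

```python
from typing import Iterator, List


def scalar_upper(name: str) -> str:
    """Scalar shape: one input row in, one output value out."""
    return name.upper()


def tabular_split(tags: str) -> Iterator[str]:
    """Tabular shape: one input row in, several output rows out."""
    for tag in tags.split(","):
        yield tag.strip()


def aggregate_longest(names: List[str]) -> str:
    """Aggregate shape: all rows of one group in, a single value out."""
    return max(names, key=len)


print(scalar_upper("taco"))
print(list(tabular_split("spicy, vegan, hot")))
print(aggregate_longest(["taco", "burrito", "ramen"]))
```

In a real UDTF or UDAF the logic lives in a handler class rather than a bare function, but the row-in, row-out contract is the same.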

If you've used UDFs before, you may be familiar with [Vectorized UDFs](https://docs.snowflake.com/en/developer-guide/snowpark/python/creating-udfs#label-snowpark-python-udf-vectorized), which allow you to process **batches** of rows at a time in your scalar and tabular UDFs. When you tag a scalar UDF or UDTF with the `@vectorized` decorator, that tells Snowflake to provide batches of input rows to your UDF/UDTF in the form of [pandas DataFrames](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html). This can be helpful for using Python libraries that operate on pandas DataFrames, and can be more performant for some workloads. Vectorized UDFs and UDTFs are only supported for Python.

### Writing UDFs

To create a UDF with Python, you can annotate a Python function with the [`@udf`](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.functions.udf) decorator. This will use your Snowpark Session to create the UDF. (Under the hood it will run `CREATE OR REPLACE` with the source code to create the object.) Alternatively, you can call [`Session.udf.register()`](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.udf.UDFRegistration.register#snowflake.snowpark.udf.UDFRegistration.register) and pass in a reference to the Python function.

The following two examples are equivalent:
```python
from snowflake.snowpark.functions import udf

@udf(is_permanent=True, stage_location='stage_name', replace=True)
def combine(a: str, b:str) -> str:
    return a+b
```

```python
from snowflake.snowpark import Session

session = Session.get_active_session()

def combine(a: str, b:str) -> str:
    return a+b

session.udf.register(combine, is_permanent=True, stage_location='stage_name', replace=True)
```

Let's create a basic UDF and call it in a query. 

In [None]:
# Create a UDF
from snowflake.snowpark.functions import udf

# TODO: Have them create a stage in the beginning setup, we'll need it here and possibly in other sections

@udf(is_permanent=True, stage_location='TODO')
def my_udf(something) -> int:
    pass

In [None]:
-- Now we can call this UDF in a SQL query
SELECT my_udf(col)
FROM TASTY_BITES_TABLE
WHERE ...
LIMIT 1000

In [None]:
# And of course, we can use this UDF in a DataFrame query as well
# TODO: Query with Menu item with DF.apply
menu_item

### Writing a UDAF

Now that we're familiar with creating and calling basic UDFs, let's take this up a notch and try using a [User Defined **Aggregate** Function](https://docs.snowflake.com/en/developer-guide/snowpark/python/creating-udafs) (UDAF)! UDAFs allow us to implement custom aggregation logic or aggregations that Snowflake does not provide built-in. UDAFs are inherently more complicated than scalar UDFs: you have multiple input rows per group that need to be aggregated. Therefore, you will use a [Python class](https://www.w3schools.com/python/python_classes.asp) to implement the required methods. The definition below illustrates the methods that Snowflake needs.


```python
class MyUDAF:
    def __init__(self):
        # Starting value of the aggregate (0 suits a sum; pick whatever fits your aggregation)
        self._initial_value = 0

    def accumulate(self, input_value):
        # This is where input rows will be passed. In this method, aggregate the input_value with the
        # self._initial_value. (For example: summing, appending to a list, etc.)
        self._initial_value += input_value

    @property
    def aggregate_state(self):
        # This exposes the self._initial_value with a known name so Snowflake can access it during execution
        return self._initial_value

    def merge(self, other_aggregate_state):
        # This merges two intermediate aggregates. On large datasets, Snowflake will call this to merge
        # intermediate results. The input, other_aggregate_state, is the @property of another instance of this class
        self._initial_value += other_aggregate_state

    def finish(self):
        # Snowflake will call this method to retrieve the final result
        return self._initial_value
```

Here is a concrete example. This UDAF calculates an average.

```python
from dataclasses import dataclass

@dataclass
class AvgAggState:
    # An average is the sum of values divided by the count, so this tracks the rolling sum and counts.
    sum: int
    count: int

class PythonAvg:
    def __init__(self):
        # This is the initial value (sum and count are both set to 0)
        self._agg_state = AvgAggState(0, 0)

    @property
    def aggregate_state(self):
        # This exposes the intermediate state under a known property name, aggregate_state, so Snowflake can access it.
        return self._agg_state

    def accumulate(self, input_value):
        # input_value will be an input row, so we add the value to the sum and increment the count
        sum = self._agg_state.sum
        count = self._agg_state.count
        
        self._agg_state.sum = sum + input_value
        self._agg_state.count = count + 1

    def merge(self, other_agg_state):
        # other_agg_state will be an AvgAggState, so we combine the sums and counts
        sum = self._agg_state.sum
        count = self._agg_state.count
        
        other_sum = other_agg_state.sum
        other_count = other_agg_state.count
        
        self._agg_state.sum = sum + other_sum
        self._agg_state.count = count + other_count

    def finish(self):
        # Finally, return the sum divided by the count!
        sum = self._agg_state.sum
        count = self._agg_state.count
        return sum / count
```
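
To see how these methods fit together, the sketch below simulates the lifecycle locally. It is an illustration under assumptions, not how Snowflake schedules work internally: the `simulate` driver and the `SumCount` class are hypothetical, and the real engine decides how rows are partitioned and when `merge` is called.

```python
class SumCount:
    """Toy handler that uses the same method names as a Snowflake UDAF."""

    def __init__(self):
        self._state = (0.0, 0)  # (running sum, row count)

    @property
    def aggregate_state(self):
        return self._state

    def accumulate(self, value):
        total, count = self._state
        self._state = (total + value, count + 1)

    def merge(self, other_state):
        total, count = self._state
        self._state = (total + other_state[0], count + other_state[1])

    def finish(self):
        total, count = self._state
        return total / count if count else None


def simulate(handler_cls, partitions):
    """Hypothetical driver: one handler per partition, then merge, then finish."""
    handlers = []
    for rows in partitions:
        handler = handler_cls()
        for row in rows:
            handler.accumulate(row)
        handlers.append(handler)

    final = handler_cls()
    for handler in handlers:
        # Only the exposed aggregate_state crosses between instances
        final.merge(handler.aggregate_state)
    return final.finish()


print(simulate(SumCount, [[1, 2, 3], [4, 5]]))
```

Because only `aggregate_state` is passed to `merge`, the state should hold everything needed to combine partial results; an average therefore carries both the sum and the count rather than a partial average.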

In [None]:
import math
from dataclasses import dataclass
from datetime import datetime

from snowflake.snowpark.types import FloatType, DateType
from snowflake.snowpark.functions import udaf

class LoyaltyScoreUDAF:
    def __init__(self):
        self.total_orders: int = 0
        self.total_value: float = 0
        self.last_order_ts: datetime = None

    def accumulate(self, order_total, order_ts):
        if order_total is not None:
            self.total_orders += 1
            self.total_value += order_total
            
        if order_ts is not None:
            if self.last_order_ts is None or order_ts > self.last_order_ts:
                self.last_order_ts = order_ts

    def merge(self, other: tuple):
        self.total_orders += other[0]
        self.total_value += other[1]
        if self.last_order_ts is None or (other[2] and other[2] > self.last_order_ts):
            self.last_order_ts = other[2]

    def finish(self):
        if self.total_orders == 0:
            return 0.0
        avg_order_value = self.total_value / self.total_orders
        
        now = datetime.utcnow().date()
        days_since_last = (now - self.last_order_ts).days if self.last_order_ts else 999
        recency_boost = 1 / (days_since_last + 1)

        # Weighting parameters
        w1, w2, w3 = 1.0, 0.1, 5.0

        return (
            w1 * math.log(self.total_orders + 1) +
            w2 * avg_order_value +
            w3 * recency_boost
        )

    @property
    def aggregate_state(self):
        return (self.total_orders, self.total_value, self.last_order_ts)
      
loyalty_udaf = udaf(
    LoyaltyScoreUDAF, 
    name="LoyaltyScoreUDAF",
    is_permanent=True,
    stage_location='@my_stage',
    replace=True, 
    return_type=FloatType(), 
    input_types=[FloatType(), DateType()])

print(f'Created UDAF, "{loyalty_udaf.name}" ')
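
The scoring formula in `finish` can be checked outside Snowflake. The sketch below lifts the same arithmetic into a standalone function (the name `loyalty_score` and the sample inputs are hypothetical) to show how much the recency term moves the result for two otherwise identical customers.

```python
import math


def loyalty_score(total_orders, total_value, days_since_last,
                  w1=1.0, w2=0.1, w3=5.0):
    """Same weighting as LoyaltyScoreUDAF.finish, as a plain function."""
    if total_orders == 0:
        return 0.0
    avg_order_value = total_value / total_orders
    recency_boost = 1 / (days_since_last + 1)
    return (
        w1 * math.log(total_orders + 1)
        + w2 * avg_order_value
        + w3 * recency_boost
    )


# Two customers with identical order history; only recency differs
recent = loyalty_score(total_orders=10, total_value=250.0, days_since_last=0)
lapsed = loyalty_score(total_orders=10, total_value=250.0, days_since_last=99)
print(round(recent, 3), round(lapsed, 3))
```

With these weights the recency term contributes at most 5 points and decays quickly, so order count and average order value dominate for customers who have not ordered recently.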

In [None]:
CREATE TABLE TASTY_BYTES_ORDERS (
  ORDER_ID BIGINT,
  TRUCK_ID INT,
  ORDER_TS TIMESTAMP_TZ,
  ORDER_DETAIL_ID BIGINT,
  LINE_NUMBER INT,
  TRUCK_BRAND_NAME STRING,
  MENU_TYPE STRING,
  PRIMARY_CITY STRING,
  REGION STRING,
  COUNTRY STRING,
  FRANCHISE_FLAG BOOLEAN,
  FRANCHISE_ID INT,
  FRANCHISEE_FIRST_NAME STRING,
  FRANCHISEE_LAST_NAME STRING,
  LOCATION_ID INT,
  PLACEKEY STRING,
  LOCATION_NAME STRING,
  TOP_CATEGORY STRING,
  SUB_CATEGORY STRING,
  LATITUDE FLOAT,
  LONGITUDE FLOAT,
  CUSTOMER_ID INT,
  FIRST_NAME STRING,
  LAST_NAME STRING,
  E_MAIL STRING,
  PHONE_NUMBER STRING,
  CHILDREN_COUNT INT,
  GENDER STRING,
  MARITAL_STATUS STRING,
  MENU_ITEM_ID INT,
  MENU_ITEM_NAME STRING,
  QUANTITY INT,
  UNIT_PRICE FLOAT,
  PRICE FLOAT,
  ORDER_AMOUNT FLOAT,
  ORDER_TAX_AMOUNT FLOAT,
  ORDER_DISCOUNT_AMOUNT FLOAT,
  ORDER_TOTAL FLOAT
);

copy into jasonfreeberg.public.tasty_bytes_orders
from @frostbytes/harmonized/orders_v.csv
on_error = continue;

In [None]:
SELECT
  CUSTOMER_ID,
  LoyaltyScoreUDAF(ORDER_TOTAL, ORDER_TS) AS loyalty_score
FROM tasty_bytes_orders
WHERE 
    ORDER_TOTAL IS NOT NULL 
    AND ORDER_TS IS NOT NULL
    AND CUSTOMER_ID IS NOT NULL
GROUP BY CUSTOMER_ID
ORDER BY loyalty_score DESC
LIMIT 10;

## Accessing External Data

Next, let's leverage the flexibility of Python to enrich our Tasty Bytes orders data with weather data to understand how weather conditions impact customer behavior. Do people order more tacos when it's sunny? Fewer smoothies when it's cold?

To do so, we will use [OpenWeather](https://openweathermap.org/)'s [REST API](https://openweathermap.org/api/one-call-3#history) to fetch the weather at the time and location (latitude and longitude) of each order. OpenWeather allows you to make 1,000 API requests per day. Not familiar with REST APIs? Check out [this article](https://tutorialedge.net/software-eng/what-is-a-rest-api/), or feel free to call an instructor over for more information!

First, let's create an External Access Integration to allow code in our Snowflake account to reach out to OpenWeatherMap.

In [None]:
CREATE OR REPLACE NETWORK RULE openweathermap_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('api.openweathermap.org');

In [None]:
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION openweathermap_access_integration
  ALLOWED_NETWORK_RULES = (openweathermap_network_rule)
  ENABLED = true;

Now that the network rule and external access integration are created, we need to associate it with this Notebook by following [these steps](https://docs.snowflake.com/en/user-guide/ui-snowsight/notebooks-external-access#enable-external-access-integrations-eai).

In [None]:
# Since the Notebook was restarted, let's re-import these modules
# and re-create the session object.
import streamlit as st
import modin.pandas as pd
import snowflake.snowpark.modin.plugin

from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
# TODO: Put the API key in the Secrets store

def get_url(lat, lon, time):
    # This function constructs the URL that we will send requests to
    API_KEY = '34719d209ca19d3e008f1374cd55ac63'
    #return f'https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={lat}&lon={lon}&dt={time}&appid={API_KEY}'
    
    # TODO: this functionally works, but it ignores the time param and just returns the current weather
    return f'https://api.openweathermap.org/data/2.5/weather?lat={lat}&lon={lon}&dt={time}&exclude=hourly,daily&appid={API_KEY}'
    
# Let's try it here:
example_url = get_url(37.7749, -122.4194, 1744596465)

print(f'Example URL to send requests to...\n{example_url}')
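
As a possible variation on `get_url`, the sketch below builds the query string with `urllib.parse.urlencode` and keeps the key out of the source. The function name `build_weather_url` and the environment variable `OPENWEATHER_API_KEY` are assumptions for illustration only; inside Snowflake the key would more likely come from the Secrets store, as the TODO above suggests.

```python
import os
from urllib.parse import urlencode

# Same current-weather endpoint that get_url uses above
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"


def build_weather_url(lat, lon, api_key=None):
    """Build the request URL, letting urlencode handle escaping."""
    # OPENWEATHER_API_KEY is a hypothetical variable name, not a Snowflake convention
    key = api_key or os.environ.get("OPENWEATHER_API_KEY", "")
    query = urlencode({"lat": lat, "lon": lon, "appid": key})
    return f"{BASE_URL}?{query}"


print(build_weather_url(37.7749, -122.4194, api_key="demo-key"))
```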

In [None]:
import requests
from datetime import datetime

def get_weather(lat: float, long: float, time: datetime):
    # This will get the temp, cloudiness, and description for the
    # weather at the given latitude, longitude, and time.

    epoch_time = int(time.timestamp())
    
    url = get_url(lat, long, epoch_time)
    response = requests.get(url)
    
    if response.ok:
        json_ = response.json()

        # Pull out the data we want
        return {
            'temp': json_['main']['temp'],
            'cloudiness': json_['clouds']['all'],
            'description': json_['weather'][0]['description']
        }
        
    else:  # Bad response
        print(f"Error: {response.status_code}")
        return None

In [None]:
print("Let's do a test run of the function!")
get_weather(37.7749, -122.4194, datetime(2023, 1, 1, 12, 30, 0))
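
With a limit of 1,000 requests per day, it may help to avoid repeating calls for orders placed close together. The sketch below is one possible approach, not part of the workshop code: it rounds coordinates and truncates the timestamp to the hour so nearby orders share a cached result. The names `cache_key`, `CachedWeather`, and `fake_fetch` are hypothetical, the rounding precision is an arbitrary assumption, and `fake_fetch` stands in for the real API call so the example runs offline.

```python
from datetime import datetime


def cache_key(lat, lon, ts, precision=2):
    """Nearby coordinates within the same clock hour map to one key."""
    hour = ts.replace(minute=0, second=0, microsecond=0)
    return (round(lat, precision), round(lon, precision), hour)


class CachedWeather:
    def __init__(self, fetch):
        self._fetch = fetch      # callable(lat, lon, ts) that hits the API
        self._cache = {}
        self.api_calls = 0       # how many times the API was actually called

    def get(self, lat, lon, ts):
        key = cache_key(lat, lon, ts)
        if key not in self._cache:
            self.api_calls += 1
            self._cache[key] = self._fetch(lat, lon, ts)
        return self._cache[key]


def fake_fetch(lat, lon, ts):
    # Stand-in for get_weather; returns a placeholder instead of real data
    return {"lat": lat, "lon": lon}


weather = CachedWeather(fake_fetch)
weather.get(37.7749, -122.4194, datetime(2023, 1, 1, 12, 30))
weather.get(37.7712, -122.4189, datetime(2023, 1, 1, 12, 45))  # same key
weather.get(37.7749, -122.4194, datetime(2023, 1, 1, 13, 5))   # next hour
print(weather.api_calls)  # 2
```

An in-memory cache like this only lives as long as the Python process, so it helps within a single notebook run rather than across separate executions.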

In [None]:
from snowflake.snowpark.types import FloatType, TimestampTimeZone, TimestampType, VariantType

session.udf.register(
    get_weather,
    name='get_weather',
    replace=True,
    # get_weather returns a dict, which maps to a VARIANT in Snowflake
    return_type=VariantType(),
    # LATITUDE and LONGITUDE are FLOAT columns; ORDER_TS is TIMESTAMP_TZ
    input_types=[FloatType(), FloatType(), TimestampType(TimestampTimeZone.TZ)],
    packages=['requests'],
    external_access_integrations=['openweathermap_access_integration']
)

In [None]:
select 
    latitude, longitude, order_ts, 
    get_weather(latitude, longitude, order_ts) as weather
from TASTY_BYTES_ORDERS
limit 5;

In [None]:
# TODO: A Python example

## Scheduling Python Jobs

- Introduce Tasks
- Explain that stored procedures and Notebooks can be executed from Tasks. Briefly weigh pros and cons of these options
- Example DDL for a Task definition
- Show button to Schedule this Notebook

In Snowflake, Tasks are used to automate and schedule jobs. Tasks can schedule the execution of a Notebook, a stored procedure, or arbitrary SQL commands. Check out the docs for [`CREATE TASK`](https://docs.snowflake.com/en/sql-reference/sql/create-task) for more information, but at its core a Task needs a **schedule** and a **warehouse**. For example, the SQL below defines a Task that runs every hour with the Warehouse, `my_wh`, and calls a stored procedure named `my_stored_procedure`.

```sql
USE DATABASE TEST_DB;
USE SCHEMA TEST_SCHEMA;

CREATE TASK my_task
  WAREHOUSE = my_wh
  SCHEDULE = '60 MINUTES'
  AS
    CALL my_stored_procedure();

-- Tasks are created in a suspended state, so resume the task to start the schedule
ALTER TASK my_task RESUME;
```

And of course, you can define the same Task using Python!

```python
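from snowflake.core import Root
from snowflake.core.task import Cron, Task

# Root is the entry point to the Snowflake Python API.
# This assumes `session` is the active Snowpark session created earlier.
root = Root(session)

tasks = root.databases["TEST_DB"].schemas["TEST_SCHEMA"].tasks

task = tasks.create(
    Task(
        name="my_task",
        definition="CALL my_stored_procedure()",
        # Run at minute 0 of every hour, Los Angeles time
        schedule=Cron("0 * * * *", "America/Los_Angeles"),
        warehouse="my_wh"
    ),
)
```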
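
The two definitions above use slightly different schedules: an interval (`'60 MINUTES'`) counts from the moment the task is resumed, while the cron expression `0 * * * *` fires at the top of each clock hour. The sketch below illustrates that difference with plain `datetime` arithmetic. The helper names are hypothetical, time zones are ignored, and it is not how Snowflake computes run times internally.

```python
from datetime import datetime, timedelta


def next_runs_interval(resumed_at, minutes, n):
    """Interval schedule: runs are spaced from the time the task is resumed."""
    return [resumed_at + timedelta(minutes=minutes * i) for i in range(1, n + 1)]


def next_runs_hourly_cron(resumed_at, n):
    """Cron '0 * * * *': runs at minute 0 of each following hour."""
    top_of_hour = resumed_at.replace(minute=0, second=0, microsecond=0)
    first = top_of_hour + timedelta(hours=1)
    return [first + timedelta(hours=i) for i in range(n)]


resumed = datetime(2025, 1, 1, 9, 3)
print(next_runs_interval(resumed, 60, 2))   # 10:03 and 11:03
print(next_runs_hourly_cron(resumed, 2))    # 10:00 and 11:00
```

If downstream jobs expect data at fixed clock times, the cron form is usually the easier one to reason about.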


## Monitor and Troubleshoot Python


## Streamlit/Reporting/visualizations (Doris)