# Customfunctions

> Custom Functions POC Project

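This notebook is rendered as the project README and as the index page of the generated documentation. It installs the package, opens a Snowflake session, loads a synthetic `ORDERS` table, and runs a JSON-driven aggregation whose results are saved to `orders_aggregates`.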

In [1]:
! cd ../ && pip install .

Processing /Users/jdemlow/github/customfunctions
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hBuilding wheels for collected packages: customfunctions
  Building wheel for customfunctions (pyproject.toml) ... [?25ldone
[?25h  Created wheel for customfunctions: filename=customfunctions-0.0.1-py3-none-any.whl size=13669 sha256=719ccb0e4c96cf0fb8835158a09dcd2ad091d5bc29c4e3f86891172f7a7356b6
  Stored in directory: /private/var/folders/hm/zsqyytm950g1dc_00qtbp2zh0000gn/T/pip-ephem-wheel-cache-ybp0e5v6/wheels/f5/9b/14/f9f412895d657c5fe5ab82c86c9b78cd8b2a4d167d7cdb8def
Successfully built customfunctions
Installing collected packages: customfunctions
Successfully installed customfunctions-0.0.1


In [32]:
from customfunctions.connection import SnowflakeConnection
from snowflake.snowpark.version import VERSION
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum, avg, count, to_char, current_timestamp
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd
import numpy as np
import json
import random
import os

import warnings
# Suppress the specific Pydantic warning about schema.
# `message` is a regular expression matched (case-insensitively) against the
# start of the warning text, so `.*` stands in for the part of the text between
# the field name and the rest of the sentence. Warnings with any other text are
# still shown.
warnings.filterwarnings("ignore", message="Field name \"schema\" .* shadows an attribute in parent \"BaseModel\"")

In [5]:
try:
    # Reuse an already-active Snowpark session when there is one.
    from snowflake.snowpark.context import get_active_session
    session = get_active_session()

except Exception as e:
    print(f"No active session with get_active_session() moving to create_snowflake_session. Error Seen:\n{e}")

    # Credentials come from the environment so they never live in the notebook.
    # Name any variable that is unset up front: otherwise an empty string is
    # passed on and the eventual connection error does not say which one was missing.
    credential_env_vars = ('SNOWFLAKE_USER', 'SNOWFLAKE_PASSWORD', 'SNOWFLAKE_ACCOUNT')
    missing_env_vars = [name for name in credential_env_vars if not os.getenv(name)]
    if missing_env_vars:
        print(f"Warning: environment variable(s) not set: {', '.join(missing_env_vars)}")

    config = {
        'user': os.getenv('SNOWFLAKE_USER', ''),
        'password': os.getenv('SNOWFLAKE_PASSWORD', ''),
        'account': os.getenv('SNOWFLAKE_ACCOUNT', ''),
        'database': 'DATASCIENCE',
        'warehouse': 'DS_WH_XS',
        'schema': 'CUSTOM_FUNCTIONS',
        'role': 'DATA_SCIENTIST'
    }
    sfc = SnowflakeConnection(**config)
    session = sfc.get_session()

# Report the connection context for whichever path produced `session`, so that
# `snowflake_environment` and `snowpark_version` exist regardless of the branch taken.
snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION
print('\nConnection Established with the following parameters:')
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(session.get_current_role()))
print('Database                    : {}'.format(session.get_current_database()))
print('Schema                      : {}'.format(session.get_current_schema()))
print('Warehouse                   : {}'.format(session.get_current_warehouse()))
print('Snowflake version           : {}'.format(snowflake_environment[0][1]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))


No active session with get_active_session() moving to create_snowflake_session. Error Seen:
(1403): No default Session is found. Please create a session before you call function 'udf' or use decorator '@udf'.

Connection Established with the following parameters:
User                        : JD_SERVICE_ACCOUNT_ADMIN
Role                        : "DATA_SCIENTIST"
Database                    : "DATASCIENCE"
Schema                      : "CUSTOM_FUNCTIONS"
Warehouse                   : "DS_WH_XS"
Snowflake version           : 8.46.1
Snowpark for Python version : 1.26.0


In [None]:
def create_fake_data(
    session,
    num_rows: int = 10000,
    table_name: str = "ORDERS",
    seed: Optional[int] = None
) -> None:
    """
    Create and load fake order data into Snowflake using lists.

    Builds ``num_rows`` random order rows in Python, turns them into a Snowpark
    DataFrame, overwrites ``table_name`` with them, and prints a 10-row sample.

    Parameters
    ----------
    session
        Object used to create the DataFrame and read the table back
        (``create_dataframe`` and ``table`` are called on it).
    num_rows : int
        Number of rows to generate; must be at least 1. ORDER_ID runs from 1
        to ``num_rows``.
    table_name : str
        Name of the table to (over)write.
    seed : int, optional
        When given, the rows are generated from a private random generator
        seeded with this value, so the same seed yields the same rows. The
        process-wide ``random`` state is left untouched. When omitted, values
        are drawn from the shared ``random`` module state.

    Raises
    ------
    ValueError
        If ``num_rows`` is smaller than 1.
    Exception
        Any error raised while building or writing the data is printed and
        re-raised unchanged.
    """
    # Fail fast with a clear message instead of handing an empty row list on.
    if num_rows < 1:
        raise ValueError(f"num_rows must be a positive integer, got {num_rows}")

    # A private generator keeps seeded runs reproducible without reseeding the
    # global RNG that other code in the kernel may be relying on.
    rng = random.Random(seed) if seed is not None else random

    start_date = datetime(2023, 1, 1)
    categories = ["Electronics", "Clothing", "Books", "Home", "Sports"]
    regions = ["North", "South", "East", "West"]
    statuses = ["completed", "shipped", "pending", "cancelled"]

    try:
        # Create data as lists
        data = [
            [
                order_id,  # ORDER_ID
                (start_date + timedelta(days=rng.randint(0, 364))).strftime('%Y-%m-%d'),  # DATE as string
                rng.choice(categories),  # PRODUCT_CATEGORY
                rng.choice(regions),  # REGION
                round(rng.uniform(10, 1000), 2),  # SALES_AMOUNT
                round(rng.uniform(10, 1000), 2),  # ORDER_VALUE
                rng.choice(statuses)  # STATUS
            ]
            for order_id in range(1, num_rows + 1)
        ]

        # Define column names
        columns = ["ORDER_ID", "DATE", "PRODUCT_CATEGORY", "REGION",
                  "SALES_AMOUNT", "ORDER_VALUE", "STATUS"]

        # Create Snowpark DataFrame directly from lists
        snowpark_df = session.create_dataframe(data, columns)

        # Write to table
        snowpark_df.write.mode("overwrite").save_as_table(table_name)
        print(f"Successfully created table '{table_name}' with {num_rows} rows")

        # Show sample of the data (read through the DataFrame API rather than
        # interpolating the table name into a SQL string)
        print("\nSample of the created data:")
        session.table(table_name).limit(10).show()

    except Exception as e:
        print(f"Error creating fake data: {str(e)}")
        raise

In [26]:
# Create full dataset (seeded so that a re-run reproduces the same rows and
# therefore the same aggregates further down)
create_fake_data(
    session=session,
    num_rows=10000,
    table_name="ORDERS",
    seed=42
)

Successfully created table 'ORDERS' with 10000 rows

Sample of the created data:
--------------------------------------------------------------------------------------------------------
|"ORDER_ID"  |"DATE"      |"PRODUCT_CATEGORY"  |"REGION"  |"SALES_AMOUNT"  |"ORDER_VALUE"  |"STATUS"   |
--------------------------------------------------------------------------------------------------------
|1           |2023-08-29  |Books               |South     |741.9           |371.36         |pending    |
|2           |2023-05-25  |Sports              |West      |723.71          |550.68         |cancelled  |
|3           |2023-10-19  |Electronics         |North     |132.31          |160.01         |shipped    |
|4           |2023-09-29  |Home                |South     |944.23          |408.82         |pending    |
|5           |2023-07-10  |Sports              |South     |959.51          |872.47         |shipped    |
|6           |2023-07-12  |Clothing            |South     |469.21          |971

In [33]:
def perform_aggregation(session: "Session", aggregation_request: str) -> str:
    """
    Performs data aggregation based on the provided aggregation request.

    This function takes a Snowflake session and an aggregation request in JSON format,
    applies the specified filters, groups the data, calculates the requested metrics,
    adds a ``created`` timestamp column, and overwrites the target table with the result.

    Parameters
    ----------
    session : snowflake.snowpark.Session
        An active Snowflake session object.
    aggregation_request : str
        A JSON string containing the aggregation specifications:
        - source_table: Name of the source table in Snowflake.
        - target_table: Name of the table where results will be saved.
        - group_by: List of columns to group by.
        - metrics: List of metrics to calculate, each with a ``name``, a
          ``function`` (``sum``, ``avg`` or ``count``) and a ``column``.
        - filters: Optional list of filters, each with a ``column``, an
          ``operator`` and a ``value``. ``between`` takes a two-element list
          ``[low, high]`` (both ends inclusive); ``in`` takes a list of
          allowed values.
        Any other keys in the request are not read.

    The aggregation_request should have the following structure:
        - TODO: Think about fine grain control, but for now you can have
                <database>.<schema>.<table> and if the caller has access
                then it will work correctly.
    {
        "source_table": str,
        "target_table": str,
        "group_by": List[str],
        "metrics": List[Dict[str, str]],
        "filters": List[Dict[str, Union[str, List[str]]]]
    }

    Returns
    -------
    str
        A confirmation message naming the target table.

    Raises
    ------
    ValueError
        If a filter uses an unsupported operator, a ``between`` filter does not
        carry exactly two bounds, or a metric uses an unsupported function.
    KeyError
        If a required key is missing from the request.
    """
    request = json.loads(aggregation_request)

    supported_operators = ('between', 'in')
    supported_functions = ('sum', 'avg', 'count')

    # Validate the whole request before touching Snowflake. An entry that is
    # silently skipped would produce a plausible-looking but wrong result table.
    filter_specs = request.get('filters') or []
    for spec in filter_specs:
        operator = spec['operator']
        if operator not in supported_operators:
            raise ValueError(
                f"Unsupported filter operator '{operator}' for column '{spec['column']}'; "
                f"supported operators: {', '.join(supported_operators)}"
            )
        if operator == 'between':
            bounds = spec['value']
            if not isinstance(bounds, list) or len(bounds) != 2:
                raise ValueError(
                    f"Filter 'between' on column '{spec['column']}' needs exactly two values [low, high]"
                )

    metric_specs = request['metrics']
    for metric in metric_specs:
        if metric['function'] not in supported_functions:
            raise ValueError(
                f"Unsupported metric function '{metric['function']}' for metric '{metric['name']}'; "
                f"supported functions: {', '.join(supported_functions)}"
            )

    df = session.table(request['source_table'])
    for spec in filter_specs:
        column = col(spec['column'])
        if spec['operator'] == 'between':
            low, high = spec['value']
            df = df.filter((column >= low) & (column <= high))
        else:  # 'in' -- the only other operator that passes validation
            df = df.filter(column.isin(spec['value']))

    # NOTE: `sum` here is the Snowpark column function imported at the top of
    # the notebook (it shadows the builtin), not Python's builtin sum.
    aggregators = {'sum': sum, 'avg': avg, 'count': count}
    aggs = [
        aggregators[metric['function']](col(metric['column'])).alias(metric['name'])
        for metric in metric_specs
    ]

    df = (
        df.group_by(request['group_by'])
        .agg(*aggs)
        .with_column('created', to_char(current_timestamp(), 'YYYY-MM-DD HH24:MI:SS'))
    )

    df.write.mode("overwrite").save_as_table(request['target_table'])

    return f"Aggregation completed. Results saved to {request['target_table']}"

In [34]:
# Example request: total sales, average order value and order count per
# product category and region, limited to 2023 orders that were completed or shipped.
aggregation_request = json.dumps({
    "request_id": "AGG_001",
    "source_table": "orders",
    "target_table": "orders_aggregates",
    "group_by": ["PRODUCT_CATEGORY", "REGION"],
    "metrics": [
        {"name": metric_name, "function": function_name, "column": column_name}
        for metric_name, function_name, column_name in (
            ("TOTAL_SALES", "sum", "SALES_AMOUNT"),
            ("AVERAGE_ORDER_VALUE", "avg", "ORDER_VALUE"),
            ("ORDER_COUNT", "count", "ORDER_ID"),
        )
    ],
    "filters": [
        {
            "column": "DATE",
            "operator": "between",
            "value": ["2023-01-01", "2023-12-31"],
        },
        {
            "column": "STATUS",
            "operator": "in",
            "value": ["completed", "shipped"],
        },
    ],
    "version": "1.0.0"
})

perform_aggregation(session, aggregation_request)

'Aggregation completed. Results saved to orders_aggregates'

In [35]:
# Display the first rows of the aggregated result table.
session.table("orders_aggregates").show()

---------------------------------------------------------------------------------------------------------------
|"PRODUCT_CATEGORY"  |"REGION"  |"TOTAL_SALES"  |"AVERAGE_ORDER_VALUE"  |"ORDER_COUNT"  |"CREATED"            |
---------------------------------------------------------------------------------------------------------------
|Electronics         |North     |121611.75      |502.2308764940239      |251            |2024-12-17 11:13:11  |
|Sports              |South     |127182.67      |511.0550199203187      |251            |2024-12-17 11:13:11  |
|Clothing            |South     |136309.18      |548.9966798418973      |253            |2024-12-17 11:13:11  |
|Home                |South     |151382.34      |480.46474637681155     |276            |2024-12-17 11:13:11  |
|Electronics         |East      |137977.52      |494.1362548262548      |259            |2024-12-17 11:13:11  |
|Books               |West      |135392.85      |489.70191570881224     |261            |2024-12-17 11:1