# 07 Load Daily City Metrics

* Author: Jeremiah Hansen
* Last Updated: 6/11/2024

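This notebook loads daily per-city sales and weather metrics into the `DAILY_CITY_METRICS` table with support for incremental processing: on the first run it creates the table, and on later runs it merges (upserts) the recomputed daily aggregates into the existing table, keyed on date, city and country.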

In [None]:
-- Expose the notebook session's current database and schema so the Python cells below can read them.
-- This won't be needed when we can pass variables to Notebooks!
SELECT
    CURRENT_DATABASE() AS DATABASE_NAME,
    CURRENT_SCHEMA()   AS SCHEMA_NAME

In [None]:
# Import python packages
import logging
from snowflake.core import Root

logger = logging.getLogger("demo_logger")

# Get the target database and schema using the results from the SQL cell above.
# This won't be needed when we can pass variables to Notebooks!
# The values are read by their SQL aliases rather than by position, so reordering the
# SELECT list in that cell cannot silently swap the database and schema names.
current_context_df = cells.sql_get_context.to_pandas()
context_row = current_context_df.iloc[0]
database_name = context_row["DATABASE_NAME"]
schema_name = context_row["SCHEMA_NAME"]

# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# NOTE(review): no use_schema() call is made, so unqualified table names in later cells
# resolve against this session's current schema -- presumably the same one the SQL cell
# above reported. If they can differ, call session.use_schema(f"{database_name}.{schema_name}").

logger.info("07_load_daily_city_metrics start")

## Create a function to check if a table exists

This function uses the [Snowflake Python Management API](https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-overview).

In [None]:
def table_exists(session, database_name='', schema_name='', table_name=''):
    """Report whether a table named exactly `table_name` exists in the given database/schema.

    Asks the Snowflake Python Management API for the schema's tables filtered by
    `like=table_name`, then requires an exact name match among the candidates
    (the `like` filter is presumably a pattern and may return near-misses).

    Returns:
        True if a candidate's name equals `table_name`, otherwise False.
    """
    candidates = Root(session).databases[database_name].schemas[schema_name].tables.iter(like=table_name)
    return any(candidate.name == table_name for candidate in candidates)

# Not used; SQL alternative to the Python version above.
def table_exists2(session, database_name='', schema_name='', table_name=''):
    """Check for a table via INFORMATION_SCHEMA.TABLES and return the TABLE_EXISTS flag.

    Runs a single SELECT EXISTS(...) query against `database_name`'s
    INFORMATION_SCHEMA and returns the TABLE_EXISTS value from the first row.

    NOTE(review): the names are spliced into the SQL text without escaping, so a name
    containing a quote would break the query -- only pass trusted identifiers.
    """
    query = (
        f"SELECT EXISTS (SELECT * FROM {database_name}.INFORMATION_SCHEMA.TABLES "
        f"WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}') AS TABLE_EXISTS"
    )
    first_row = session.sql(query).collect()[0]
    return first_row['TABLE_EXISTS']

## Pipeline to update daily_city_metrics

In [None]:
import snowflake.snowpark.functions as F

# Target table: created on the first run, upserted into on later runs.
table_name = "DAILY_CITY_METRICS"

# Source tables. ORDER_DETAIL and LOCATION are unqualified and resolve against the
# session's current database/schema; HISTORY_DAY is referenced by its full name.
order_detail = session.table("ORDER_DETAIL")
history_day = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
location = session.table("LOCATION")

# Attach each order's location, then the weather row for the same calendar date,
# country and city (inner joins: orders without a matching row are dropped).
order_detail = order_detail.join(
    location,
    order_detail["LOCATION_ID"] == location["LOCATION_ID"],
)
weather_match = (
    (F.builtin("DATE")(order_detail["ORDER_TS"]) == history_day["DATE_VALID_STD"])
    & (location["ISO_COUNTRY_CODE"] == history_day["COUNTRY"])
    & (location["CITY"] == history_day["CITY_NAME"])
)
order_detail = order_detail.join(history_day, weather_match)

# One row per (date, city, country): total sales plus average temperature/precipitation.
daily_rollup = order_detail.group_by(
    F.col("DATE_VALID_STD"), F.col("CITY_NAME"), F.col("ISO_COUNTRY_CODE")
).agg(
    F.sum("PRICE").alias("DAILY_SALES_SUM"),
    F.avg("AVG_TEMPERATURE_AIR_2M_F").alias("AVG_TEMPERATURE_F"),
    F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"),
)

# Rename to the target table's columns: NULL sales become 0, weather averages are rounded to 2 decimals.
final_agg = daily_rollup.select(
    F.col("DATE_VALID_STD").alias("DATE"),
    F.col("CITY_NAME"),
    F.col("ISO_COUNTRY_CODE").alias("COUNTRY_DESC"),
    F.builtin("ZEROIFNULL")(F.col("DAILY_SALES_SUM")).alias("DAILY_SALES"),
    F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"),
    F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"),
)

target_exists = table_exists(session, database_name=database_name, schema_name=schema_name, table_name=table_name)

if target_exists:
    # Upsert keyed on (DATE, CITY_NAME, COUNTRY_DESC): matched rows get every column
    # overwritten from final_agg, unmatched keys are inserted.
    cols_to_update = {name: final_agg[name] for name in final_agg.schema.names}

    dcm = session.table(table_name)
    key_match = (
        (dcm["DATE"] == final_agg["DATE"])
        & (dcm["CITY_NAME"] == final_agg["CITY_NAME"])
        & (dcm["COUNTRY_DESC"] == final_agg["COUNTRY_DESC"])
    )
    dcm.merge(
        final_agg,
        key_match,
        [F.when_matched().update(cols_to_update), F.when_not_matched().insert(cols_to_update)],
    )

    logger.info(f"Successfully updated {table_name}")
else:
    # First run: materialize the aggregate as a brand-new table.
    final_agg.write.mode("overwrite").save_as_table(table_name)

    logger.info(f"Successfully created {table_name}")


## Debugging

In [None]:
--SELECT * FROM DAILY_CITY_METRICS LIMIT 10;