# Setting Up Snowflake

In [None]:
-- Session variables read by the setup statements later in this file.
-- TODO: verify these values before running. The masked strings are
-- placeholders; keep real credentials out of version control.

-- User that will be granted the project role.
SET MY_USER = CURRENT_USER();

-- GitHub username/password stored in the FRED_GITHUB_SECRET secret.
SET GITHUB_SECRET_USERNAME = '##############';
SET GITHUB_SECRET_PASSWORD = '#####################';

-- URL prefix allowed by the API integration, and the repository origin URL.
SET GITHUB_URL_PREFIX = 'https://github.com/#####################';
SET GITHUB_REPO_ORIGIN = 'https://github.com/##############################';

In [None]:

-- Account-level bootstrap: as ACCOUNTADMIN, (re)create the project role,
-- database and warehouse, then switch the session over to them.
USE ROLE ACCOUNTADMIN;

-- Project role, granted to SYSADMIN and to the user captured in $MY_USER.
CREATE OR REPLACE ROLE FRED_ROLE;
GRANT ROLE FRED_ROLE TO ROLE SYSADMIN;
GRANT ROLE FRED_ROLE TO USER IDENTIFIER($MY_USER);

-- Account-level privileges held by the project role.
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE FRED_ROLE;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE FRED_ROLE;
GRANT EXECUTE MANAGED TASK ON ACCOUNT TO ROLE FRED_ROLE;
GRANT MONITOR EXECUTION ON ACCOUNT TO ROLE FRED_ROLE;
GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE FRED_ROLE;

-- Project database. OR REPLACE drops any existing FRED_DB and its contents.
CREATE OR REPLACE DATABASE FRED_DB;
GRANT OWNERSHIP ON DATABASE FRED_DB TO ROLE FRED_ROLE;

-- Project warehouse (AUTO_SUSPEND is expressed in seconds).
CREATE OR REPLACE WAREHOUSE FRED_WH
    WAREHOUSE_SIZE = XSMALL,
    AUTO_SUSPEND = 300,
    AUTO_RESUME = TRUE;
GRANT OWNERSHIP ON WAREHOUSE FRED_WH TO ROLE FRED_ROLE;

-- Continue as the project role with its own warehouse and database.
USE ROLE FRED_ROLE;
USE WAREHOUSE FRED_WH;
USE DATABASE FRED_DB;

In [None]:
-- ----------------------------------------------------------------------------
-- Create the database level objects
-- ----------------------------------------------------------------------------
-- Schemas: one shared INTEGRATIONS schema plus a RAW_FRED / HARMONIZED /
-- ANALYTICS trio for each of the DEV and PROD environments.
-- OR REPLACE drops an existing schema together with everything inside it.
CREATE OR REPLACE SCHEMA INTEGRATIONS;
CREATE OR REPLACE SCHEMA DEV_RAW_FRED;
CREATE OR REPLACE SCHEMA DEV_HARMONIZED;
CREATE OR REPLACE SCHEMA DEV_ANALYTICS;
CREATE OR REPLACE SCHEMA PROD_RAW_FRED;
CREATE OR REPLACE SCHEMA PROD_HARMONIZED;
CREATE OR REPLACE SCHEMA PROD_ANALYTICS;

-- The unqualified objects created after this point land in INTEGRATIONS.
USE SCHEMA INTEGRATIONS;

-- External stage over the S3 bucket, authenticated with an inline AWS key pair.
-- The masked values are placeholders; keep real keys out of version control.
CREATE OR REPLACE STAGE S3_FRED_STAGE
    URL = 's3://#############/'
    CREDENTIALS = (
        AWS_KEY_ID = '#########################'
        AWS_SECRET_KEY = '#########################'
    );


-- GitHub credentials, stored as a schema-level secret.
CREATE OR REPLACE SECRET FRED_GITHUB_SECRET
    TYPE = PASSWORD
    USERNAME = $GITHUB_SECRET_USERNAME
    PASSWORD = $GITHUB_SECRET_PASSWORD;

-- Account-level API integration for Git over HTTPS.
-- It references the secret above, so the secret has to be created first.
CREATE OR REPLACE API INTEGRATION FRED_GITHUB_API_INTEGRATION
    API_PROVIDER = GIT_HTTPS_API
    API_ALLOWED_PREFIXES = ($GITHUB_URL_PREFIX)
    ALLOWED_AUTHENTICATION_SECRETS = (FRED_GITHUB_SECRET)
    ENABLED = TRUE;

-- Git repository object that uses the integration and secret defined above.
CREATE OR REPLACE GIT REPOSITORY FRED_GIT_REPO
    API_INTEGRATION = FRED_GITHUB_API_INTEGRATION
    GIT_CREDENTIALS = FRED_GITHUB_SECRET
    ORIGIN = $GITHUB_REPO_ORIGIN;

In [None]:
-- Event table setup, run as ACCOUNTADMIN because it changes an account
-- parameter.
USE ROLE ACCOUNTADMIN;

-- IF NOT EXISTS keeps this cell re-runnable: a bare CREATE fails once the
-- table exists, and OR REPLACE would discard events already collected.
CREATE EVENT TABLE IF NOT EXISTS FRED_DB.INTEGRATIONS.FRED_EVENTS;
GRANT SELECT ON EVENT TABLE FRED_DB.INTEGRATIONS.FRED_EVENTS TO ROLE FRED_ROLE;
GRANT INSERT ON EVENT TABLE FRED_DB.INTEGRATIONS.FRED_EVENTS TO ROLE FRED_ROLE;

-- Make it the account's active event table and set the log level for FRED_DB.
ALTER ACCOUNT SET EVENT_TABLE = FRED_DB.INTEGRATIONS.FRED_EVENTS;
ALTER DATABASE FRED_DB SET LOG_LEVEL = INFO;

In [None]:
-- Run scripts/deploy_notebooks.sql from the main branch of FRED_GIT_REPO,
-- using the project role, warehouse and INTEGRATIONS schema.
USE ROLE FRED_ROLE;
USE WAREHOUSE FRED_WH;
USE SCHEMA FRED_DB.INTEGRATIONS;

-- The USING clause supplies the script's parameters.
EXECUTE IMMEDIATE FROM @FRED_GIT_REPO/branches/main/scripts/deploy_notebooks.sql
    USING (
        env => 'DEV',
        schema1 => 'RAW_FRED',
        schema2 => 'HARMONIZED',
        schema3 => 'ANALYTICS',
        branch => 'main'
    );

In [None]:
-- Manually run the PROD analytics notebook.
-- NOTE(review): the deployment cell in this file passes env => 'DEV'; confirm
-- the PROD notebooks have been deployed before running this statement.
EXECUTE NOTEBOOK FRED_DB.PROD_ANALYTICS."PROD_03_analytics_table_processing"();

# DAG Creation Script

In [None]:
# Import necessary packages
from snowflake.core import Root
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
from datetime import timedelta

In [None]:
# Target environment; used as the prefix of the per-environment schema names.
env = "PROD"

# Bind to the notebook's active session and switch to the project role and
# warehouse.
session = get_active_session()
session.use_role("FRED_ROLE")
session.use_warehouse("FRED_WH")

# Object names used by the task and DAG cells (database_name is assigned once;
# the original set it twice to the same value).
database_name = "FRED_DB"
schema1 = f"{env}_RAW_FRED"
schema2 = f"{env}_HARMONIZED"
schema3 = f"{env}_ANALYTICS"
schema_name = "INTEGRATIONS"

In [None]:
## Task 3: Merge FRED updates
# Standalone task that runs only when the harmonized stream has data, then
# calls the merge procedure followed by the analytical-table procedure.
sql_query = f"""
CREATE OR REPLACE TASK FRED_DB.INTEGRATIONS.SPOC_TASK_MERGE_FRED_UPDATES
WAREHOUSE = FRED_WH
WHEN SYSTEM$STREAM_HAS_DATA('FRED_DB.{env}_HARMONIZED.FRED_STREAM')
AS
BEGIN
    CALL FRED_DB.{env}_ANALYTICS.merge_fred_updates_sp('FRED_DB', '{env}_ANALYTICS', '{env}');
    CALL FRED_DB.{env}_ANALYTICS.create_analytical_tables_sp('{env}_ANALYTICS', 'FRED_10Y_2Y');
END;
"""
# session.sql() only builds a lazy DataFrame; collect() is what actually sends
# the CREATE TASK statement to Snowflake.
session.sql(sql_query).collect()

In [None]:
# Work inside the schema that will hold the DAG.
session.use_schema(f"{database_name}.{schema_name}")

# Resolve that schema through the Snowflake Python API and wrap it in a
# DAGOperation, which is used to deploy the DAG at the end of this cell.
api_root = Root(session)
schema = api_root.databases[database_name].schemas[schema_name]
dag_op = DAGOperation(schema)

warehouse_name = "FRED_WH"
dag_name = f"{env}_FRED_ANALYTICS_DAG"

# SQL executed by each step of the DAG.
load_files_sql = f'EXECUTE NOTEBOOK "{database_name}"."{schema1}"."{env}_01_load_files"()'
raw_to_harmonized_sql = f'EXECUTE NOTEBOOK "{database_name}"."{schema2}"."{env}_02_raw_to_harmonized"()'
merge_updates_sql = f'EXECUTE TASK "{database_name}"."{schema_name}"."SPOC_TASK_MERGE_FRED_UPDATES";'

# Tasks are created inside the DAG context; the DAG runs on a one-day schedule.
with DAG(dag_name, schedule=timedelta(days=1), warehouse=warehouse_name) as dag:
    dag_task1 = DAGTask("AWS_S3_DATA_STAGING", definition=load_files_sql, warehouse=warehouse_name)
    dag_task2 = DAGTask(
        "DATA_TRANSFORMATION_TO_HARMONIZED",
        definition=raw_to_harmonized_sql,
        warehouse=warehouse_name,
    )
    dag_task3 = DAGTask("SPOC_TASK_MERGE_FRED_UPDATES", definition=merge_updates_sql, warehouse=warehouse_name)

    # Linear chain: load files, then harmonize, then merge updates.
    dag_task1 >> dag_task2 >> dag_task3

# Deploy, replacing any DAG previously deployed under the same name.
dag_op.deploy(dag, mode="orreplace")


In [None]:
-- Spot check: the ten rows with the latest obs_date in the DEV combined table.
-- SELECT * FROM FRED_DB.DEV_ANALYTICS.FRED_10Y_2Y ORDER BY observation_date DESC LIMIT 10;
SELECT *
FROM FRED_DB.DEV_ANALYTICS.FRED_COMBINED_DAILY
ORDER BY obs_date DESC
LIMIT 10;

## Teardown Scripts 

In [None]:
-- USE ROLE ACCOUNTADMIN;
-- DROP ROLE FRED_ROLE;
-- DROP API INTEGRATION FRED_GITHUB_API_INTEGRATION;
-- DROP DATABASE FRED_DB;
-- DROP WAREHOUSE FRED_WH;