In [None]:
-- GitHub connection settings consumed by the secret, API integration and Git
-- repository created later in this notebook.
--
-- SECURITY: never commit a real personal access token. A token was previously
-- hard-coded on this line; treat it as compromised, revoke it in GitHub, and
-- paste a fresh token here only at run time (do not save it back to the repo).
SET GITHUB_SECRET_USERNAME = 'sahilmutha1999';
SET GITHUB_SECRET_PASSWORD = '<GITHUB_PERSONAL_ACCESS_TOKEN>';
SET GITHUB_URL_PREFIX = 'https://github.com/Damg7245-BigDataIntelligence';
SET GITHUB_REPO_ORIGIN = 'https://github.com/Damg7245-BigDataIntelligence/FRED_Currency_Exchange.git';

In [None]:
-- Account-level bootstrap: project role, database and warehouse.
USE ROLE ACCOUNTADMIN;

-- Remember who is running the notebook so the new role can be granted to them.
SET MY_USER = CURRENT_USER();

-- Project role, attached to SYSADMIN and to the current user.
CREATE OR REPLACE ROLE FRED_ROLE;
GRANT ROLE FRED_ROLE
    TO ROLE SYSADMIN;
GRANT ROLE FRED_ROLE
    TO USER IDENTIFIER($MY_USER);

-- Account-level privileges granted to the project role.
GRANT EXECUTE TASK
    ON ACCOUNT
    TO ROLE FRED_ROLE;
GRANT MONITOR EXECUTION
    ON ACCOUNT
    TO ROLE FRED_ROLE;
GRANT IMPORTED PRIVILEGES
    ON DATABASE SNOWFLAKE
    TO ROLE FRED_ROLE;

-- Project database, handed over to the project role.
-- Note: CREATE OR REPLACE recreates the database on every run.
CREATE OR REPLACE DATABASE FRED_DB;
GRANT OWNERSHIP
    ON DATABASE FRED_DB
    TO ROLE FRED_ROLE;

-- Project warehouse (extra-small, suspends after 300 seconds idle,
-- resumes automatically), handed over to the project role.
CREATE OR REPLACE WAREHOUSE FRED_WH
    WAREHOUSE_SIZE = XSMALL
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE;
GRANT OWNERSHIP
    ON WAREHOUSE FRED_WH
    TO ROLE FRED_ROLE;


In [None]:
-- Create the project schemas and connect Snowflake to the GitHub repository.
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE FRED_WH;
USE DATABASE FRED_DB;

-- INTEGRATIONS holds the Git secret and repository objects; the remaining
-- schemas are the raw / harmonized / analytics layers for DEV and PROD.
CREATE OR REPLACE SCHEMA INTEGRATIONS;
CREATE OR REPLACE SCHEMA DEV_RAW_SCHEMA;
CREATE OR REPLACE SCHEMA DEV_HARMONIZED_SCHEMA;
CREATE OR REPLACE SCHEMA DEV_ANALYTICS_SCHEMA;
CREATE OR REPLACE SCHEMA PROD_RAW_SCHEMA;
CREATE OR REPLACE SCHEMA PROD_HARMONIZED_SCHEMA;
CREATE OR REPLACE SCHEMA PROD_ANALYTICS_SCHEMA;


USE SCHEMA INTEGRATIONS;

-- GitHub credentials, taken from the session variables set at the top of the
-- notebook.
CREATE OR REPLACE SECRET DEMO_GITHUB_SECRET
  TYPE = password
  USERNAME = $GITHUB_SECRET_USERNAME
  PASSWORD = $GITHUB_SECRET_PASSWORD;

-- API integration limited to the configured GitHub URL prefix and allowed to
-- authenticate only with the secret above.
CREATE OR REPLACE API INTEGRATION DEMO_GITHUB_API_INTEGRATION
  API_PROVIDER = GIT_HTTPS_API
  API_ALLOWED_PREFIXES = ($GITHUB_URL_PREFIX)
  ALLOWED_AUTHENTICATION_SECRETS = (DEMO_GITHUB_SECRET)
  ENABLED = TRUE;

-- Git Repository
CREATE OR REPLACE GIT REPOSITORY FRED_GIT_REPO
  API_INTEGRATION = DEMO_GITHUB_API_INTEGRATION
  GIT_CREDENTIALS = DEMO_GITHUB_SECRET
  ORIGIN = $GITHUB_REPO_ORIGIN;

In [None]:
-- External (S3) access: schema, storage integration and CSV file format.
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE FRED_WH;
USE DATABASE FRED_DB;

CREATE OR REPLACE SCHEMA EXTERNAL;

-- Storage integration that lets Snowflake assume the AWS role below and read
-- from the allowed S3 location only.
-- NOTE(review): the bucket name is spelled "fredcurrencyexhange" - confirm it
-- matches the real bucket.
CREATE OR REPLACE STORAGE INTEGRATION fred_s3_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::699475925561:role/snowflake_s3_role'
    STORAGE_ALLOWED_LOCATIONS = ('s3://fredcurrencyexhange/');

-- Let the project role use the integration and work inside the schema.
GRANT USAGE
    ON INTEGRATION fred_s3_integration
    TO ROLE FRED_ROLE;
GRANT USAGE
    ON SCHEMA EXTERNAL
    TO ROLE FRED_ROLE;
GRANT ALL PRIVILEGES
    ON SCHEMA FRED_DB.EXTERNAL
    TO ROLE FRED_ROLE;

-- Switch to the project role so the file format is created by FRED_ROLE.
USE ROLE FRED_ROLE;
USE WAREHOUSE FRED_WH;
USE DATABASE FRED_DB;
USE SCHEMA EXTERNAL;

-- CSV format: fields may be wrapped in double quotes, and the header row is
-- parsed.
CREATE OR REPLACE FILE FORMAT CSV_FORMAT
    TYPE = 'CSV'
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    PARSE_HEADER = TRUE;


In [None]:
-- Run the deployment script stored on the main branch of the Git repository,
-- passing the target environment and the three schema-name suffixes.
SET env = 'DEV';

USE ROLE ACCOUNTADMIN;
USE WAREHOUSE FRED_WH;
USE SCHEMA FRED_DB.INTEGRATIONS;

-- NOTE(review): the script name is spelled with three o's ("noteboook") -
-- confirm it matches the file name in the repository.
EXECUTE IMMEDIATE
    FROM @FRED_GIT_REPO/branches/main/scripts/noteboook_deploy.sql
    USING (
        env => $env,
        branch => 'main',
        schema1 => 'RAW_SCHEMA',
        schema2 => 'HARMONIZED_SCHEMA',
        schema3 => 'ANALYTICS_SCHEMA'
    );

DAG Creation

In [None]:
# Import Python packages from Snowflake's internal libraries
from snowflake.core import Root
from snowflake.snowpark.context import get_active_session

# Get the notebook's active Snowpark session and select the warehouse that
# runs the statements below. Role and schema are not changed here.
session = get_active_session()
session.use_warehouse("FRED_WH")



In [None]:
# Database and schema in which the DAG itself is created.
database_name = "FRED_DB"
schema_name = "INTEGRATIONS"

# Target environment: selects the {env}_* schemas and {env}_* notebook names
# that the DAG tasks execute. Set to "PROD" to target the PROD_* schemas.
# This used to be derived by comparing schema_name with "PROD_SCHEMA", a schema
# this notebook never creates, so PROD could never be selected. It is now an
# explicit setting with the same "DEV" default.
env = "DEV"
if env not in ("DEV", "PROD"):
    raise ValueError(f"env must be 'DEV' or 'PROD', got {env!r}")

# Make the DAG schema the session's current schema.
session.use_schema(f"{database_name}.{schema_name}")

In [None]:
-- List the notebooks that exist in the project database.
SHOW NOTEBOOKS
    IN DATABASE FRED_DB;


In [None]:
# Import DAG classes from Snowflake's internal DAG library
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
from datetime import timedelta

# Name of the DAG to deploy and of the warehouse its tasks run on.
dag_name = "FRED_DAG"
warehouse_name = "FRED_WH"

# Environment-specific schemas holding the three pipeline notebooks.
target_schema1, target_schema2, target_schema3 = (
    f"{env}_{layer}_SCHEMA" for layer in ("RAW", "HARMONIZED", "ANALYTICS")
)

# Handle for deploying and listing DAGs in the configured database/schema.
api_root = Root(session)
schema = api_root.databases[database_name].schemas[schema_name]
dag_op = DAGOperation(schema)


def _execute_notebook_sql(target_schema, notebook_suffix):
    """Return the EXECUTE NOTEBOOK statement for one environment notebook.

    The notebook is addressed as "<database>"."<target_schema>"."<env>_<suffix>".
    """
    return f'''EXECUTE NOTEBOOK "{database_name}"."{target_schema}"."{env}_{notebook_suffix}"()'''


# Build the DAG: three notebook-running tasks on a one-day schedule.
with DAG(dag_name, warehouse=warehouse_name, schedule=timedelta(days=1)) as dag:
    dag_task1 = DAGTask(
        "LOAD_RAW_DATA_TASK",
        definition=_execute_notebook_sql(target_schema1, "load_raw_data"),
        warehouse=warehouse_name,
    )
    dag_task2 = DAGTask(
        "HARMONIZE_DATA_TASK",
        definition=_execute_notebook_sql(target_schema2, "harmonize_data"),
        warehouse=warehouse_name,
    )
    dag_task3 = DAGTask(
        "ANALYTICS_TASK",
        definition=_execute_notebook_sql(target_schema3, "analytics"),
        warehouse=warehouse_name,
    )

    # Execution order: load raw data, then harmonize, then analytics.
    dag_task1 >> dag_task2 >> dag_task3

# Deploy the DAG using the "orreplace" deployment mode.
dag_op.deploy(dag, mode="orreplace")

# Print every deployed DAG whose name matches the pattern, as a quick check.
for deployed_dag in dag_op.iter_dags(like='FRED_DAG%'):
    print(deployed_dag)

# Optionally, run the DAG immediately
# dag_op.run(dag)
