1. External Data Source:
- The pipeline will pull Dow 30 stock market data from a public API (e.g., Yahoo
Finance, Alpha Vantage, or Quandl).
- Data will be ingested in CSV/JSON format via Snowflake External Stages
(Amazon S3 or Google Cloud Storage buckets).
2. Raw Data Storage (RAW_DOW30 Schema):
- The ingested data will be stored in a staging table (RAW_DOW30_STAGING).
- Snowflake Streams will track new records for incremental updates.
3. Data Harmonization & Transformation (HARMONIZED_DOW30 Schema):
- Transformation logic using Snowpark Python:
    - Convert raw API response into a structured table.
    - Standardize timestamps and financial indicators.
    - Ensure data consistency and remove duplicates.
- A harmonized table (DOW30_HARMONIZED) will store cleaned and structured
stock market data.
- User-Defined Functions (UDFs):
    - SQL UDF: A function to normalize currency exchange rates.
    - Python UDF: A function to calculate stock price volatility.
4. Analytics & Aggregation (ANALYTICS_DOW30 Schema):
- Precomputed analytics tables for:
    - Daily and weekly performance metrics and returns.
- Stored Procedure (UPDATE_DOW30_SP) to handle incremental updates and
apply transformations.
5. Task Orchestration & Automation:
- Snowflake Tasks will automate:
    - Data ingestion (LOAD_DOW30_TASK).
    - Daily updates (UPDATE_DOW30_METRICS_TASK).
- A Snowflake Notebook will be used for the data engineering work and for
running Snowpark Python.
- GitHub Actions Integration for CI/CD of Snowpark Python code.
6. Testing & Validation:
- Implement unit tests for UDFs and stored procedures.
- Use sample datasets for validating pipeline correctness.
- Monitor task execution logs to ensure proper scheduling and data updates.
7. Environment Management with Jinja Templates:
- Create Jinja-based scripts to support DEV and PROD environments.
- Use parameterized configurations to dynamically adjust Snowflake roles,
schemas, and warehouses per environment.

In [None]:
from snowflake.snowpark.context import get_active_session
# Bind the notebook's active Snowpark session and derive the environment label
# from the schema the session currently points at. The literal below keeps its
# surrounding double quotes because it is compared directly against the value
# returned by get_current_schema().
session = get_active_session()
env = 'DEV'
if session.get_current_schema() == '"PROD_SCHEMA"':
    env = 'PROD'
print(env)

In [None]:
--!jinja
-- Point the session at the project database, the environment-specific schema
-- (the env placeholder is filled in by the Jinja templating of this cell) and
-- the warehouse used by the rest of the notebook.
USE DATABASE USD_SPOT_EXCHANGE;
USE SCHEMA {{env}}_SCHEMA;
USE WAREHOUSE USDSE_WH;

In [None]:
-- Recreate the environment-specific raw schema and its landing table.
-- Both statements are CREATE OR REPLACE, so re-running this cell replaces the
-- existing schema and table rather than altering them in place.
CREATE OR REPLACE SCHEMA {{env}}_RAW_DATA;
-- One row per observation date, with one column per currency pair.
CREATE OR REPLACE TABLE {{env}}_raw_data.raw_exchange_rates (
    date DATE,
    usd_to_inr FLOAT,
    eur_to_usd FLOAT,
    gbp_to_usd FLOAT
);

In [None]:
-- Recreate the environment-specific harmonized schema. It holds one narrow
-- table per currency pair, each with the same (date, exchange_value) shape.
CREATE OR REPLACE SCHEMA {{env}}_HARMONIZED;

-- USD to INR series.
CREATE OR REPLACE TABLE {{env}}_harmonized.usd_inr (
    date DATE,
    exchange_value FLOAT
);

-- EUR to USD series.
CREATE OR REPLACE TABLE {{env}}_harmonized.eur_usd (
    date DATE,
    exchange_value FLOAT
);

-- GBP to USD series.
CREATE OR REPLACE TABLE {{env}}_harmonized.gbp_usd (
    date DATE,
    exchange_value FLOAT
);

In [None]:
-- Recreate the environment-specific analytics schema. It holds one table per
-- currency pair, all with the same columns: the rate itself plus year-to-date,
-- month-to-date, row-over-row percentage change and a compounded annual rate
-- of change (carc).
-- NOTE(review): the date columns are declared unique. Confirm whether the
-- target Snowflake table type actually enforces this constraint or only
-- records it as metadata.
CREATE OR REPLACE SCHEMA {{env}}_ANALYTICS;

-- Analytics for the USD to INR series.
CREATE OR REPLACE TABLE {{env}}_analytics.usd_inr_analysis (
    date DATE unique,
    exchange_value FLOAT,
    ytd_change FLOAT,
    mtd_change FLOAT,
    percentage_change FLOAT,
    carc FLOAT
);

-- Analytics for the EUR to USD series.
CREATE OR REPLACE TABLE {{env}}_analytics.eur_usd_analysis (
    date DATE unique,
    exchange_value FLOAT,
    ytd_change FLOAT,
    mtd_change FLOAT,
    percentage_change FLOAT,
    carc FLOAT
);

-- Analytics for the GBP to USD series.
CREATE OR REPLACE TABLE {{env}}_analytics.gbp_usd_analysis (
    date DATE unique,
    exchange_value FLOAT,
    ytd_change FLOAT,
    mtd_change FLOAT,
    percentage_change FLOAT,
    carc FLOAT
);

In [None]:
--MTD-
-- Month-to-date change UDF: percentage move from the first value of the month
-- to the current value. The caller supplies both values.
-- The runtime is aligned with the 3.9 runtime used by this notebook's Python
-- stored procedure, and the body is dollar-quoted so that apostrophes in the
-- handler source cannot terminate the definition early.

CREATE OR REPLACE FUNCTION {{env}}_ANALYTICS.MTD_CHANGE("FIRST_VALUE" FLOAT, "CURRENT_VALUE" FLOAT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'calculate_mtd_change'
AS
$$
def calculate_mtd_change(first_value, current_value):
    """Return the percentage change from first_value to current_value.

    Returns None (SQL NULL) when either input is missing or when first_value
    is zero, because the ratio is undefined in those cases.
    """
    if first_value is None or current_value is None or first_value == 0:
        return None
    return ((current_value - first_value) / first_value) * 100
$$;

In [None]:
--YTD-
-- Year-to-date change UDF: percentage move from the first value of the year
-- to the current value. The caller supplies both values.
-- The runtime is aligned with the 3.9 runtime used by this notebook's Python
-- stored procedure, and the body is dollar-quoted so that apostrophes in the
-- handler source cannot terminate the definition early.

CREATE OR REPLACE FUNCTION {{env}}_ANALYTICS.YTD_CHANGE("FIRST_VALUE" FLOAT, "CURRENT_VALUE" FLOAT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'calculate_ytd_change'
AS
$$
def calculate_ytd_change(first_value, current_value):
    """Return the percentage change from first_value to current_value.

    Returns None (SQL NULL) when either input is missing or when first_value
    is zero, because the ratio is undefined in those cases.
    """
    if first_value is None or current_value is None or first_value == 0:
        return None
    return ((current_value - first_value) / first_value) * 100
$$;

In [None]:
--Percentage-
-- SQL UDF returning the percentage change from old_value to new_value.
-- The result is NULL when either argument is NULL or when old_value is 0.
-- NULLIF turns a zero divisor into NULL, and NULL propagates through the
-- arithmetic, so no explicit CASE branch is needed.

CREATE OR REPLACE FUNCTION {{env}}_ANALYTICS.PERCENTAGE_CHANGE("OLD_VALUE" FLOAT, "NEW_VALUE" FLOAT)
RETURNS FLOAT
LANGUAGE SQL
AS
$$
    ((new_value - old_value) / NULLIF(old_value, 0)) * 100
$$;

In [None]:
--Compounded annual rate changes
-- Python UDF computing ((final / initial) ^ (1 / years) - 1) * 100.
-- NOTE(review): a later cell in this notebook creates a function with the same
-- name and argument types, and that later definition replaces this one.

CREATE OR REPLACE FUNCTION {{env}}_ANALYTICS.COMPOUNDED_ANNUAL_RATE_OF_CHANGE("INITIAL_VALUE" FLOAT, "FINAL_VALUE" FLOAT, "YEARS" FLOAT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'calculate_carc'
AS
$$
def calculate_carc(initial_value, final_value, years):
    """Return the compounded annual rate of change as a percentage.

    Returns None (SQL NULL) when any input is missing, when initial_value or
    years is not strictly positive, or when final_value is negative.
    """
    if initial_value is None or final_value is None or years is None:
        return None
    if initial_value <= 0 or years <= 0:
        return None  # Prevent division by zero or invalid inputs
    if final_value < 0:
        # A negative ratio raised to a fractional power is a complex number in
        # Python, which cannot be returned as a FLOAT.
        return None

    return ((final_value / initial_value) ** (1 / years) - 1) * 100
$$;

In [None]:
-- Python UDF computing ((final / initial) ^ (1 / years) - 1) * 100.
-- NOTE(review): an earlier cell in this notebook creates a function with the
-- same name and argument types. This CREATE OR REPLACE overwrites it, so this
-- is the definition that stays in effect.
CREATE OR REPLACE FUNCTION {{env}}_ANALYTICS.compounded_annual_rate_of_change(
    initial_value FLOAT,
    final_value FLOAT,
    years FLOAT
)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'calculate_carc'
AS
$$
def calculate_carc(initial_value, final_value, years):
    """Return the compounded annual rate of change as a percentage.

    Returns None (SQL NULL) when any input is missing, when initial_value or
    years is not strictly positive, or when final_value is negative.
    """
    if initial_value is None or final_value is None or years is None:
        return None
    if initial_value <= 0 or years <= 0:
        return None  # Prevent division by zero or invalid inputs
    if final_value < 0:
        # A negative ratio raised to a fractional power is a complex number in
        # Python, which cannot be returned as a FLOAT.
        return None

    return ((final_value / initial_value) ** (1 / years) - 1) * 100
$$;



In [None]:
from datetime import date
from dateutil.relativedelta import relativedelta
import snowflake.snowpark.functions as F


def load_raw_table(session):
    """Seed the environment's raw exchange-rate table from the staged CSV.

    Reads daily_data.csv from the INTEGRATIONS.USDSE_RAW_STAGE stage, keeps the
    rows whose observation date is at most 14 months before the fixed end date,
    renames the source columns to the raw table's column names and overwrites
    <ENV>_RAW_DATA.RAW_EXCHANGE_RATES with the result.

    Args:
        session: Snowpark session used to read the stage and write the table.

    Returns:
        A status string: a success message naming the cut-off date and target
        table, or an error message if any step raised.
    """
    try:
        # The session reports the schema name wrapped in double quotes (the
        # notebook's first cell compares against '"PROD_SCHEMA"'), so strip
        # them before comparing. Without this the check never matched and the
        # load always targeted DEV.
        current_schema = (session.get_current_schema() or '').strip('"')
        env = 'PROD' if current_schema == 'PROD_SCHEMA' else 'DEV'
        end_date = date(2025, 2, 21)
        # Hold back the most recent 14 months of the file from this initial load.
        limit_date = end_date - relativedelta(months=14)
        target = f'{env}_RAW_DATA.RAW_EXCHANGE_RATES'
        df = session.read.option("header", True).csv('@INTEGRATIONS.USDSE_RAW_STAGE/daily_data.csv')
        df = df.filter(F.col('"observation_date"') <= limit_date)\
        .rename({'"observation_date"': 'date',
                 'DEXUSUK': 'gbp_to_usd',
                 'DEXUSEU': 'eur_to_usd',
                 'DEXINUS': 'usd_to_inr'})
        df.write.mode("overwrite").save_as_table(target)
        return f'Data up to {limit_date} loaded into {target}'
    except Exception as e:
        # Best-effort notebook helper: report the failure as text instead of raising.
        return f'error loading data {str(e)}'
load_raw_table(session)

In [None]:
-- Switch the session to the raw schema and (re)create a stream on the raw
-- table. The stored procedure process_new_data reads this stream to pick up
-- rows inserted into the raw table.
-- Note that USE SCHEMA changes the session's current schema for later cells.
USE SCHEMA {{env}}_RAW_DATA;
CREATE OR REPLACE STREAM raw_exchange_rates_stream 
ON TABLE {{env}}_raw_data.raw_exchange_rates;

In [None]:

-- Initial backfill of the harmonized tables straight from the raw table.
-- Each statement copies the distinct (date, rate) pairs for one currency pair
-- and skips rows where that rate is NULL.

-- USD to INR
INSERT INTO {{env}}_harmonized.usd_inr (date, exchange_value)
SELECT DISTINCT
    src.date,
    src.usd_to_inr
FROM {{env}}_raw_data.raw_exchange_rates AS src
WHERE src.usd_to_inr IS NOT NULL;

-- EUR to USD
INSERT INTO {{env}}_harmonized.eur_usd (date, exchange_value)
SELECT DISTINCT
    src.date,
    src.eur_to_usd
FROM {{env}}_raw_data.raw_exchange_rates AS src
WHERE src.eur_to_usd IS NOT NULL;

-- GBP to USD
INSERT INTO {{env}}_harmonized.gbp_usd (date, exchange_value)
SELECT DISTINCT
    src.date,
    src.gbp_to_usd
FROM {{env}}_raw_data.raw_exchange_rates AS src
WHERE src.gbp_to_usd IS NOT NULL;


In [None]:
-- Stored procedure that copies newly inserted raw rows from the raw table's
-- stream into the three harmonized per-currency tables.
-- Returns a fixed status string on success.
CREATE OR REPLACE PROCEDURE {{env}}_HARMONIZED.process_new_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- All three inserts read the same stream inside one explicit transaction.
    -- NOTE(review): presumably this is so every insert sees the same change
    -- set and the stream is only consumed at COMMIT. Confirm against the
    -- Snowflake stream documentation.
    BEGIN TRANSACTION;

    -- USD to INR: pure inserts only, skipping NULL rates.
    INSERT INTO {{env}}_harmonized.usd_inr (date, exchange_value)
    SELECT
        date,
        usd_to_inr
    FROM {{env}}_raw_data.raw_exchange_rates_stream
    WHERE metadata$action = 'INSERT'
        AND metadata$isupdate = FALSE
        AND usd_to_inr IS NOT NULL;

    -- EUR to USD: pure inserts only, skipping NULL rates.
    INSERT INTO {{env}}_harmonized.eur_usd (date, exchange_value)
    SELECT
        date,
        eur_to_usd
    FROM {{env}}_raw_data.raw_exchange_rates_stream
    WHERE metadata$action = 'INSERT'
        AND metadata$isupdate = FALSE
        AND eur_to_usd IS NOT NULL;

    -- GBP to USD: pure inserts only, skipping NULL rates.
    INSERT INTO {{env}}_harmonized.gbp_usd (date, exchange_value)
    SELECT
        date,
        gbp_to_usd
    FROM {{env}}_raw_data.raw_exchange_rates_stream
    WHERE metadata$action = 'INSERT'
        AND metadata$isupdate = FALSE
        AND gbp_to_usd IS NOT NULL;

    COMMIT;
    RETURN 'HARMONIZED DATA UPDATED FROM STREAM';
END;
$$;

In [None]:
-- Python stored procedure that appends the next month of observations from the
-- staged CSV to the environment's raw exchange-rate table.
CREATE OR REPLACE PROCEDURE {{env}}_raw_data.update_raw_table()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'update_raw_table'
AS
$$
import snowflake.snowpark.functions as F
from dateutil.relativedelta import relativedelta


def update_raw_table(session):
    """Append up to one more month of staged rows to the raw table.

    Args:
        session: Snowpark session that Snowflake passes to the handler as its
            first argument.

    Returns:
        A status string naming the new cut-off date and the target table.

    Raises:
        ValueError: if the target table has no rows yet, so there is no latest
            date to continue from.
    """
    # The environment is fixed when the procedure is created, by the same Jinja
    # substitution that picks the procedure's schema. Reading the current
    # schema at run time is not reliable here because it is not PROD_SCHEMA
    # while the procedure runs.
    target = '{{env}}_RAW_DATA.RAW_EXCHANGE_RATES'

    latest_inserted_date = session.table(target)\
    .select(F.max(F.col('date'))).collect()[0][0]
    if latest_inserted_date is None:
        # MAX over an empty table is NULL. Fail with a clear message instead of
        # a TypeError from the date arithmetic below.
        raise ValueError(f'{target} is empty, run the initial load before updating')

    # Load the window (latest_inserted_date, latest_inserted_date + 1 month].
    limit_date = latest_inserted_date + relativedelta(months=1)
    df = session.read.option('header', True).csv('@INTEGRATIONS.USDSE_RAW_STAGE/daily_data.csv')
    df = df.filter(F.col('"observation_date"') > latest_inserted_date)\
    .filter(F.col('"observation_date"')<=limit_date)\
    .rename({'"observation_date"': 'date',
                 'DEXUSUK': 'gbp_to_usd',
                 'DEXUSEU': 'eur_to_usd',
                 'DEXINUS': 'usd_to_inr'})
    df.write.mode('append').save_as_table(target)
    return f'Data up to {limit_date} updated into {target}'
$$;

In [None]:
-- Stored procedure that appends newly harmonized rows to the three analytics
-- tables together with their derived metrics (YTD, MTD, row-over-row
-- percentage change and CARC).
--
-- The metrics use window functions (first value of the year or month, and the
-- previous row). They are therefore computed over the full harmonized table in
-- an inner query, and only afterwards restricted to the dates that are not in
-- the analytics table yet. Filtering first would hide earlier rows from the
-- windows and yield wrong YTD, MTD and percentage values on incremental runs.
--
-- NOTE(review): CARC is computed from a fixed initial value of 100 and a year
-- count of YEAR(date) - 2020. Confirm that these constants are intended.
CREATE OR REPLACE PROCEDURE {{env}}_ANALYTICS.load_analytics_data()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
    -- Insert data into {{env}}_analytics.usd_inr_analysis
    INSERT INTO {{env}}_analytics.usd_inr_analysis (date, exchange_value, ytd_change, mtd_change, percentage_change, carc)
    SELECT
        scored.date,
        scored.exchange_value,
        scored.ytd_change,
        scored.mtd_change,
        scored.percentage_change,
        scored.carc
    FROM (
        SELECT
            date,
            exchange_value,
            {{env}}_analytics.ytd_change(FIRST_VALUE(exchange_value) OVER (PARTITION BY YEAR(date) ORDER BY date), exchange_value) AS ytd_change,
            {{env}}_analytics.mtd_change(FIRST_VALUE(exchange_value) OVER (PARTITION BY YEAR(date), MONTH(date) ORDER BY date), exchange_value) AS mtd_change,
            COALESCE({{env}}_analytics.percentage_change(LAG(exchange_value) OVER (ORDER BY date), exchange_value), 0) AS percentage_change,
            COALESCE({{env}}_analytics.compounded_annual_rate_of_change(100, exchange_value, YEAR(date) - 2020), 0) AS carc
        FROM {{env}}_harmonized.usd_inr
    ) AS scored
    WHERE scored.date > COALESCE((SELECT MAX(date) FROM {{env}}_analytics.usd_inr_analysis), TO_DATE('1900-01-01'));

    -- Insert data into {{env}}_analytics.eur_usd_analysis
    INSERT INTO {{env}}_analytics.eur_usd_analysis (date, exchange_value, ytd_change, mtd_change, percentage_change, carc)
    SELECT
        scored.date,
        scored.exchange_value,
        scored.ytd_change,
        scored.mtd_change,
        scored.percentage_change,
        scored.carc
    FROM (
        SELECT
            date,
            exchange_value,
            {{env}}_analytics.ytd_change(FIRST_VALUE(exchange_value) OVER (PARTITION BY YEAR(date) ORDER BY date), exchange_value) AS ytd_change,
            {{env}}_analytics.mtd_change(FIRST_VALUE(exchange_value) OVER (PARTITION BY YEAR(date), MONTH(date) ORDER BY date), exchange_value) AS mtd_change,
            COALESCE({{env}}_analytics.percentage_change(LAG(exchange_value) OVER (ORDER BY date), exchange_value), 0) AS percentage_change,
            COALESCE({{env}}_analytics.compounded_annual_rate_of_change(100, exchange_value, YEAR(date) - 2020), 0) AS carc
        FROM {{env}}_harmonized.eur_usd
    ) AS scored
    WHERE scored.date > COALESCE((SELECT MAX(date) FROM {{env}}_analytics.eur_usd_analysis), TO_DATE('1900-01-01'));

    -- Insert data into {{env}}_analytics.gbp_usd_analysis
    INSERT INTO {{env}}_analytics.gbp_usd_analysis (date, exchange_value, ytd_change, mtd_change, percentage_change, carc)
    SELECT
        scored.date,
        scored.exchange_value,
        scored.ytd_change,
        scored.mtd_change,
        scored.percentage_change,
        scored.carc
    FROM (
        SELECT
            date,
            exchange_value,
            {{env}}_analytics.ytd_change(FIRST_VALUE(exchange_value) OVER (PARTITION BY YEAR(date) ORDER BY date), exchange_value) AS ytd_change,
            {{env}}_analytics.mtd_change(FIRST_VALUE(exchange_value) OVER (PARTITION BY YEAR(date), MONTH(date) ORDER BY date), exchange_value) AS mtd_change,
            COALESCE({{env}}_analytics.percentage_change(LAG(exchange_value) OVER (ORDER BY date), exchange_value), 0) AS percentage_change,
            COALESCE({{env}}_analytics.compounded_annual_rate_of_change(100, exchange_value, YEAR(date) - 2020), 0) AS carc
        FROM {{env}}_harmonized.gbp_usd
    ) AS scored
    WHERE scored.date > COALESCE((SELECT MAX(date) FROM {{env}}_analytics.gbp_usd_analysis), TO_DATE('1900-01-01'));

    RETURN 'Analytics data successfully loaded with CARC';
END;
$$;

-- Execute the stored procedure
-- CALL load_analytics_data();

In [None]:
-- Define the three-step task chain in the harmonized schema. The task names
-- below are unqualified, so they are created in the schema selected here.
USE SCHEMA {{env}}_HARMONIZED;

-- Root task: runs on the cron schedule below (03:00 UTC every day) and calls
-- the procedure that appends new rows to the raw table.
CREATE OR REPLACE TASK UPDATE_RAW_TABLE_TASK
WAREHOUSE = USDSE_WH
SCHEDULE = 'USING CRON 0 3 * * * UTC'
AS
CALL {{env}}_RAW_DATA.update_raw_table();

-- Second step: runs after the root task and moves the new raw rows from the
-- stream into the harmonized tables.
CREATE OR REPLACE TASK UPDATE_HARMONIZED_FROM_STREAM_TASK
WAREHOUSE = USDSE_WH
AFTER UPDATE_RAW_TABLE_TASK
AS
CALL {{env}}_HARMONIZED.process_new_data();

-- Third step: runs after the harmonized step and refreshes the analytics tables.
CREATE OR REPLACE TASK load_harmonized_to_analytics
WAREHOUSE = USDSE_WH
AFTER UPDATE_HARMONIZED_FROM_STREAM_TASK
AS
CALL {{env}}_ANALYTICS.load_analytics_data();

In [None]:
-- Enable the task chain, starting from the root task in the harmonized schema.
-- NOTE(review): presumably this resumes the root task together with all of its
-- dependents. Confirm against the SYSTEM$TASK_DEPENDENTS_ENABLE documentation.
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('{{env}}_HARMONIZED.UPDATE_RAW_TABLE_TASK');

In [None]:
-- Summary view over the raw exchange-rate table: the three rates per date plus
-- each rate's percentage change against the previous date's row.
-- The percentage_change UDF is schema-qualified, as it is in the analytics
-- procedure, so the view does not depend on which schema the session happens
-- to be using when the view is created or queried.
CREATE OR REPLACE VIEW USD_SPOT_EXCHANGE.{{env}}_ANALYTICS.EXCHANGE_RATE_SUMMARY (
    DATE,
    USD_TO_INR,
    EUR_TO_USD,
    GBP_TO_USD,
    INR_CHANGE,
    EUR_CHANGE,
    GBP_CHANGE
) AS
SELECT
    r.date,
    r.usd_to_inr,
    r.eur_to_usd,
    r.gbp_to_usd,
    {{env}}_ANALYTICS.percentage_change(LAG(r.usd_to_inr) OVER (ORDER BY r.date), r.usd_to_inr) AS inr_change,
    {{env}}_ANALYTICS.percentage_change(LAG(r.eur_to_usd) OVER (ORDER BY r.date), r.eur_to_usd) AS eur_change,
    {{env}}_ANALYTICS.percentage_change(LAG(r.gbp_to_usd) OVER (ORDER BY r.date), r.gbp_to_usd) AS gbp_change
FROM {{env}}_raw_data.raw_exchange_rates AS r;