In [None]:
-- DATASCIENCE.PROC_DELTA_LOAD: Python (Snowpark) stored procedure. Its handler
-- `main`, defined in the $$ body below, reloads the bronze *_delta tables from
-- today's (UTC) dated stage folders. It then applies SCD type-2 updates to the
-- silver *_delta tables.
-- NOTE(review): 'pandas' is listed in PACKAGES, but the handler module below
-- does not import it. Confirm it is still needed.
CREATE OR REPLACE PROCEDURE DATASCIENCE.PROC_DELTA_LOAD()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('snowflake-snowpark-python', 'pandas')
HANDLER = 'main'
AS
$$
from snowflake.snowpark import Session
from datetime import datetime

# ---------- CONSTANTS ----------
# Current UTC time as TIMESTAMP_NTZ, truncated to whole seconds by the TO_CHAR
# round-trip. Used as load_date for inserts and end_date for expirations.
_NOW_NTZ = "TO_TIMESTAMP_NTZ(TO_CHAR(CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP()), 'YYYY-MM-DD HH24:MI:SS'))"
# end_date sentinel that marks the currently open version of a key in silver.
_OPEN_END_DATE = "TO_TIMESTAMP_NTZ('2099-12-31 23:59:59')"

# ---------- ENTITIES ----------
# Fields per entity:
#   bronze / silver : table names.
#   stage_entity    : sub-folder under @bronze_csv.
#   pattern         : regex selecting the Parquet part files.
#   key             : business key.
#   compare_cols    : columns whose change closes the open version and opens a new one.
_ENTITIES = {
    "customers": {
        "bronze": "bronze_customers_delta",
        "silver": "silver_customers_delta",
        "stage_entity": "customers",
        "pattern": r".*Customers_.*?/part-.*\.parquet",
        "key": "customer_id",
        "compare_cols": ["customer_name", "start_date"],
    },
    "products": {
        "bronze": "bronze_products_delta",
        "silver": "silver_products_delta",
        "stage_entity": "products",
        "pattern": r".*Products_.*?/part-.*\.parquet",
        "key": "product_id",
        "compare_cols": ["product_name", "start_date"],
    },
    "orders": {
        "bronze": "bronze_orders_delta",
        "silver": "silver_orders_delta",
        "stage_entity": "orders",
        "pattern": r".*Orders_.*?/part-.*\.parquet",
        "key": "order_id",
        "compare_cols": ["customer_id", "product_id", "start_date"],
    },
}


def _stage_path(stage_entity, folder_path):
    """Return the dated stage location that holds *stage_entity*'s delta files."""
    return f"@bronze_csv/{stage_entity}/Delta_load/csv_files/{folder_path}"


def _stage_has_files(session, stage_path, pattern):
    """Return True if *stage_path* holds at least one file matching *pattern*.

    The same regex is used by the COPY in `_refresh_bronze`. So "has files"
    means "COPY has something to load"; unrelated files in the folder no
    longer trigger a TRUNCATE followed by an empty load.

    A failing LIST is logged and treated as "no files". That keeps the
    existing bronze table (deliberate best-effort).
    """
    try:
        rows = session.sql(f"LIST {stage_path} PATTERN = '{pattern}';").collect()
    except Exception as e:
        print(f"⚠️ LIST failed for {stage_path}: {e}")
        return False
    return len(rows) > 0


def _refresh_bronze(session, name, meta, folder_path):
    """Reload a bronze table from today's matching staged files.

    If no matching files exist, the bronze table is left as-is.
    """
    bronze_table = meta["bronze"]
    stage_path = _stage_path(meta["stage_entity"], folder_path)
    pat = meta["pattern"]

    if not _stage_has_files(session, stage_path, pat):
        print(f"ℹ️ No new files for {name}. Keeping existing {bronze_table}.")
        return

    session.sql(f"TRUNCATE TABLE {bronze_table};").collect()
    copy_sql = f"""
    COPY INTO {bronze_table}
    FROM {stage_path}
    FILE_FORMAT = (FORMAT_NAME = my_parquet_format)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    PATTERN = '{pat}';
    """
    session.sql(copy_sql).collect()
    print(f"✅ Loaded {bronze_table}")


def _clean_sql(name, bronze, bronze_clean):
    """Return the CTAS statement that builds *bronze_clean* from *bronze*.

    Each variant does the following:
    - selects DISTINCT rows and casts the id columns;
    - lower-cases delete_flag, defaulting NULL to 'n';
    - drops rows with missing or invalid ids;
    - drops rows whose start_date is NULL or in the future.

    Raises:
        ValueError: if *name* is not a configured entity.
    """
    if name == "customers":
        return f"""
        CREATE OR REPLACE TABLE {bronze_clean} AS
        SELECT DISTINCT
            TRY_CAST(customer_id AS INT) AS customer_id,
            TRIM(customer_name) AS customer_name,
            CAST(start_date AS TIMESTAMP_NTZ) AS start_date,
            LOWER(COALESCE(delete_flag,'N')) AS delete_flag
        FROM {bronze}
        WHERE customer_id IS NOT NULL
          AND TRY_CAST(customer_id AS INT) > 0
          AND start_date IS NOT NULL
          AND start_date <= CURRENT_TIMESTAMP();
        """
    if name == "products":
        return f"""
        CREATE OR REPLACE TABLE {bronze_clean} AS
        SELECT DISTINCT
            CAST(product_id AS STRING) AS product_id,
            TRIM(product_name) AS product_name,
            CAST(start_date AS TIMESTAMP_NTZ) AS start_date,
            LOWER(COALESCE(delete_flag,'N')) AS delete_flag
        FROM {bronze}
        WHERE product_id IS NOT NULL
          AND start_date IS NOT NULL
          AND start_date <= CURRENT_TIMESTAMP();
        """
    if name == "orders":
        return f"""
        CREATE OR REPLACE TABLE {bronze_clean} AS
        SELECT DISTINCT
            CAST(order_id AS STRING) AS order_id,
            TRY_CAST(customer_id AS INT) AS customer_id,
            CAST(product_id AS STRING) AS product_id,
            CAST(start_date AS TIMESTAMP_NTZ) AS start_date,
            LOWER(COALESCE(delete_flag,'N')) AS delete_flag
        FROM {bronze}
        WHERE order_id IS NOT NULL
          AND customer_id IS NOT NULL
          AND TRY_CAST(customer_id AS INT) > 0
          AND product_id IS NOT NULL
          AND start_date IS NOT NULL
          AND start_date <= CURRENT_TIMESTAMP();
        """
    raise ValueError(f"No cleaning rule for entity {name!r}")


def _apply_scd2(session, bronze_clean, silver, key, compare_cols):
    """Apply SCD type-2 changes from *bronze_clean* to *silver*, in order.

    1. Close open versions whose compare_cols changed. Each one is ended at
       the incoming row's start_date.
    2. Insert a new open version for every live ('n') key that has none.
    3. Close open versions whose key appears only with delete_flag 'y'.
    4. Close open versions whose key is absent from *bronze_clean*. This
       step is skipped when *bronze_clean* is empty, because an empty load
       would otherwise expire the entire silver table.
    """
    cols = ", ".join(compare_cols)
    src_cols = ", ".join(f"src.{c}" for c in compare_cols)
    change_condition = " OR ".join(f"(tgt.{c} IS DISTINCT FROM src_new.{c})" for c in compare_cols)

    # 1. Expire changed records.
    # NOTE(review): if bronze_clean holds several 'n' rows for one key, this
    # UPDATE ... FROM matches a target row to multiple sources. Confirm keys
    # are unique per load.
    session.sql(f"""
    UPDATE {silver} tgt
    SET end_date = TO_TIMESTAMP_NTZ(TO_CHAR(src_new.start_date, 'YYYY-MM-DD HH24:MI:SS'))
    FROM (
      SELECT {key}, {cols}
      FROM {bronze_clean}
      WHERE delete_flag = 'n'
    ) src_new
    WHERE tgt.{key} = src_new.{key}
      AND tgt.end_date = {_OPEN_END_DATE}
      AND ({change_condition});
    """).collect()

    # 2. Insert new/updated rows (any live key without an open version).
    session.sql(f"""
    INSERT INTO {silver} ({key}, {cols}, load_date, end_date)
    SELECT
      src.{key},
      {src_cols},
      {_NOW_NTZ} AS load_date,
      {_OPEN_END_DATE} AS end_date
    FROM {bronze_clean} src
    WHERE src.delete_flag = 'n'
      AND NOT EXISTS (
        SELECT 1 FROM {silver} tgt
        WHERE tgt.{key} = src.{key}
          AND tgt.end_date = {_OPEN_END_DATE}
      );
    """).collect()

    # 3. Expire rows marked deleted that have no live row in this load.
    session.sql(f"""
    UPDATE {silver} tgt
    SET end_date = {_NOW_NTZ}
    WHERE tgt.end_date = {_OPEN_END_DATE}
      AND tgt.{key} IN (
         SELECT {key} FROM {bronze_clean}
         WHERE delete_flag = 'y'
           AND {key} NOT IN (SELECT {key} FROM {bronze_clean} WHERE delete_flag = 'n')
      );
    """).collect()

    # 4. Expire missing rows (logical deletes). With an empty bronze_clean,
    # NOT IN (<empty set>) is TRUE for every key, so guard against wiping silver.
    clean_rows = session.sql(f"SELECT COUNT(*) FROM {bronze_clean};").collect()[0][0]
    if clean_rows == 0:
        print(f"⚠️ {bronze_clean} is empty; not expiring missing rows in {silver}.")
        return
    session.sql(f"""
    UPDATE {silver}
    SET end_date = {_NOW_NTZ}
    WHERE end_date = {_OPEN_END_DATE}
      AND {key} NOT IN (SELECT {key} FROM {bronze_clean});
    """).collect()


def main(session):
    """Handler for DATASCIENCE.PROC_DELTA_LOAD.

    Step 1 refreshes each bronze table from today's UTC-dated stage folder.
    Step 2 rebuilds each <entity>_clean table and applies SCD type-2 updates
    to the matching silver table.

    Args:
        session: the Snowpark session Snowflake passes to the handler.

    Returns:
        A success message string.
    """
    # ---------- CONFIG ----------
    # e.g. "2024/Jan/05/"; .title() upper-cases the month abbreviation's first letter.
    folder_path = datetime.utcnow().strftime("%Y/%b/%d/").title()
    print(f"📌 Using folder path: {folder_path}")

    # ---------- STEP 1: Refresh Bronze Tables ----------
    for name, meta in _ENTITIES.items():
        _refresh_bronze(session, name, meta, folder_path)

    # ---------- STEP 2: Delta Transformations ----------
    for name, meta in _ENTITIES.items():
        bronze = meta["bronze"]
        bronze_clean = bronze.replace("_delta", "_clean")

        print(f"\n🧹 Cleaning bronze table for {name} -> {bronze_clean}")
        session.sql(_clean_sql(name, bronze, bronze_clean)).collect()
        print(f"✅ Created cleaned table: {bronze_clean}")

        _apply_scd2(session, bronze_clean, meta["silver"], meta["key"], meta["compare_cols"])

    print("\n🎉 Delta load complete: silver_*_delta tables updated in Snowflake.")
    return "✅ PROC_DELTA_LOAD executed successfully"

$$;

In [None]:
-- Run the delta load once, manually.
call datascience.proc_delta_load();

In [None]:
-- Inspect one silver table; swap in one of these to check another:
--   select * from silver_customers_delta order by customer_id;
--   select * from silver_orders_delta    order by order_id;
select *
from silver_products_delta
order by product_id;

In [None]:
-- External table over the whole bronze_csv stage. With auto-refresh it
-- registers newly landed Parquet files.
create or replace external table bronze_stage_monitor
  with location = @bronze_csv
  auto_refresh = true
  file_format = (type = parquet);

-- Insert-only stream over the external table; delta_load_task polls it.
create or replace stream bronze_stage_stream
  on external table bronze_stage_monitor
  insert_only = true;

In [None]:
-- Sink table that exists only so the task can consume bronze_stage_stream in a
-- DML statement (see the task body). It never receives rows.
CREATE TABLE IF NOT EXISTS bronze_stage_stream_sink (value VARIANT);

-- Runs PROC_DELTA_LOAD when bronze_stage_stream has new data. Because of the
-- WHEN clause, the CRON schedule only sets how often the condition is checked.
-- It is not an unconditional fallback run.
CREATE OR REPLACE TASK delta_load_task
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 * * * * UTC'
WHEN SYSTEM$STREAM_HAS_DATA('bronze_stage_stream')
AS
EXECUTE IMMEDIATE
$$
BEGIN
  CALL DATASCIENCE.PROC_DELTA_LOAD();
  -- The procedure never reads bronze_stage_stream. Without this statement the
  -- stream offset would never advance, and the WHEN condition would stay TRUE
  -- on every scheduled check. A DML that selects zero rows from the stream
  -- advances its offset without copying any data.
  -- NOTE(review): changes registered while the procedure runs are consumed
  -- here too. Confirm that is acceptable given each run reloads today's folder.
  INSERT INTO bronze_stage_stream_sink
    SELECT VALUE FROM bronze_stage_stream WHERE 0 = 1;
END;
$$;

ALTER TASK delta_load_task RESUME;

In [None]:
-- delta_load_task calls this procedure and is resumed above. Suspend it first
-- so it does not keep running and failing against a missing procedure.
ALTER TASK IF EXISTS delta_load_task SUSPEND;
DROP PROCEDURE IF EXISTS DATASCIENCE.PROC_DELTA_LOAD();

In [None]:
-- Inspect one silver table; swap in one of these to check another:
--   select * from silver_products_delta order by product_id;
--   select * from silver_orders_delta   order by order_id;
select *
from silver_customers_delta
order by customer_id;

In [None]:
SHOW TASKS;
-- Filter inside the table function with TASK_NAME. A WHERE on its output only
-- sees the RESULT_LIMIT (default 100) most recent runs across all tasks, which
-- can hide this task's history.
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'DELTA_LOAD_TASK'))
ORDER BY SCHEDULED_TIME DESC;


In [None]:
-- Show the definition of the stage the procedure and external table read from.
describe stage datascience.bronze_csv;

In [None]:
-- Check whether the S3_BRONZE_NOTIFY integration exists.
show integrations like 'S3_BRONZE_NOTIFY';


In [None]:
-- NOTE(review): as written this will likely be rejected. An AWS_SNS
-- notification integration is an outbound integration, and the
-- CREATE NOTIFICATION INTEGRATION reference also requires
-- DIRECTION = OUTBOUND, AWS_SNS_TOPIC_ARN and AWS_SNS_ROLE_ARN.
-- Auto-refresh of an external table on S3 is usually wired through S3 event
-- notifications to the table's notification_channel instead.
-- Confirm what this integration is for.
CREATE OR REPLACE NOTIFICATION INTEGRATION S3_BRONZE_NOTIFY
  TYPE = QUEUE
  ENABLED = TRUE
  NOTIFICATION_PROVIDER = AWS_SNS;


In [None]:
-- NOTE(review): the procedure and external table use @bronze_csv, not this
-- stage. Confirm which stage is intended.
describe stage bronze_csv_delta;
