## Importing Libraries

In [1]:
import os
from dotenv import load_dotenv
import snowflake.connector
import pandas as pd

## Setting Up Snowflake Connection

### Loading Environmental Variables

In [2]:
# Load environment variables from the .env file
load_dotenv()

# Fetch Snowflake credentials from environment variables
snowflake_user = os.getenv('SNOWFLAKE_USER')
snowflake_role = os.getenv('SNOWFLAKE_ROLE')
snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
snowflake_database = os.getenv('SNOWFLAKE_DATABASE')
snowflake_schema = os.getenv('SNOWFLAKE_SCHEMA')
snowflake_product_status_cdc = os.getenv('SNOWFLAKE_PRODUCT_STATUS_CDC')
snowflake_product_status_hst = os.getenv('SNOWFLAKE_PRODUCT_STATUS_HST')

## Initiating SCD Type 2 Development

### Creating Warehouse, Database, and Schema in Snowflake

In [3]:
# Use Warehouse
use_snowflake_warehouse = f"ALTER WAREHOUSE {snowflake_warehouse.upper()} RESUME;"

# Use Database
use_snowflake_database = f"USE DATABASE {snowflake_database.upper()};"

# Use Schema
use_snowflake_schema = f"USE SCHEMA {snowflake_database.upper()}.{snowflake_schema.upper()};"

# Use Role
use_snowflake_role = f"USE ROLE {snowflake_role.upper()};"

### Creating Tables and Inserting Data

In [4]:
# # Create PRODUCT_STATUS_CDC table
# create_table_product_status_cdc = f"""
# CREATE OR REPLACE TABLE {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()} (
#     PRODUCT_KEY NUMBER(38,0),
#     STATUS NUMBER(38,0),
#     CHANGE_TYPE VARIANT,
#     CHANGE_TIME TIMESTAMP_NTZ(9),
#     CDC_LOG_POSITION NUMBER(38,0)
# );
# """

# # Create PRODUCT_STATUS_HST table
# create_table_product_status_hst = f"""
# CREATE OR REPLACE TABLE {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} (
#     PRODUCT_STATUS_SCD_KEY NUMBER(38,0),
#     PRODUCT_STATUS_PRODUCT_KEY NUMBER(38,0),
#     PRODUCT_STATUS_STATUS NUMBER(38,0),
#     SCD_START_TIME TIMESTAMP_NTZ(9),
#     SCD_END_TIME TIMESTAMP_NTZ(9)
# );
# """

# insert_data_product_status_cdc = f"""
# INSERT INTO {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()} (PRODUCT_KEY, STATUS, CHANGE_TYPE, CHANGE_TIME, CDC_LOG_POSITION) VALUES
#     (1, 10, 'insert', '2019-01-01 10:00:00', 1),
#     (1, 10, 'insert', '2019-01-01 10:00:00', 1),
#     (1, 10, 'update', '2019-01-01 10:30:00', 2),
#     (1, 20, 'update', '2019-01-01 11:00:00', 3),
#     (1, 20, 'delete', '2019-01-01 12:00:00', 4),
#     (1, 10, 'insert', '2019-01-01 14:00:00', 5);
# """

# Insert data into PRODUCT_STATUS_HST table
# insert_data_product_status_hst = f"""
# INSERT INTO {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} (PRODUCT_STATUS_SCD_KEY, PRODUCT_STATUS_PRODUCT_KEY, PRODUCT_STATUS_STATUS, SCD_START_TIME, SCD_END_TIME) VALUES
#     (1, 1, 10, '2019-01-01 10:00:00', '2019-01-01 11:00:00'),
#     (2, 1, 20, '2019-01-01 11:00:00', '2019-01-01 12:00:00'),
#     (3, 1, 10, '2019-01-01 14:00:00', '2999-01-01 00:00:00');
# """

### Implementing Stored Procedure for SCD Type 2 over CDC Stream Set
* Goal: To create a stored procedure that will handle both scenarios
    * Processing all data
    * processing data starting from a specific `cdc_log_position`
* Ensuring Idempotency (supports reprocessing) -> `MERGE INTO`

In [8]:
stored_procedure_name = f"PROCESS_CDC_SCD"

In [None]:
# # Just the internal query for debugging purposes
# -- Step 0: Check the CDC table
# SELECT * FROM CAIOCVELASCO.DATA_ENGINEER.PRODUCT_STATUS_CDC;

# -- Step 1: Deduplicate records from the CDC stream
# -- Here, we deduplicate the CDC table, selecting only unique records from the CDC table
# -- Solved issue: "at least once semantics" ("We may receive the same exact cdc record twice or more times")
# WITH 
#     deduplicated_cdc AS (
#         SELECT DISTINCT
#             product_key,
#             status,
#             json_extract_path_text(change_type, 'type') AS change_type,
#             change_time,
#             cdc_log_position
#         FROM CAIOCVELASCO.DATA_ENGINEER.PRODUCT_STATUS_CDC
#         WHERE change_time IS NOT NULL
    
# ), -- select * from deduplicated_cdc;

# -- Step 2: Detect the changes that need to be captured for a given product_key
# -- Here, we want to track changes in status for a given product
# -- For a given product:
# -- If inserted or updated, it will be included in the SCD.
# -- If it was deleted, it also must be tracked in the SCD, but in this case hash needs to include the 'cdc_log_position' to differentiate (CASE method below) or we won't capture this case in the value_changed column   
# hash_table_1 AS (
#     SELECT *,
#         -- HASH(product_key, status) AS _hash_test,
#         CASE 
#             WHEN change_type != 'DELETE' THEN HASH(product_key, status) -- Same hash for the same status if not a DELETE
#             ELSE HASH(product_key, status, change_type)                 -- Different hash if DELETE
#         END AS _hash_1,
#         LAG(_hash_1) OVER (PARTITION BY product_key ORDER BY cdc_log_position) AS _previous_hash_1, -- Compare hash within the same product_key
#         -- COALESCE(_hash_test != _previous_hash_1, TRUE) AS _valued_changed_test,
#         COALESCE(_hash_1 != _previous_hash_1, TRUE) AS _valued_changed_1 -- Flag if there is an effective change
#     FROM deduplicated_cdc
# ),-- select * from hash_table_1;

# -- Step 3: Intermediate table before the SCD logic 
# -- Here, for a given product, we filter for all historical rows where changes might have happened (the ones that need to be present in the SCD table)
# -- We made sure to also include the DELETE cases
# -- We also brought the change_time as an initial step towards scd_start_time and scd_end_time
# hash_table_2 AS (
#     SELECT * exclude (_hash_1, _previous_hash_1, _valued_changed_1)
#     FROM hash_table_1 
#     WHERE _valued_changed_1 = 'TRUE'
# ), -- select * from hash_table_2;

# -- Step 4: Create is_active column based on the previous status and current change_type
# -- Here, we want to flag DELETEs as not active because all we need is to close the previous status, so don't need to keep the row where change_type = DELETE
# -- For now, we will not filter the IS_ACTIVe = TRUE values because we need the DELETE rows to close the previous record
# final_table AS (
#     SELECT *,
#         -- Use LAG to get the previous status in this step
#         LAG(status) OVER (PARTITION BY product_key ORDER BY change_time) AS _previous_status, -- Get previous status
#         -- Set is_active to FALSE if the previous status is the same and change_type is DELETE
#         CASE
#             WHEN change_type = 'DELETE' THEN FALSE
#             ELSE TRUE
#         END AS is_active
#     FROM hash_table_2
# ), -- select * from final_table;

# -- Step 5: Opening and Closing rows (scd_start_time and scd_end_time) 
# -- Here, we close previous records with the respective scd_start_time and scd_end_time based on change_time
# scd_table AS (
#     SELECT 
#         product_key                        AS product_status_product_key, -- Foreign key to product_dm table
#         status                             AS product_status_status,      -- Status of the product for this SCD record
#         CAST(change_time AS TIMESTAMP_NTZ) AS scd_start_time,             -- Start time of the SCD record in TIMESTAMP_NTZ
#         -- Set end time based on next change_time or default to '2999-01-01 00:00:00' if no further changes
#         CASE 
#             WHEN is_active = FALSE THEN CAST(change_time AS TIMESTAMP_NTZ)
#             ELSE CAST(
#                 COALESCE(
#                     LEAD(change_time) OVER (PARTITION BY product_key ORDER BY change_time),
#                     TIMESTAMP '2999-01-01 00:00:00'
#                 ) AS TIMESTAMP_NTZ
#             )
#         END AS scd_end_time,
#         is_active,
#         cdc_log_position
#     FROM final_table
# ) -- select * from scd_table;

# -- Select only the active rows for the final SCD result (getting rid of the DELETE rows)
# -- Add sequential surrogate key after filtering
# SELECT 
#     ROW_NUMBER() OVER (ORDER BY product_status_product_key, scd_start_time) AS product_status_scd_key, -- SCD Type 2 surrogate key for filtered results
#     product_status_product_key,
#     product_status_status,
#     scd_start_time,
#     scd_end_time
# FROM scd_table
# WHERE is_active = TRUE
# AND (cdc_log_position IS NULL OR cdc_log_position > '0');

In [6]:
# stored_procedure_scd_type2 = f"""
# CREATE OR REPLACE PROCEDURE {stored_procedure_name.upper()}(cdc_log_position INT DEFAULT NULL)
# RETURNS STRING
# LANGUAGE SQL
# AS
# $$
# DECLARE
#     sql_query STRING;
# BEGIN
#     -- Step 1: Construct the SQL query with CTEs
#     sql_query := '
#     MERGE INTO {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} AS target
#     USING (
#         WITH 
#         deduplicated_cdc AS (
#             SELECT DISTINCT
#                 product_key,
#                 status,
#                 json_extract_path_text(change_type, ''type'') AS change_type,
#                 change_time,
#                 cdc_log_position
#             FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()}
#             WHERE change_time IS NOT NULL
#         ),
        
#         hash_table_1 AS (
#             SELECT *,
#                 CASE 
#                     WHEN change_type != ''DELETE'' THEN HASH(product_key, status)
#                     ELSE HASH(product_key, status, change_time, change_type)
#                 END AS _hash_1,
#                 LAG(_hash_1) OVER (PARTITION BY product_key ORDER BY change_time) AS _previous_hash_1,
#                 COALESCE(_hash_1 != _previous_hash_1, TRUE) AS _valued_changed_1
#             FROM deduplicated_cdc
#         ),
        
#         hash_table_2 AS (
#             SELECT * 
#             FROM hash_table_1 
#             WHERE _valued_changed_1 = ''TRUE''
#         ),
        
#         final_table AS (
#             SELECT ht2.*,
#                 LAG(status) OVER (PARTITION BY product_key ORDER BY change_time) AS _previous_status,
#                 CASE 
#                     WHEN ht2.change_type = ''DELETE'' AND ht2.status = _previous_status THEN FALSE
#                     ELSE TRUE
#                 END AS is_active
#             FROM hash_table_2 AS ht2
#         ),

#         scd_table AS (
#             SELECT 
#                 product_key AS product_status_product_key,
#                 status AS product_status_status,
#                 CAST(change_time AS TIMESTAMP_NTZ) AS scd_start_time,
#                 CASE 
#                     WHEN is_active = FALSE THEN CAST(change_time AS TIMESTAMP_NTZ)
#                     ELSE CAST(
#                         COALESCE(
#                             LEAD(change_time) OVER (PARTITION BY product_key ORDER BY change_time),
#                             TIMESTAMP ''2999-01-01 00:00:00''
#                         ) AS TIMESTAMP_NTZ
#                     )
#                 END AS scd_end_time,
#                 is_active,
#                 cdc_log_position -- Include CDC_LOG_POSITION here
#             FROM final_table
#         )
#         SELECT 
#             -- ROW_NUMBER() OVER (ORDER BY product_status_product_key, scd_start_time) AS product_status_scd_key,
#             ROW_NUMBER() OVER (PARTITION BY product_status_product_key ORDER BY scd_start_time) AS product_status_scd_key,
#             product_status_product_key,
#             product_status_status,
#             scd_start_time,
#             scd_end_time,
#             cdc_log_position
#         FROM scd_table
#         WHERE is_active = TRUE
#         -- AND (cdc_log_position IS NULL OR cdc_log_position > ' || COALESCE(cdc_log_position::STRING, 'NULL') || ')
#     ) AS source
#     ON target.product_status_product_key = source.product_status_product_key
#        -- AND target.product_status_scd_key = source.product_status_scd_key
#        AND target.product_status_status = source.product_status_status -- Check full unique conditions
#        AND target.scd_start_time = source.scd_start_time
#        AND target.scd_end_time = source.scd_end_time
       

#     WHEN MATCHED AND (source.cdc_log_position IS NULL OR source.cdc_log_position > ' || COALESCE(cdc_log_position::STRING, 'NULL') || ') THEN
#         UPDATE SET
#             -- target.product_status_scd_key = source.product_status_scd_key,
#             target.product_status_status = source.product_status_status,
#             target.scd_start_time = source.scd_start_time,
#             target.scd_end_time = source.scd_end_time

#     WHEN NOT MATCHED THEN
#         INSERT (product_status_scd_key, product_status_product_key, product_status_status, scd_start_time, scd_end_time)
#         VALUES (source.product_status_scd_key, source.product_status_product_key, source.product_status_status, source.scd_start_time, source.scd_end_time);
#         -- INSERT (product_status_product_key, product_status_status, scd_start_time, scd_end_time)
#         -- VALUES (source.product_status_product_key, source.product_status_status, source.scd_start_time, source.scd_end_time);
#     ';

#     -- Execute the constructed SQL query
#     EXECUTE IMMEDIATE sql_query;

#     RETURN 'CDC processing complete.';
# END;
# $$;
# """

In [7]:
# Check if Stored Procedure Exists
check_stored_procedure = f"SELECT * FROM INFORMATION_SCHEMA.PROCEDURES;"

In [None]:
# delete_stored_procedure_scd_type2 = f"DROP PROCEDURE IF EXISTS {snowflake_database.upper()}.{snowflake_schema.upper()}.PROCESS_CDC_SCD(INT);"

In [24]:
try:
    # Snowflake connection context manager
    with snowflake.connector.connect(
        user=snowflake_user,
        password=snowflake_password,
        account=snowflake_account,
        warehouse=snowflake_warehouse,
        database=snowflake_database,
        schema=snowflake_schema
    ) as conn:
        
        # Execute operations within the context manager
        cursor = conn.cursor()

        try:            
            cursor.execute(use_snowflake_database)              # Use Database (Set it for the user session)
            print("Using Database successfully.")

            cursor.execute(use_snowflake_schema)                # Use Schema (Set it for the user session)
            print("Using Schema successfully.")
    
            cursor.execute(use_snowflake_role)                  # Use Role    
            print("Setting Role successful.")

            # cursor.execute(delete_stored_procedure_scd_type2) # Delete Stored Procedure
            # print("Stored Deleted Successfully.")
            
            # cursor.execute(stored_procedure_scd_type2)          # Execute Stored Procedure
            # print("Stored Procedured Executed Successfully.")

            print("Checking if Stored Procedured Exists.")      # Check Stored Procedure
            cursor.execute(check_stored_procedure)              
            result = cursor.fetchall()                          # Fetch the result
            if result:                                          # Check if the result is empty
                print(f"Stored procedure '{stored_procedure_name}' exists in '{snowflake_database.upper()}.{snowflake_schema.upper()}'.")
            else:
                print(f"Stored procedure '{stored_procedure_name}' does not exist in schema '{snowflake_database.upper()}.{snowflake_schema.upper()}'.")
               
            cursor.execute(f"CALL {snowflake_database.upper()}.{snowflake_schema.upper()}.{stored_procedure_name}(0)")
            print("Stored Procedure executed for cdc_log_position=0.")

            # Query and load the table into a DataFrame
            cursor.execute("SELECT * FROM PRODUCT_STATUS_HST")
            data_cdc_0 = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            df_cdc_0 = pd.DataFrame(data_cdc_0, columns=columns)
            print("DataFrame for cdc_log_position=0:")
            print(df_cdc_0)

            cursor.execute(f"CALL {snowflake_database.upper()}.{snowflake_schema.upper()}.{stored_procedure_name}(2)")
            print("Stored Procedure executed for cdc_log_position=0.")

            # Query and load the table into a DataFrame
            cursor.execute("SELECT * FROM PRODUCT_STATUS_HST")
            data_cdc_4 = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            df_cdc_4 = pd.DataFrame(data_cdc_4, columns=columns)
            print("DataFrame for cdc_log_position=4:")
            print(df_cdc_4)

        except snowflake.connector.errors.ProgrammingError as e:
                print(f"Snowflake ProgrammingError: {e}")

        finally:
            cursor.close()    

except snowflake.connector.errors.DatabaseError as e:
    print(f"Snowflake DatabaseError: {e}")

print("Script execution completed.")

Using Database successfully.
Using Schema successfully.
Setting Role successful.
Checking if Stored Procedured Exists.
Stored procedure 'PROCESS_CDC_SCD' exists in 'CAIOCVELASCO.DATA_ENGINEER'.
Stored Procedure executed for cdc_log_position=0.
DataFrame for cdc_log_position=0:
   PRODUCT_STATUS_SCD_KEY  PRODUCT_STATUS_PRODUCT_KEY  PRODUCT_STATUS_STATUS  \
0                       1                           1                     10   
1                       2                           1                     20   
2                       3                           1                     10   

       SCD_START_TIME         SCD_END_TIME  
0 2019-01-01 10:00:00  2019-01-01 11:00:00  
1 2019-01-01 11:00:00  2019-01-01 12:00:00  
2 2019-01-01 14:00:00  2999-01-01 00:00:00  
Stored Procedure executed for cdc_log_position=0.
DataFrame for cdc_log_position=4:
   PRODUCT_STATUS_SCD_KEY  PRODUCT_STATUS_PRODUCT_KEY  PRODUCT_STATUS_STATUS  \
0                       1                           1    

### Breaking Down the SCD Type 2 Implementation

In [23]:
with snowflake.connector.connect(
    user=snowflake_user,
    password=snowflake_password,
    account=snowflake_account,
    warehouse=snowflake_warehouse,
    database=snowflake_database,
    schema=snowflake_schema
) as conn:
    cursor = conn.cursor()
    
    try:
        # 1. Execute deduplicated_cdc CTE
        deduplicated_cdc_query = f"""
        SELECT DISTINCT
            product_key,
            status,
            json_extract_path_text(change_type, 'type') AS change_type,
            change_time,
            cdc_log_position
        FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()}
        WHERE change_time IS NOT NULL;
        """
        cursor.execute(deduplicated_cdc_query)
        data_deduplicated_cdc = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        df_deduplicated_cdc = pd.DataFrame(data_deduplicated_cdc, columns=columns)
        print("DataFrame for deduplicated_cdc:")
        print(df_deduplicated_cdc)
        
        # 2. Execute hash_table_1 CTE
        hash_table_1_query = """
        WITH 
        deduplicated_cdc AS (
            SELECT DISTINCT
                product_key,
                status,
                json_extract_path_text(change_type, 'type') AS change_type,
                change_time,
                cdc_log_position
            FROM CAIOCVELASCO.DATA_ENGINEER.PRODUCT_STATUS_CDC
            WHERE change_time IS NOT NULL
        ),
        
        hash_table_1 AS (
            SELECT *,
                CASE 
                    WHEN change_type != 'DELETE' THEN HASH(product_key, status)
                    ELSE HASH(product_key, status, change_time, change_type)
                END AS _hash_1,
                LAG(_hash_1) OVER (PARTITION BY product_key ORDER BY change_time) AS _previous_hash_1,
                COALESCE(_hash_1 != _previous_hash_1, TRUE) AS _valued_changed_1
            FROM deduplicated_cdc
        )
        select * from hash_table_1;
        """
        cursor.execute(hash_table_1_query)
        data_hash_table_1 = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        df_hash_table_1 = pd.DataFrame(data_hash_table_1, columns=columns)
        print("DataFrame for hash_table_1:")
        print(df_hash_table_1)

        # # 3. Execute hash_table_2 CTE
        # hash_table_2_query = """
        # SELECT * 
        # FROM hash_table_1 
        # WHERE _valued_changed_1 = 'TRUE'
        # """
        # cursor.execute(hash_table_2_query)
        # data_hash_table_2 = cursor.fetchall()
        # columns = [desc[0] for desc in cursor.description]
        # df_hash_table_2 = pd.DataFrame(data_hash_table_2, columns=columns)
        # print("DataFrame for hash_table_2:")
        # print(df_hash_table_2)

        # # 4. Execute final_table CTE
        # final_table_query = """
        # SELECT ht2.*,
        #     LAG(status) OVER (PARTITION BY product_key ORDER BY change_time) AS _previous_status,
        #     CASE 
        #         WHEN ht2.change_type = 'DELETE' AND ht2.status = _previous_status THEN FALSE
        #         ELSE TRUE
        #     END AS is_active
        # FROM hash_table_2 AS ht2
        # """
        # cursor.execute(final_table_query)
        # data_final_table = cursor.fetchall()
        # columns = [desc[0] for desc in cursor.description]
        # df_final_table = pd.DataFrame(data_final_table, columns=columns)
        # print("DataFrame for final_table:")
        # print(df_final_table)

        # # 5. Execute scd_table CTE
        # scd_table_query = """
        # SELECT 
        #     product_key AS product_status_product_key,
        #     status AS product_status_status,
        #     CAST(change_time AS TIMESTAMP_NTZ) AS scd_start_time,
        #     CASE 
        #         WHEN is_active = FALSE THEN CAST(change_time AS TIMESTAMP_NTZ)
        #         ELSE CAST(
        #             COALESCE(
        #                 LEAD(change_time) OVER (PARTITION BY product_key ORDER BY change_time),
        #                 TIMESTAMP '2999-01-01 00:00:00'
        #             ) AS TIMESTAMP_NTZ
        #         )
        #     END AS scd_end_time,
        #     is_active,
        #     cdc_log_position
        # FROM final_table
        # """
        # cursor.execute(scd_table_query)
        # data_scd_table = cursor.fetchall()
        # columns = [desc[0] for desc in cursor.description]
        # df_scd_table = pd.DataFrame(data_scd_table, columns=columns)
        # print("DataFrame for scd_table:")
        # print(df_scd_table)

        # # 6. Final query
        # final_query = """
        # SELECT 
        #     ROW_NUMBER() OVER (PARTITION BY product_status_product_key ORDER BY scd_start_time) AS product_status_scd_key,
        #     product_status_product_key,
        #     product_status_status,
        #     scd_start_time,
        #     scd_end_time,
        #     cdc_log_position
        # FROM scd_table
        # WHERE is_active = TRUE
        # """
        # cursor.execute(final_query)
        # data_final = cursor.fetchall()
        # columns = [desc[0] for desc in cursor.description]
        # df_final = pd.DataFrame(data_final, columns=columns)
        # print("Final DataFrame:")
        # print(df_final)
        
    except snowflake.connector.errors.ProgrammingError as e:
        print(f"Snowflake ProgrammingError: {e}")

    finally:
        cursor.close()


DataFrame for deduplicated_cdc:
   PRODUCT_KEY  STATUS CHANGE_TYPE         CHANGE_TIME  CDC_LOG_POSITION
0            1      10      INSERT 2019-01-01 10:00:00                 1
1            1      10      UPDATE 2019-01-01 10:30:00                 2
2            1      20      UPDATE 2019-01-01 11:00:00                 3
3            1      20      DELETE 2019-01-01 12:00:00                 4
4            1      10      INSERT 2019-01-01 14:00:00                 5
DataFrame for hash_table_1:
   PRODUCT_KEY  STATUS CHANGE_TYPE         CHANGE_TIME  CDC_LOG_POSITION  \
0            1      10      INSERT 2019-01-01 10:00:00                 1   
1            1      10      UPDATE 2019-01-01 10:30:00                 2   
2            1      20      UPDATE 2019-01-01 11:00:00                 3   
3            1      20      DELETE 2019-01-01 12:00:00                 4   
4            1      10      INSERT 2019-01-01 14:00:00                 5   

               _HASH_1  _PREVIOUS_HASH_1  _VA

### Query

* Find distinct Products for a given state and date

In [10]:
# Define query
query_0 = f"""
SELECT DISTINCT product_status_product_key AS product_key
FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()}
WHERE product_status_status = '10'  -- Replace '10' with the actual state you want to test
  AND scd_start_time <= CAST('2019-01-01' AS TIMESTAMP_NTZ) + INTERVAL '1 DAY' - INTERVAL '1 SECOND'  -- Replace '2024-10-29' with your desired test date
  AND scd_end_time >= CAST('2019-01-01' AS TIMESTAMP_NTZ);  -- Replace '2024-10-29' with the same date as above
"""

### Data Quality Checks

In [11]:
# 1. Check for Existence of Product Keys
# Ensure that all unique product keys in the CDC table exist in the SCD table.
# Outcome: If any product keys are returned, it indicates that those keys are missing in the SCD table.
query_1 = f"""
SELECT DISTINCT cdc.product_key
FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()} AS cdc
LEFT JOIN {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} AS scd
ON cdc.product_key = scd.product_status_product_key
WHERE scd.product_status_product_key IS NULL;
"""

# 2. Check for Duplicates in SCD
# Ensure that there are no duplicate records for the same product key in the SCD table.
# Outcome: If any rows are returned, it indicates that there are duplicates for those product keys in the SCD table.
query_2 = f"""
SELECT product_status_scd_key, product_status_product_key, product_status_status, scd_start_time, scd_end_time, COUNT(*) AS duplicate_count
FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()}
GROUP BY product_status_scd_key, product_status_product_key, product_status_status, scd_start_time, scd_end_time
HAVING COUNT(*) > 1;
"""

# 3. Check for Active Records
# Verify that all active records in the SCD table (where scd_end_time is a future date, e.g., 2999-01-01) 
# correspond to entries in the CDC table. This ensures that the current status of a product is accurately represented.
# Outcome: If there are any active records in the SCD table that do not have a matching product key in the CDC table, they will be returned, indicating a potential inconsistency in the data.
query_3 = f"""
SELECT DISTINCT s.product_status_product_key
FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} s
LEFT JOIN {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()} c
ON s.product_status_product_key = c.product_key
WHERE s.scd_end_time = '2999-01-01 00:00:00' -- Active records
  AND c.product_key IS NULL;
"""

# 4. Last Status Consistency Check
# For each product key in the SCD, check if the status reflects the latest status from the CDC.
# This ensures that updates in the CDC are accurately reflected in the SCD.
query_4 = f"""
WITH latest_cdc AS (
    SELECT 
        product_key,
        status AS latest_status,
        ROW_NUMBER() OVER (PARTITION BY product_key ORDER BY change_time DESC) AS rn
    FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()}
),

latest_scd AS (
    SELECT 
        product_status_product_key,
        product_status_status,
        ROW_NUMBER() OVER (PARTITION BY product_status_product_key ORDER BY scd_start_time DESC) AS rn
    FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()}
    WHERE scd_end_time = '2999-01-01 00:00:00'
)

SELECT 
    c.product_key,
    c.latest_status AS cdc_latest_status,
    s.product_status_status AS scd_latest_status
FROM latest_cdc c
LEFT JOIN latest_scd s ON c.product_key = s.product_status_product_key
WHERE c.rn = 1 
    AND (s.rn <> 1 OR s.product_status_status IS NULL OR s.product_status_status <> c.latest_status);
"""

# 5. Consistency Check
# This query ensures that the change types (INSERT, UPDATE, DELETE) in the CDC align with the records in the SCD.
query_5 = f"""
WITH deduplicated_cdc AS (
    SELECT 
        product_key,
        status,
        json_extract_path_text(change_type, 'type') AS change_type,
        change_time
    FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_cdc.upper()}
    WHERE change_time IS NOT NULL
    GROUP BY product_key, status, change_type, change_time
),

insert_check AS (
    SELECT 
        cdc.product_key,
        cdc.status,
        cdc.change_type,
        cdc.change_time
    FROM deduplicated_cdc cdc
    LEFT JOIN {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} scd
        ON cdc.product_key = scd.product_status_product_key
        AND cdc.status = scd.product_status_status
        AND scd.scd_end_time = '2999-01-01 00:00:00'
    WHERE cdc.change_type = 'INSERT'
    AND scd.product_status_product_key IS NULL
),

update_check AS (
    SELECT 
        cdc.product_key,
        cdc.status,
        cdc.change_type,
        cdc.change_time
    FROM deduplicated_cdc cdc
    LEFT JOIN {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} scd
        ON cdc.product_key = scd.product_status_product_key
        AND cdc.status = scd.product_status_status
    WHERE cdc.change_type = 'UPDATE'
    AND scd.product_status_product_key IS NULL
),

delete_check AS (
    SELECT 
        cdc.product_key,
        cdc.status,
        cdc.change_time
    FROM deduplicated_cdc cdc
    LEFT JOIN {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()} scd
        ON cdc.product_key = scd.product_status_product_key
    WHERE cdc.change_type = 'DELETE'
    GROUP BY cdc.product_key, cdc.status, cdc.change_time
    HAVING COUNT(scd.product_status_product_key) = 1
)

SELECT 
    'INSERT' AS check_type,
    COUNT(*) AS total_issues
FROM insert_check
UNION ALL
SELECT 
    'UPDATE' AS check_type,
    COUNT(*) AS total_issues
FROM update_check
UNION ALL
SELECT 
    'DELETE' AS check_type,
    COUNT(*) AS total_issues
FROM delete_check;
"""

# 6. Time Range Validation
# This check calculates the end time of one record and compares it to the start time of the next record for each product key. If the end time exceeds the next start time, it indicates overlapping records.
# Outcome: No output means that there is no overlapping records.
query_6 = f"""
WITH time_ranges AS (
    SELECT 
        product_status_product_key,
        scd_start_time,
        scd_end_time,
        LEAD(scd_start_time) OVER (PARTITION BY product_status_product_key ORDER BY scd_start_time) AS next_start_time
    FROM {snowflake_database.upper()}.{snowflake_schema.upper()}.{snowflake_product_status_hst.upper()}
)

SELECT 
    product_status_product_key,
    COUNT(*) AS overlapping_records
FROM time_ranges
WHERE scd_end_time > next_start_time
GROUP BY product_status_product_key
HAVING COUNT(*) > 0;
"""

In [14]:
try:
    # Snowflake connection context manager
    with snowflake.connector.connect(
        user=snowflake_user,
        password=snowflake_password,
        account=snowflake_account,
        warehouse=snowflake_warehouse,
        database=snowflake_database,
        schema=snowflake_schema
    ) as conn:
        
        # Execute operations within the context manager
        cursor = conn.cursor()

        try:            
            cursor.execute(use_snowflake_database)  # Use Database (Set it for the user session)
            print("Using Database successfully.")
            
            # Query 1 Execution and DataFrame Conversion
            cursor.execute(query_1)
            data_1 = cursor.fetchall()
            columns_1 = [desc[0] for desc in cursor.description]
            df_query_1 = pd.DataFrame(data_1, columns=columns_1)
            print("DataFrame for Query 1:")
            print(df_query_1)

            # Query 2 Execution and DataFrame Conversion
            cursor.execute(query_2)
            data_2 = cursor.fetchall()
            columns_2 = [desc[0] for desc in cursor.description]
            df_query_2 = pd.DataFrame(data_2, columns=columns_2)
            print("DataFrame for Query 2:")
            print(df_query_2)

            # Query 3 Execution and DataFrame Conversion
            cursor.execute(query_3)
            data_3 = cursor.fetchall()
            columns_3 = [desc[0] for desc in cursor.description]
            df_query_3 = pd.DataFrame(data_3, columns=columns_3)
            print("DataFrame for Query 3:")
            print(df_query_3)

            # Query 4 Execution and DataFrame Conversion
            cursor.execute(query_4)
            data_4 = cursor.fetchall()
            columns_4 = [desc[0] for desc in cursor.description]
            df_query_4 = pd.DataFrame(data_4, columns=columns_4)
            print("DataFrame for Query 4:")
            print(df_query_4)

            # Query 5 Execution and DataFrame Conversion
            cursor.execute(query_5)
            data_5 = cursor.fetchall()
            columns_5 = [desc[0] for desc in cursor.description]
            df_query_5 = pd.DataFrame(data_5, columns=columns_5)
            print("DataFrame for Query 5:")
            print(df_query_5)

            # Query 6 Execution and DataFrame Conversion
            cursor.execute(query_6)
            data_6 = cursor.fetchall()
            columns_6 = [desc[0] for desc in cursor.description]
            df_query_6 = pd.DataFrame(data_6, columns=columns_6)
            print("DataFrame for Query 6:")
            print(df_query_6)

        except snowflake.connector.errors.ProgrammingError as e:
            print(f"Snowflake ProgrammingError: {e}")

        finally:
            cursor.close()    

except snowflake.connector.errors.DatabaseError as e:
    print(f"Snowflake DatabaseError: {e}")

print("Script execution completed.")

Using Database successfully.
DataFrame for Query 1:
Empty DataFrame
Columns: [PRODUCT_KEY]
Index: []
DataFrame for Query 2:
Empty DataFrame
Columns: [PRODUCT_STATUS_SCD_KEY, PRODUCT_STATUS_PRODUCT_KEY, PRODUCT_STATUS_STATUS, SCD_START_TIME, SCD_END_TIME, DUPLICATE_COUNT]
Index: []
DataFrame for Query 3:
Empty DataFrame
Columns: [PRODUCT_STATUS_PRODUCT_KEY]
Index: []
DataFrame for Query 4:
Empty DataFrame
Columns: [PRODUCT_KEY, CDC_LATEST_STATUS, SCD_LATEST_STATUS]
Index: []
DataFrame for Query 5:
  CHECK_TYPE  TOTAL_ISSUES
0     INSERT             0
1     UPDATE             0
2     DELETE             0
DataFrame for Query 6:
Empty DataFrame
Columns: [PRODUCT_STATUS_PRODUCT_KEY, OVERLAPPING_RECORDS]
Index: []
Script execution completed.
