<h2 style="color:green; font-size: 40px;" align="center">Snowflake Change Data Capture (CDC) Setup Notebook</h2>
<h2 style="color:grey" align="center">Setup CDC schema and tables</h2>

```sql

-- CDC schema inside the RAW database.
-- NOTE: CREATE OR REPLACE recreates the schema, so re-running this cell
-- discards any objects that already exist in raw.cdc.
CREATE OR REPLACE SCHEMA raw.cdc;

-- Staging table: one row per nation, holding the latest active version.
CREATE OR REPLACE TABLE raw.cdc.stg_active_nation (
    -- Business columns copied from the source nation table
    n_nationkey   NUMBER,
    n_name        STRING,
    n_regionkey   NUMBER,
    n_comment     STRING,
    -- Audit columns maintained by the merge procedure (no time zone)
    created_time  TIMESTAMP_NTZ,
    modified_time TIMESTAMP_NTZ
);

-- History table: accumulates every change captured from the staging stream.
CREATE OR REPLACE TABLE raw.cdc.nation_historic_changes (
    -- Snapshot of the staging row at the time of the change
    n_nationkey       NUMBER,
    n_name            STRING,
    n_regionkey       NUMBER,
    n_comment         STRING,
    created_time      TIMESTAMP_NTZ,
    modified_time     TIMESTAMP_NTZ,
    -- Time at which the change was written to this table
    change_time       TIMESTAMP_NTZ,
    -- Stream metadata copied as-is from the stream's METADATA$ columns
    metadata$action   STRING,   -- type of action recorded by the stream
    metadata$isupdate STRING,   -- flag: was this record part of an update
    metadata$row_id   STRING    -- row identity assigned by the stream
);

```

<h2 style="color:grey" align="center">Change Detection - Create Stream on Staging Table</h2>

```sql

-- Stream that records row-level changes made to the staging table
-- raw.cdc.stg_active_nation so they can be consumed downstream.
CREATE OR REPLACE STREAM raw.cdc.stg_active_nation_stream
    ON TABLE raw.cdc.stg_active_nation;


```

<h2 style="color:grey" align="center">Change Detection - Data Merging</h2>

```sql

-- Procedure that synchronises raw.cdc.stg_active_nation with raw.source_1.nation:
--   1. MERGE  - updates staged rows whose name, region key or comment differ
--               from the source, and inserts source keys not yet staged.
--   2. DELETE - removes staged rows whose key is no longer in the source.
-- Returns the string 'success' once both statements have completed.
-- The body is wrapped in $$ delimiters so string literals inside it do not
-- need doubled quotes.
CREATE OR REPLACE PROCEDURE raw.cdc.merge_to_stg_active_nation_proc()
RETURNS VARCHAR(16777216)
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
BEGIN

    MERGE INTO raw.cdc.stg_active_nation AS target
    USING (
        SELECT
            N_NATIONKEY,
            N_NAME,
            N_REGIONKEY,
            N_COMMENT,
            -- One timestamp per run, used for both created_time and modified_time
            CURRENT_TIMESTAMP() AS modified_time
        FROM raw.source_1.nation
    ) AS source
        ON target.N_NATIONKEY = source.N_NATIONKEY
    -- NULL-safe comparison: only touch rows whose payload actually changed,
    -- so unchanged rows do not generate stream records.
    WHEN MATCHED AND (
        target.N_NAME IS DISTINCT FROM source.N_NAME OR
        target.N_REGIONKEY IS DISTINCT FROM source.N_REGIONKEY OR
        target.N_COMMENT IS DISTINCT FROM source.N_COMMENT
    )
    THEN UPDATE SET
        target.N_NAME = source.N_NAME,
        target.N_REGIONKEY = source.N_REGIONKEY,
        target.N_COMMENT = source.N_COMMENT,
        target.modified_time = source.modified_time
    WHEN NOT MATCHED
    THEN INSERT (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT, created_time, modified_time)
    VALUES (source.N_NATIONKEY, source.N_NAME, source.N_REGIONKEY, source.N_COMMENT, source.modified_time, source.modified_time);

    -- Remove staged rows that no longer exist in the source.
    -- NULL keys are filtered out of the subquery: a single NULL in a NOT IN
    -- list makes the predicate UNKNOWN for every row, so nothing would be deleted.
    DELETE FROM raw.cdc.stg_active_nation
    WHERE N_NATIONKEY NOT IN (
        SELECT N_NATIONKEY
        FROM raw.source_1.nation
        WHERE N_NATIONKEY IS NOT NULL
    );

    RETURN 'success';
END;
$$;


-- Root task: calls the merge procedure on a schedule.
-- The cron expression '* * * * *' fires every minute (evaluated in the
-- America/New_York time zone); adjust as required.
CREATE OR REPLACE TASK raw.cdc.merge_to_stg_active_nation
    WAREHOUSE = 'WAREHOUSE'
    SCHEDULE = 'USING CRON * * * * * America/New_York'
AS
    CALL raw.cdc.merge_to_stg_active_nation_proc();


```

<h2 style="color:grey" align="center">Change Detection - Consume Stream</h2>

```sql

-- Child task: runs after raw.cdc.merge_to_stg_active_nation and appends the
-- records currently exposed by the staging stream to the history table.
-- The INSERT names its target columns explicitly so it does not depend on
-- the physical column order of nation_historic_changes.
CREATE OR REPLACE TASK raw.cdc.insert_to_nation_historic_changes
    WAREHOUSE = 'WAREHOUSE'
    AFTER raw.cdc.merge_to_stg_active_nation
AS
    INSERT INTO raw.cdc.nation_historic_changes (
        N_NATIONKEY,
        N_NAME,
        N_REGIONKEY,
        N_COMMENT,
        CREATED_TIME,
        MODIFIED_TIME,
        CHANGE_TIME,
        METADATA$ACTION,
        METADATA$ISUPDATE,
        METADATA$ROW_ID
    )
    SELECT
        N_NATIONKEY,
        N_NAME,
        N_REGIONKEY,
        N_COMMENT,
        CREATED_TIME,
        MODIFIED_TIME,
        CURRENT_TIMESTAMP() AS CHANGE_TIME,  -- time the change is written to history
        METADATA$ACTION,
        METADATA$ISUPDATE,
        METADATA$ROW_ID
    FROM raw.cdc.stg_active_nation_stream;


```

<h2 style="color:grey" align="center">Change Detection - Task Management</h2>

```sql

-- Resume the child task first: tasks in a graph should be resumed while the
-- root task is still suspended, and a suspended child is skipped when the
-- root runs.
ALTER TASK raw.cdc.insert_to_nation_historic_changes RESUME;

-- Resume the root (merge) task last so the schedule starts with the whole
-- graph enabled.
ALTER TASK raw.cdc.merge_to_stg_active_nation RESUME;

-- Trigger one run immediately. EXECUTE TASK is documented for standalone or
-- root tasks only; the child task insert_to_nation_historic_changes runs
-- automatically after the root task completes, so it is not executed here.
EXECUTE TASK raw.cdc.merge_to_stg_active_nation;



```

<h2 style="color:grey" align="center">Change Detection - Testing</h2>

```sql

-- Inspect the current state of the history table, staging table and stream.
SELECT * FROM RAW.CDC.NATION_HISTORIC_CHANGES ORDER BY CHANGE_TIME DESC, MODIFIED_TIME DESC;
SELECT * FROM RAW.CDC.STG_ACTIVE_NATION;
SELECT * FROM RAW.CDC.STG_ACTIVE_NATION_STREAM;

-- Simulate a change in the source table.
UPDATE RAW.SOURCE_1.NATION
SET
    N_NAME = 'COLOMBIA'
WHERE N_NATIONKEY = 0;

-- Trigger the pipeline. Only the root task is executed manually; the child
-- task insert_to_nation_historic_changes runs after it via its AFTER clause.
EXECUTE TASK raw.cdc.merge_to_stg_active_nation;

-- EXECUTE TASK is asynchronous: wait for both tasks to finish before
-- checking that the change has reached the history table.
SELECT * FROM RAW.CDC.NATION_HISTORIC_CHANGES ORDER BY CHANGE_TIME DESC, MODIFIED_TIME DESC;



```

![Image not found](https://drive.google.com/uc?export=view&id=1OrYJ2ZaL1-Iz5_WHVOt--dyWkGm9YxSr)




