In [1]:
import duckdb
import pandas as pd

%load_ext sql
%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False
%sql duckdb:///:memory:

In [2]:
%%sql

INSTALL httpfs;
LOAD httpfs;
SET s3_endpoint='minio:9000';
SET s3_region='us-east-1';
SET s3_access_key_id='minio';
SET s3_secret_access_key='minio123';
SET s3_url_style=path;
SET s3_use_ssl=false;

In [None]:
%%sql
WITH products_create_update_delete AS( 
    SELECT
        COALESCE(CAST(json->'value'->'after'->'id' AS INT), CAST(json->'value'->'before'->'id' AS INT)) AS id,
        json->'value'->'before' AS before_row_value,
        json->'value'->'after' AS after_row_value,
        CASE
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"c"' THEN 'CREATE'
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"d"' THEN 'DELETE'
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"u"' THEN 'UPDATE'
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"r"' THEN 'SNAPSHOT'
            ELSE 'INVALID'
        END AS operation_type,
        CAST(json->'value'->'source'->'lsn' AS BIGINT) AS log_seq_num,
    epoch_ms(CAST(json->'value'->'source'->'ts_ms' AS BIGINT)) AS source_timestamp
    FROM read_ndjson_objects('s3://commerce/debezium.commerce.products/*/*/*.json')
    WHERE log_seq_num IS NOT NULL
)
SELECT 
id,
CAST(after_row_value->'name' AS VARCHAR(255)) AS name,
CAST(after_row_value->'description' AS TEXT) AS description,
CAST(after_row_value->'price' AS NUMERIC(10, 2)) AS price,
source_timestamp AS row_valid_start_timestamp,
CASE 
    WHEN LEAD(source_timestamp, 1) OVER lead_txn_timestamp IS NULL THEN CAST('9999-01-01' AS TIMESTAMP) 
    ELSE LEAD(source_timestamp, 1) OVER lead_txn_timestamp 
END AS row_valid_expiration_timestamp
FROM products_create_update_delete
WHERE id in (SELECT id FROM products_create_update_delete GROUP BY id HAVING COUNT(*) > 1)
WINDOW lead_txn_timestamp AS (PARTITION BY id ORDER BY log_seq_num )
ORDER BY id, row_valid_start_timestamp
LIMIT 200;

In [16]:
%%sql

WITH products_create_update_delete AS( 
    SELECT
        COALESCE(CAST(json->'value'->'after'->'id' AS INT), CAST(json->'value'->'before'->'id' AS INT)) AS id,
        json->'value'->'before' AS before_row_value,
        json->'value'->'after' AS after_row_value,
        CASE
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"c"' THEN 'CREATE'
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"d"' THEN 'DELETE'
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"u"' THEN 'UPDATE'
            WHEN CAST(json->'value'->'$.op' AS CHAR(1)) = '"r"' THEN 'SNAPSHOT'
            ELSE 'INVALID'
        END AS operation_type,
        CAST(json->'value'->'source'->'lsn' AS BIGINT) AS log_seq_num,
    epoch_ms(CAST(json->'value'->'source'->'ts_ms' AS BIGINT)) AS source_timestamp
    FROM read_ndjson_objects('s3://commerce/debezium.commerce.products/*/*/*.json')
    -- WHERE log_seq_num IS NOT NULL
), 
scd2 AS (
    SELECT 
        id,
        CAST(after_row_value->'name' AS VARCHAR(255)) AS name,
        CAST(after_row_value->'description' AS TEXT) AS description,
        CAST(after_row_value->'price' AS NUMERIC(10, 2)) AS price,
        source_timestamp AS row_valid_start_timestamp,
        LEAD(source_timestamp, 1) OVER lead_txn_timestamp AS row_valid_expiration_timestamp
    FROM products_create_update_delete
    WINDOW lead_txn_timestamp AS (PARTITION BY id ORDER BY log_seq_num )
    ORDER BY id, row_valid_start_timestamp
)
SELECT 
    id,
    name,
    description,
    price
FROM scd2
WHERE name IS NOT NULL AND row_valid_expiration_timestamp IS NULL

Unnamed: 0,id,name,description,price
0,0,"""Troy Collins""","""It look couple wonder top arm huge try. Ball ...",91.00
1,1,"""Donna Smith""","""Senior give condition. Moment single vote eve...",93.00
2,2,"""Jennifer Wilson""","""Receive stuff medical station pattern. Relati...",41.00
3,3,"""Beth Ingram""","""Benefit agent lose good recently political me...",100.00
4,4,"""Marcus Michael""","""Arrive military serious character water. Deve...",42.00
...,...,...,...,...
331,347,"""Patricia Day""","""Month southern smile current. Though page few...",83.00
332,348,"""Cindy Mejia""","""Job allow road light. Main three fly science ...",40.00
333,349,"""Mark Flores""","""Difference worry artist baby that provide str...",55.00
334,350,"""Rebecca Clark""","""Particularly finally quality politics. Left m...",84.00
