# BigQuery Streaming CDC Merge Tutorial

**License**

In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

**Create a BigQuery dataset**: This dataset will hold your tables.

In [None]:
!bq mk --location=US my_cdc_dataset

BigQuery error in mk operation: Dataset 'fsi-multi-cloud-demo:my_cdc_dataset' already exists.


**Create a staging table**: This table will receive the raw CDC data from Pub/Sub. Define the schema according to your CDC data format. For example:

In [None]:
%%bigquery
CREATE OR REPLACE TABLE my_cdc_dataset.staging_table (
    id STRING,
    name STRING,
    value STRING,
    _metadata_timestamp TIMESTAMP,  -- Timestamp from Pub/Sub
    _event_timestamp TIMESTAMP,      -- Timestamp of the event
    _op STRING                     -- Operation: I, U, D
)
PARTITION BY _event_timestamp

Executing query with job ID: 01c773db-9012-4783-ac35-7bcd82fab377
Query executing: 0.45s


ERROR:
 400 PARTITION BY expression must be _PARTITIONDATE, DATE(_PARTITIONTIME), DATE(<timestamp_column>), DATE(<datetime_column>), DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR), a DATE column, TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR), DATE_TRUNC(<date_column>, MONTH/YEAR), or RANGE_BUCKET(<int64_column>, GENERATE_ARRAY(<int64_value>, <int64_value>[, <int64_value>])); reason: invalidQuery, location: query, message: PARTITION BY expression must be _PARTITIONDATE, DATE(_PARTITIONTIME), DATE(<timestamp_column>), DATE(<datetime_column>), DATETIME_TRUNC(<datetime_column>, DAY/HOUR/MONTH/YEAR), a DATE column, TIMESTAMP_TRUNC(<timestamp_column>, DAY/HOUR/MONTH/YEAR), DATE_TRUNC(<date_column>, MONTH/YEAR), or RANGE_BUCKET(<int64_column>, GENERATE_ARRAY(<int64_value>, <int64_value>[, <int64_value>]))

Location: US
Job ID: 01c773db-9012-4783-ac35-7bcd82fab377



**Create a curated table**: This table will hold the merged and cleaned data. It should have a similar schema to the staging table, but without the metadata columns.

In [None]:
%%bigquery
CREATE OR REPLACE TABLE my_cdc_dataset.curated_table (
    id STRING,
    name STRING,
    value STRING,
    event_timestamp TIMESTAMP
);

Query is running:   0%|          |

**Insert sample records**: Add sample records with insert, update and delete operations

In [None]:
%%bigquery
-- Inserts
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('1', 'Product A', '10.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 10 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 10 MINUTE), 'I');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('2', 'Product B', '20.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 9 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 9 MINUTE), 'I');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('3', 'Product C', '30.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 8 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 8 MINUTE), 'I');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('4', 'Product D', '40.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 MINUTE), 'I');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('5', 'Product E', '50.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 MINUTE), 'I');
-- ... (More inserts up to around 50)

-- Updates for existing IDs
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('1', 'Product A Updated', '12.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 MINUTE), 'U');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('3', 'Product C Updated', '35.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 4 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 4 MINUTE), 'U');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('5', 'Product E Updated', '55.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 MINUTE), 'U');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('2', 'Product B v2', '22.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 MINUTE), 'U');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('4', 'Product D v2', '42.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 MINUTE), 'U');

-- ... (More updates for various IDs)

-- Deletes for existing IDs
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('2', NULL, NULL, TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), 'D');  -- Delete Product B
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('4', NULL, NULL, TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), 'D');  -- Delete Product D
-- ... (More deletes)

-- More Inserts to show how the merge handles new data after deletes
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('6', 'Product F', '60.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), 'I');
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('2', 'Product B - Reborn', '25.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), 'I'); -- Product B re-inserted
INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('4', 'Product D - Reborn', '45.00', TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), 'I'); -- Product D re-inserted

INSERT INTO my_cdc_dataset.staging_table (id, name, value, _metadata_timestamp, _event_timestamp, _op) VALUES ('6', NULL, NULL, TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 0 MINUTE), 'D');  -- Delete Product D

-- ... (Fill up to 100 statements with a mix of inserts, updates, and deletes)

Query is running:   0%|          |

**[Option #1] Create the Latest Version View**: This view shows only the latest record for each id based on the event_timestamp.

In [None]:
%%bigquery
CREATE OR REPLACE VIEW my_cdc_dataset.latest_records AS
SELECT *
FROM my_cdc_dataset.staging_table
WHERE _event_timestamp = (
    SELECT MAX(_event_timestamp)
    FROM my_cdc_dataset.staging_table AS t2
    WHERE t2.id = staging_table.id
)
AND _op NOT IN ('D')
;

Query is running:   0%|          |

In [None]:
%%bigquery
SELECT * FROM my_cdc_dataset.latest_records;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,id,name,value,_metadata_timestamp,_event_timestamp,_op
0,1,Product A Updated,12.0,2025-02-11 22:51:12.366356+00:00,2025-02-11 22:51:12.366356+00:00,U
1,3,Product C Updated,35.0,2025-02-11 22:52:14.453944+00:00,2025-02-11 22:52:14.453944+00:00,U
2,5,Product E Updated,55.0,2025-02-11 22:53:16.112650+00:00,2025-02-11 22:53:16.112650+00:00,U
3,4,Product D - Reborn,45.0,2025-02-11 22:57:00.887831+00:00,2025-02-11 22:57:00.887831+00:00,I
4,2,Product B - Reborn,25.0,2025-02-11 22:56:58.884720+00:00,2025-02-11 22:56:58.884720+00:00,I


**[Option 2] Scheduled Merge Job**: You can schedule a merge job using BigQuery scheduler.  This example uses a scheduled query.

In [None]:
%%bigquery
MERGE my_cdc_dataset.curated_table AS target
USING (
    SELECT id, name, value, _event_timestamp, _op
    FROM my_cdc_dataset.staging_table
    WHERE _event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR) -- Process only recent changes from last 24 hours
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY _event_timestamp DESC) = 1 -- Get latest record for each ID
) AS source
ON target.id = source.id  -- Still join on ID for matching
WHEN MATCHED AND source._op = 'U' AND source._event_timestamp > target.event_timestamp THEN UPDATE SET target.name = source.name, target.value = source.value, target.event_timestamp = source._event_timestamp -- Only update if source timestamp is newer
WHEN MATCHED AND source._op = 'D' AND source._event_timestamp > target.event_timestamp THEN DELETE -- Only delete if source timestamp is newer
WHEN NOT MATCHED AND source._op IN ('I','U') THEN INSERT (id, name, value, event_timestamp) VALUES (source.id, source.name, source.value, source._event_timestamp); -- INSERT UNMATCHED INSERT AND UPDATE RECORDS, IGNORE UNMATCHED DELETES

Query is running:   0%|          |

In [None]:
%%bigquery
SELECT * FROM my_cdc_dataset.curated_table;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,id,name,value,event_timestamp
0,1,Product A Updated,12.0,2025-02-11 22:51:12.366356+00:00
1,4,Product D - Reborn,45.0,2025-02-11 22:57:00.887831+00:00
2,2,Product B - Reborn,25.0,2025-02-11 22:56:58.884720+00:00
3,3,Product C Updated,35.0,2025-02-11 22:52:14.453944+00:00
4,5,Product E Updated,55.0,2025-02-11 22:53:16.112650+00:00
