Skip to content

Commit

Permalink
DBZ-6605 Fix DataCollections for snapshot completion notification
Browse files Browse the repository at this point in the history
DBZ-6605 Use new 'scanned_collection' field for snapshot completion signal

Updated the docs for fixed snapshot completion notification

(cherry picked from commit 6f29ce0)
  • Loading branch information
bdbene authored and mfvitale committed Jun 29, 2023
1 parent b087132 commit 4b0c8a4
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 6 deletions.
1 change: 1 addition & 0 deletions COPYRIGHT.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Aykut Farsak
Babur Duisenov
Balázs Németh
Balázs Sipos
Balint Bene
Barry LaFond
Bartosz Miedlar
Ben Hardesty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class IncrementalSnapshotNotificationService<P extends Partition, O exten

public static final String INCREMENTAL_SNAPSHOT = "Incremental Snapshot";
public static final String DATA_COLLECTIONS = "data_collections";
public static final String SCANNED_COLLECTION = "scanned_collection";
public static final String CURRENT_COLLECTION_IN_PROGRESS = "current_collection_in_progress";
public static final String MAXIMUM_KEY = "maximum_key";
public static final String LAST_PROCESSED_KEY = "last_processed_key";
Expand Down Expand Up @@ -108,13 +109,15 @@ public <T extends DataCollectionId> void notifyAborted(IncrementalSnapshotContex
public <T extends DataCollectionId> void notifyTableScanCompleted(IncrementalSnapshotContext<T> incrementalSnapshotContext, P partition, OffsetContext offsetContext,
long totalRowsScanned, TableScanCompletionStatus status) {

String scannedCollection = incrementalSnapshotContext.currentDataCollectionId().getId().identifier();
String dataCollections = incrementalSnapshotContext.getDataCollections().stream().map(DataCollection::getId)
.map(DataCollectionId::identifier)
.collect(Collectors.joining(LIST_DELIMITER));

notificationService.notify(buildNotificationWith(incrementalSnapshotContext, SnapshotStatus.TABLE_SCAN_COMPLETED,
Map.of(
DATA_COLLECTIONS, dataCollections,
SCANNED_COLLECTION, scannedCollection,
TOTAL_ROWS_SCANNED, String.valueOf(totalRowsScanned),
STATUS, status.name()),
offsetContext),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void notifyTableScanCompleted() {
Notification expectedNotification = new Notification("12345", "Incremental Snapshot", "TABLE_SCAN_COMPLETED", Map.of(
"connector_name", "connector-test",
"data_collections", "db.inventory.product,db.inventory.customer",
"scanned_collection", "db.inventory.product",
"total_rows_scanned", "100",
"status", "SUCCEEDED"));

Expand Down
11 changes: 6 additions & 5 deletions documentation/modules/ROOT/pages/configuration/notification.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ a|[source, json]
"type":"TABLE_SCAN_COMPLETED",
"additional_data":{
"connector_name":"my-connector",
"data_collection":"table1",
"data_collection":"table1, table2",
"scanned_collection":"table1",
"total_rows_scanned":"100",
"status":"SUCCEEDED" // <1>
}
Expand Down Expand Up @@ -224,7 +225,7 @@ To enable an application to listen for the JMX notifications that an MBean emits
[id="debezium-notification-custom-channel"]
== Custom notification channels

The notification mechanism is designed to be extensible.
The notification mechanism is designed to be extensible.
You can implement channels as needed to deliver notifications in a manner that works best in your environment.
Adding a notification channel involves several steps:

Expand Down Expand Up @@ -256,9 +257,9 @@ public interface NotificationChannel {
<1> The name of the channel.
To enable {prodname} to use the channel, specify this name in the connector's `notification.enabled.channels` property.
<2> Initializes specific configuration, variables, or connections that the channel requires.
<3> Sends the notification on the channel.
<3> Sends the notification on the channel.
{prodname} calls this method to report its status.
<4> Closes all allocated resources.
<4> Closes all allocated resources.
{prodname} calls this method when the connector is stopped.

// Type: concept
Expand Down Expand Up @@ -299,5 +300,5 @@ NOTE: To use a custom notification channel with multiple connectors, you must pl
[id="configuring-connectors-to-use-a-custom-notification-channel"]
=== Configuring connectors to use a custom notification channel

Add the name of the custom notification channel to the `notification.enabled.channels` configuration property.
Add the name of the custom notification channel to the `notification.enabled.channels` configuration property.

3 changes: 2 additions & 1 deletion jenkins-jobs/scripts/config/Aliases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,5 @@ gongchanghua,Gong Chang Hua
angsdey2,Angshuman Dey
jehrenzweig-pi,Jesse Ehrenzweig
TechIsCool,David Beck
cjmencias,Christian Jacob Mencias
cjmencias,Christian Jacob Mencias
bdbene,Balint Bene

0 comments on commit 4b0c8a4

Please sign in to comment.