From cb81f75481a61cb72a2ce5f9bf6286374843e120 Mon Sep 17 00:00:00 2001 From: ani-sha Date: Thu, 22 Feb 2024 17:36:24 +0530 Subject: [PATCH] DBZ-6858 Add suggestions from code review --- .../incremental/InsertWindowCloser.java | 2 +- ...dIncrementalSnapshotChangeEventSource.java | 6 ++--- .../snapshot/incremental/SignalMetadata.java | 26 ++++++++++++------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/InsertWindowCloser.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/InsertWindowCloser.java index 438706e4a00..62f97697b0d 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/InsertWindowCloser.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/InsertWindowCloser.java @@ -37,7 +37,7 @@ public void closeWindow(Partition partition, OffsetContext offsetContext, String LOGGER.trace("Emitting close window for chunk = '{}'", chunkId); x.setString(1, chunkId + "-close"); x.setString(2, CloseIncrementalSnapshotWindow.NAME); - x.setString(3, signalMetadata.closeWindowSignalMetadataString()); + x.setString(3, signalMetadata.metadataString()); }); jdbcConnection.commit(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java index 4b697265812..63ed9d3b8aa 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedIncrementalSnapshotChangeEventSource.java @@ -67,12 +67,12 @@ public void processMessage(Partition partition, DataCollectionId dataCollectionI @Override protected void emitWindowOpen() throws SQLException { - signalMetadata = new SignalMetadata(Instant.now().toString(), null); + signalMetadata = new SignalMetadata(Instant.now(), null); jdbcConnection.prepareUpdate(signalWindowStatement, x -> { LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId()); x.setString(1, context.currentChunkId() + "-open"); x.setString(2, OpenIncrementalSnapshotWindow.NAME); - x.setString(3, signalMetadata.openWindowSignalMetadataString()); + x.setString(3, signalMetadata.metadataString()); }); jdbcConnection.commit(); } @@ -93,6 +93,6 @@ private WatermarkWindowCloser getWatermarkWindowCloser(CommonConnectorConfig con return new DeleteWindowCloser<>(jdbcConnection, signalTable, this); } - return new InsertWindowCloser(jdbcConnection, signalTable, new SignalMetadata(signalMetadata.getOpenWindowTimestamp(), Instant.now().toString())); + return new InsertWindowCloser(jdbcConnection, signalTable, new SignalMetadata(signalMetadata.getOpenWindowTimestamp(), Instant.now())); } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalMetadata.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalMetadata.java index 0c6e50c5f0c..c3373416327 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalMetadata.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/SignalMetadata.java @@ -5,28 +5,34 @@ */ package io.debezium.pipeline.source.snapshot.incremental; +import java.time.Instant; + +/** + * Signal metadata for the incremental snapshotting + * + * @author Anisha Mohanty + */ public class SignalMetadata { - private final String openWindowTimestamp; - private final String closeWindowTimestamp; + private final Instant openWindowTimestamp; + private final Instant closeWindowTimestamp; - public SignalMetadata(String openWindowTimestamp, String closeWindowTimestamp) { + public SignalMetadata(Instant openWindowTimestamp, Instant closeWindowTimestamp) { this.openWindowTimestamp = openWindowTimestamp; this.closeWindowTimestamp = closeWindowTimestamp; } - public String openWindowSignalMetadataString() { - return String.format("{\"openWindowTimestamp\": \"%s\"}", openWindowTimestamp); - } - - public String closeWindowSignalMetadataString() { + public String metadataString() { + if (closeWindowTimestamp == null) { + return String.format("{\"openWindowTimestamp\": \"%s\"}", openWindowTimestamp); + } return String.format("{\"openWindowTimestamp\": \"%s\", \"closeWindowTimestamp\": \"%s\"}", openWindowTimestamp, closeWindowTimestamp); } - public String getOpenWindowTimestamp() { + public Instant getOpenWindowTimestamp() { return openWindowTimestamp; } - public String getCloseWindowTimestamp() { + public Instant getCloseWindowTimestamp() { return closeWindowTimestamp; } }