Skip to content

Commit

Permalink
DBZ-6858 Add suggestions from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
ani-sha authored and jpechane committed Feb 29, 2024
1 parent 7feeed8 commit cb81f75
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void closeWindow(Partition partition, OffsetContext offsetContext, String
LOGGER.trace("Emitting close window for chunk = '{}'", chunkId);
x.setString(1, chunkId + "-close");
x.setString(2, CloseIncrementalSnapshotWindow.NAME);
x.setString(3, signalMetadata.closeWindowSignalMetadataString());
x.setString(3, signalMetadata.metadataString());
});

jdbcConnection.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public void processMessage(Partition partition, DataCollectionId dataCollectionI

@Override
protected void emitWindowOpen() throws SQLException {
signalMetadata = new SignalMetadata(Instant.now().toString(), null);
signalMetadata = new SignalMetadata(Instant.now(), null);
jdbcConnection.prepareUpdate(signalWindowStatement, x -> {
LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId());
x.setString(1, context.currentChunkId() + "-open");
x.setString(2, OpenIncrementalSnapshotWindow.NAME);
x.setString(3, signalMetadata.openWindowSignalMetadataString());
x.setString(3, signalMetadata.metadataString());
});
jdbcConnection.commit();
}
Expand All @@ -93,6 +93,6 @@ private WatermarkWindowCloser getWatermarkWindowCloser(CommonConnectorConfig con
return new DeleteWindowCloser<>(jdbcConnection, signalTable, this);
}

return new InsertWindowCloser(jdbcConnection, signalTable, new SignalMetadata(signalMetadata.getOpenWindowTimestamp(), Instant.now().toString()));
return new InsertWindowCloser(jdbcConnection, signalTable, new SignalMetadata(signalMetadata.getOpenWindowTimestamp(), Instant.now()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,34 @@
*/
package io.debezium.pipeline.source.snapshot.incremental;

import java.time.Instant;

/**
* Signal metadata for the incremental snapshotting
*
* @author Anisha Mohanty
*/
public class SignalMetadata {
private final String openWindowTimestamp;
private final String closeWindowTimestamp;
private final Instant openWindowTimestamp;
private final Instant closeWindowTimestamp;

public SignalMetadata(String openWindowTimestamp, String closeWindowTimestamp) {
public SignalMetadata(Instant openWindowTimestamp, Instant closeWindowTimestamp) {
this.openWindowTimestamp = openWindowTimestamp;
this.closeWindowTimestamp = closeWindowTimestamp;
}

public String openWindowSignalMetadataString() {
return String.format("{\"openWindowTimestamp\": \"%s\"}", openWindowTimestamp);
}

public String closeWindowSignalMetadataString() {
public String metadataString() {
if (closeWindowTimestamp == null) {
return String.format("{\"openWindowTimestamp\": \"%s\"}", openWindowTimestamp);
}
return String.format("{\"openWindowTimestamp\": \"%s\", \"closeWindowTimestamp\": \"%s\"}", openWindowTimestamp, closeWindowTimestamp);
}

public String getOpenWindowTimestamp() {
public Instant getOpenWindowTimestamp() {
return openWindowTimestamp;
}

public String getCloseWindowTimestamp() {
public Instant getCloseWindowTimestamp() {
return closeWindowTimestamp;
}
}

0 comments on commit cb81f75

Please sign in to comment.