Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-6895 Fix OpenLogReplicator confirmation #4859

Merged
merged 5 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,22 @@ public class OracleOffsetContext extends CommonOffsetContext<SourceInfo> {
*/
private boolean snapshotCompleted;

public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, CommitScn commitScn, String lcrPosition,
public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Long scnIndex, CommitScn commitScn, String lcrPosition,
Scn snapshotScn, Map<String, Scn> snapshotPendingTransactions,
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext,
IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
this(connectorConfig, scn, lcrPosition, snapshotScn, snapshotPendingTransactions, snapshot, snapshotCompleted, transactionContext, incrementalSnapshotContext);
this(connectorConfig, scn, scnIndex, lcrPosition, snapshotScn, snapshotPendingTransactions, snapshot, snapshotCompleted, transactionContext,
incrementalSnapshotContext);
sourceInfo.setCommitScn(commitScn);
}

public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, String lcrPosition,
public OracleOffsetContext(OracleConnectorConfig connectorConfig, Scn scn, Long scnIndex, String lcrPosition,
Scn snapshotScn, Map<String, Scn> snapshotPendingTransactions,
boolean snapshot, boolean snapshotCompleted, TransactionContext transactionContext,
IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
super(new SourceInfo(connectorConfig));
sourceInfo.setScn(scn);
sourceInfo.setScnIndex(scnIndex);
// It is safe to set this value to the supplied SCN, specifically for snapshots.
// During streaming this value will be updated by the current event handler.
sourceInfo.setEventScn(scn);
Expand Down Expand Up @@ -95,6 +97,7 @@ public static class Builder {

private OracleConnectorConfig connectorConfig;
private Scn scn;
private Long scnIndex;
private String lcrPosition;
private boolean snapshot;
private boolean snapshotCompleted;
Expand All @@ -113,6 +116,11 @@ public Builder scn(Scn scn) {
return this;
}

public Builder scnIndex(Long scnIndex) {
this.scnIndex = scnIndex;
return this;
}

public Builder lcrPosition(String lcrPosition) {
this.lcrPosition = lcrPosition;
return this;
Expand Down Expand Up @@ -149,7 +157,8 @@ public Builder snapshotScn(Scn scn) {
}

public OracleOffsetContext build() {
return new OracleOffsetContext(connectorConfig, scn, lcrPosition, snapshotScn, snapshotPendingTransactions, snapshot, snapshotCompleted, transactionContext,
return new OracleOffsetContext(connectorConfig, scn, scnIndex, lcrPosition, snapshotScn,
snapshotPendingTransactions, snapshot, snapshotCompleted, transactionContext,
incrementalSnapshotContext);
}
}
Expand Down Expand Up @@ -186,6 +195,9 @@ public static Builder create() {
else {
final Scn scn = sourceInfo.getScn();
offset.put(SourceInfo.SCN_KEY, scn != null ? scn.toString() : null);
if (sourceInfo.getScnIndex() != null) {
offset.put(SourceInfo.SCN_INDEX_KEY, sourceInfo.getScnIndex());
}
sourceInfo.getCommitScn().store(offset);
}
if (snapshotPendingTransactions != null && !snapshotPendingTransactions.isEmpty()) {
Expand All @@ -209,6 +221,10 @@ public void setScn(Scn scn) {
sourceInfo.setScn(scn);
}

public void setScnIndex(Long scnIndex) {
sourceInfo.setScnIndex(scnIndex);
}

public void setEventScn(Scn eventScn) {
sourceInfo.setEventScn(eventScn);
}
Expand All @@ -217,6 +233,10 @@ public Scn getScn() {
return sourceInfo.getScn();
}

public Long getScnIndex() {
return sourceInfo.getScnIndex();
}

public CommitScn getCommitScn() {
return sourceInfo.getCommitScn();
}
Expand Down Expand Up @@ -301,6 +321,9 @@ public String toString() {
sb.append(", snapshot=").append(sourceInfo.isSnapshot());
sb.append(", snapshot_completed=").append(snapshotCompleted);
}
else if (getScnIndex() != null) {
sb.append(", scnIndex=").append(getScnIndex());
}

sb.append(", commit_scn=").append(sourceInfo.getCommitScn().toLoggableFormat());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class SourceInfo extends BaseSourceInfo {
public static final String LCR_POSITION_KEY = "lcr_position";
public static final String SNAPSHOT_KEY = "snapshot";
public static final String USERNAME_KEY = "user_name";
public static final String SCN_INDEX_KEY = "scn_idx";

private Scn scn;
private CommitScn commitScn;
Expand All @@ -37,6 +38,7 @@ public class SourceInfo extends BaseSourceInfo {
private Integer redoThread;
private String rsId;
private long ssn;
private Long scnIndex;

protected SourceInfo(OracleConnectorConfig connectorConfig) {
super(connectorConfig);
Expand Down Expand Up @@ -147,6 +149,14 @@ public void setRedoThread(Integer redoThread) {
this.redoThread = redoThread;
}

public Long getScnIndex() {
return scnIndex;
}

public void setScnIndex(Long scnIndex) {
this.scnIndex = scnIndex;
}

@Override
protected Instant timestamp() {
return sourceTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public OracleOffsetContext load(Map<String, ?> offset) {
CommitScn commitScn = CommitScn.load(offset);
Map<String, Scn> snapshotPendingTransactions = OracleOffsetContext.loadSnapshotPendingTransactions(offset);
Scn snapshotScn = OracleOffsetContext.loadSnapshotScn(offset);
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, snapshotScn, snapshotPendingTransactions, snapshot, snapshotCompleted,
return new OracleOffsetContext(connectorConfig, scn, null, commitScn, null, snapshotScn,
snapshotPendingTransactions, snapshot, snapshotCompleted,
TransactionContext.load(offset),
SignalBasedIncrementalSnapshotContext.load(offset));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ public OracleOffsetContext load(Map<String, ?> offset) {
boolean snapshotCompleted = Boolean.TRUE.equals(offset.get(OracleOffsetContext.SNAPSHOT_COMPLETED_KEY));

Scn scn = OracleOffsetContext.getScnFromOffsetMapByKey(offset, SourceInfo.SCN_KEY);
Long scnIndex = (Long) offset.get(SourceInfo.SCN_INDEX_KEY);
CommitScn commitScn = CommitScn.valueOf((String) null);
return new OracleOffsetContext(connectorConfig, scn, commitScn, null, null, null, snapshot, snapshotCompleted,
return new OracleOffsetContext(connectorConfig, scn, scnIndex, commitScn, null, null, null,
snapshot, snapshotCompleted,
TransactionContext.load(offset),
SignalBasedIncrementalSnapshotContext.load(offset));
}
Expand Down