Skip to content

Commit

Permalink
LR: remove reliance on in-memory structure during Apply phase. (#3459)
Browse files Browse the repository at this point in the history
  • Loading branch information
shama358 committed Dec 21, 2022
1 parent b6f816c commit 003d656
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public LogReplicationState processEvent(LogReplicationEvent event) throws Illega
log.info("Snapshot Sync transfer completed. Wait for snapshot apply to complete.");
WaitSnapshotApplyState waitSnapshotApplyState = (WaitSnapshotApplyState)fsm.getStates().get(LogReplicationStateType.WAIT_SNAPSHOT_APPLY);
waitSnapshotApplyState.setTransitionEventId(event.getEventId());
waitSnapshotApplyState.setBaseSnapshotTimestamp(fsm.getBaseSnapshot());
waitSnapshotApplyState.setBaseSnapshotTimestamp(event.getMetadata().getLastTransferredBaseSnapshot());
fsm.setBaseSnapshot(event.getMetadata().getLastTransferredBaseSnapshot());
fsm.setAckedTimestamp(event.getMetadata().getLastLogEntrySyncedTimestamp());
return waitSnapshotApplyState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,19 +304,26 @@ private void applyShadowStream(UUID streamId, long snapshot) {
// as these streams will get trimmed and 'clear' will be a 'data loss'.
if (MERGE_ONLY_STREAMS.contains(streamId)) {
log.debug("Do not clear stream={} (merge stream)", streamId);
} else if (!replicatedStreamIds.contains(streamId)) {
log.trace("No data was written to stream {} on source or sink." +
" Do not clear.", streamId);
return;
} else {
smrEntries.add(CLEAR_ENTRY);
}

boolean shouldAddClearRecord = !MERGE_ONLY_STREAMS.contains(streamId);
while (iterator.hasNext()) {
// append a clear record at the beginning of every non-merge-only streams
if(shouldAddClearRecord) {
smrEntries.add(CLEAR_ENTRY);
shouldAddClearRecord = false;
}

OpaqueEntry opaqueEntry = iterator.next();
smrEntries.addAll(opaqueEntry.getEntries().get(shadowStreamId));
}

// if clear record has not been added by now,indicates that shadow stream is empty.
if (shouldAddClearRecord) {
log.trace("No data was written to stream {} on source or sink. Do not clear.", streamId);
return;
}

if (streamId.equals(REGISTRY_TABLE_ID)) {
smrEntries = filterRegistryTableEntries(smrEntries);
}
Expand Down

0 comments on commit 003d656

Please sign in to comment.