Transactional State [2/5]: Added a ChangelogSSPIterator API#1161
Transactional State [2/5]: Added a ChangelogSSPIterator API#1161prateekm merged 1 commit intoapache:masterfrom
Conversation
bd53e0f to
f59bc19
Compare
bharathkumarasubramanian
left a comment
There was a problem hiding this comment.
Mostly minor comments/questions.
It should be fine to have them addressed as part of the 5/5 PR to make rebase easier.
| val mode = iterator.getMode | ||
|
|
||
| if (mode.equals(ChangelogSSPIterator.Mode.RESTORE)) { | ||
| batch.add(new Entry(keyBytes, valBytes)) |
There was a problem hiding this comment.
Should we throw an exception here if the iterator transitioned to RESTORE after being in TRIM or is it something we can guarantee as part of the ChangelogIterator?
There was a problem hiding this comment.
Will fix in PR 5/5. SAMZA-2337 to track.
| if (!lastBatchFlushed) { | ||
| info(restoredMessages + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".") | ||
| if (batch.size > 0) { | ||
| doPutAll(rawStore, batch) |
There was a problem hiding this comment.
do we need to clear the batch here just to be safe?
There was a problem hiding this comment.
Will update in PR 5. SAMZA-2337 to track.
| if (batch.size > 0) { | ||
| doPutAll(rawStore, batch) | ||
| } | ||
| // flush the store and the changelog producer |
There was a problem hiding this comment.
This isn't required for old code path. However, should this be a blocker instead of high for new code path?
There was a problem hiding this comment.
We do a task commit after the restore is complete and before we start processing. That will flush all producers (incl. changelog producer). So not a blocker.
| @@ -142,12 +142,22 @@ class TestKeyValueStorageEngine { | |||
| @Test | |||
| def testRestoreMetrics(): Unit = { | |||
There was a problem hiding this comment.
maybe add a test case to ensure the transition from TRIM to RESTORE doesn't happen.
mynameborat
left a comment
There was a problem hiding this comment.
Left comments w/ my personal account by mistake.
Approving w/ my apache account.
df7b992 to
0e52bb1
Compare
This PR changes the KeyValueStorageEngine restore API and implementation to take a new ChangelogSSPIterator instead of a plain Iterator.
The new ChangelogSSPIterator is similar to the existing SystemStreamPartitionIterator with the following differences:
RestoreandTrim.endingOffsetduring construction. The mode changes fromRestoretoTrimduring iteration if the current message offset is greater than the ending offset (and trim mode is enabled).For supporting transactional state, we only restore changelog messages up to the changelog SSP offset in the checkpoint topic. Any messages after this 'checkpointed changelog offset' are trimmed by overwriting them with the current store value. When used in conjunction with an appropriate 'min.compaction.lag.ms' configuration for the Kafka changelog topic, this ensures that on container restart any store contents are consistent with the last input checkpoints and do not reflect any newer changes.