Skip to content

Commit

Permalink
KAFKA-6782: solved the bug of restoration of aborted messages for Glo…
Browse files Browse the repository at this point in the history
…balStateStore and KGlobalTable (apache#4900)

Reviewer: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
  • Loading branch information
Gitomain authored and mjsax committed Jun 12, 2018
1 parent 7a59061 commit 40f63eb
Show file tree
Hide file tree
Showing 6 changed files with 429 additions and 66 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
dist
*classes
*.class
target/
build/
build_eclipse/
Expand Down
1 change: 1 addition & 0 deletions kafka
Submodule kafka added at cc43e7
Expand Up @@ -271,8 +271,8 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
if (record.key() != null) {
restoreRecords.add(KeyValue.pair(record.key(), record.value()));
}
offset = globalConsumer.position(topicPartition);
}
offset = globalConsumer.position(topicPartition);
stateRestoreAdapter.restoreAll(restoreRecords);
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
restoreCount += restoreRecords.size();
Expand Down

0 comments on commit 40f63eb

Please sign in to comment.