KAFKA-5363: KIP-167 implementing bulk load, restoration event notification #3325

Closed

bbejeck
Contributor

@bbejeck bbejeck commented Jun 13, 2017

No description provided.

@bbejeck
Contributor Author

bbejeck commented Jun 13, 2017

ping @mjsax @guozhangwang @enothereska @dguy for initial review

This still needs more test coverage, but I wanted to submit the PR to move the discussion forward.

EDIT: The batch restoration benchmark will undergo some refactoring as well.

EDIT round 2: The batch restoration benchmark will be in a follow-up PR.

@bbejeck bbejeck changed the title KAFKA-5363 [WIP] : Initial cut at implementing bulk load for persistent stat… KAFKA-5363 [WIP] : KIP-167 Initial cut at implementing bulk load, restoration event notification Jun 13, 2017
@asfbot

asfbot commented Jun 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5236/
Test FAILed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented Jun 13, 2017

Fixing checkstyle errors

@asfbot

asfbot commented Jun 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5254/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5239/
Test FAILed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented Jun 13, 2017

addressing errors

@asfbot

asfbot commented Jun 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5258/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5242/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5298/
Test FAILed (JDK 7 and Scala 2.11).

@bbejeck
Contributor Author

bbejeck commented Jun 14, 2017

The error seems unrelated to this PR: Execution failed for task ':core:compileScala'. I can't reproduce it locally either.

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5282/
Test PASSed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented Jun 14, 2017

retest this please.

@enothereska
Contributor

I think the restore callbacks etc look good. My main question will be around the bulk loading part, i.e., if we go with batch loading, will we still be able to look into KAFKA-4868 or are they mutually exclusive? Thanks @bbejeck .

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5285/
Test FAILed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented Jun 14, 2017

@enothereska current plan is to have KAFKA-4868 be part of the bulk/batch loading in this PR

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5301/
Test FAILed (JDK 7 and Scala 2.11).

@bbejeck
Contributor Author

bbejeck commented Jun 14, 2017

retest this please.

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5317/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented Jun 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5301/
Test FAILed (JDK 8 and Scala 2.12).

@bbejeck bbejeck force-pushed the KAFKA-5363_add_ability_to_batch_restore branch from 9170d0a to 0f3b947 Compare June 15, 2017 20:42
@asfgit

asfgit commented Jun 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5369/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jun 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5354/
Test PASSed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented Jun 20, 2017

@enothereska - updated code to handle KAFKA-4868

EDIT: Still owe tests on this PR

@asfgit

asfgit commented Jun 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5476/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit

asfgit commented Jun 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5491/
Test PASSed (JDK 7 and Scala 2.11).

@bbejeck bbejeck force-pushed the KAFKA-5363_add_ability_to_batch_restore branch from 7bc4575 to 41a21af Compare June 29, 2017 19:43
@asfgit

asfgit commented Jun 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5811/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jun 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5797/
Test PASSed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented Jun 30, 2017

Still owe tests

Member

@mjsax mjsax left a comment

Some more comments...

* across all {@link org.apache.kafka.streams.processor.internals.StreamThread} instances.
*
* Users desiring stateful operations will need to provide synchronization internally in
* the StateRestorerListener implementation.
Member

Nit {@code StateRestorerListener}

Contributor Author

Ack
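
The Javadoc fragment under review appears to say that one listener is shared across all StreamThread instances, so any state it keeps needs its own synchronization. A minimal sketch of that idea follows. `RestoreListener` here is a hypothetical, simplified stand-in (the store is identified by a plain string) so the example compiles without Kafka on the classpath; it is not the interface added by this PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

final class SharedListenerSketch {

    // Hypothetical, simplified listener; NOT the Kafka Streams interface.
    interface RestoreListener {
        void onBatchRestored(String storeName, long numRestored);
    }

    // Keeps per-store totals in thread-safe structures so that
    // several threads can report progress to the same instance.
    static final class CountingRestoreListener implements RestoreListener {
        private final ConcurrentMap<String, AtomicLong> totals = new ConcurrentHashMap<>();

        @Override
        public void onBatchRestored(final String storeName, final long numRestored) {
            totals.computeIfAbsent(storeName, name -> new AtomicLong()).addAndGet(numRestored);
        }

        long totalRestored(final String storeName) {
            final AtomicLong total = totals.get(storeName);
            return total == null ? 0L : total.get();
        }
    }

    // Two threads each report 1000 batches of 5 records for the same store.
    static long runDemo() {
        final CountingRestoreListener listener = new CountingRestoreListener();
        final Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                listener.onBatchRestored("store-a", 5L);
            }
        };
        final Thread first = new Thread(work);
        final Thread second = new Thread(work);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return listener.totalRestored("store-a");
    }

    public static void main(final String[] args) {
        System.out.println(runDemo()); // prints 10000
    }
}
```

With a plain `HashMap` and `long` counters the two threads could lose updates; the concurrent map and atomic counter are one way to avoid that without an explicit lock.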

}


private static final class NoOpStateRestoreListener implements StateRestoreListener {
Member

Can't this extend AbstractNotifyingRestoreCallback to save all the boilerplate below?

Contributor Author

Will still have one no-op method, but I guess it's worth it as it does reduce the boilerplate some.

* or {@link StateRestoreCallback} instead for single action restores.
*/
@Override
public void restore(byte[] key, byte[] value) {
Member

Can we introduce a global "one parameter per line" code style? I think it would help to make diffs cleaner. We can do this incrementally. If yes, please do for all newly introduced code of this PR.

Also, should we add final all over the place?

Contributor Author

Sure thing, can you add that to the Streams guidelines?
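
For illustration, the "one parameter per line" style with `final` parameters might look like the sketch below. The class and method names are hypothetical and only show the layout; they are not taken from the PR.

```java
final class ParameterStyleExample {

    // Each parameter sits on its own line and is declared final, so adding
    // or removing a parameter changes exactly one line in a diff.
    static String describeRestore(final String storeName,
                                  final long startingOffset,
                                  final long endingOffset) {
        return storeName + ": " + startingOffset + " -> " + endingOffset;
    }

    public static void main(final String[] args) {
        System.out.println(describeRestore("store-a", 0L, 42L)); // prints store-a: 0 -> 42
    }
}
```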

/**
* Method called at the very beginning of {@link StateStore} restoration.
*
* @param topicPartition the TopicPartition containing the values to restore.
Member

nit: parameter descriptions are not sentences, so there should be no . at the end (in many other places, too). If we say they are sentences, then they should start with an upper-case letter: [T]he TopicPartition

Contributor Author

Ack

}
}

private static final class NoOpStateRestoreCallback implements StateRestoreCallback {
Member

As above?

Contributor Author

Actually this class won't be used anymore, so removed.

@@ -135,6 +140,10 @@ public void openDB(ProcessorContext context) {
// (this could be a bug in the RocksDB code and their devs have been contacted).
options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));

if (prepareForBulkload) {
options.prepareForBulkLoad();
Member

Can you elaborate?

@@ -254,6 +255,19 @@ public void testCannotStartTwice() throws Exception {
}
}

@Test(expected = IllegalStateException.class)
Member

We should not use expected here; use fail within a try-catch instead.
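
A sketch of the try/fail/catch pattern being asked for, written in plain Java so it runs without JUnit. `Service` is a hypothetical class and `fail` is a local stand-in for JUnit's `Assert.fail`; neither is taken from the PR.

```java
final class StartTwiceCheck {

    // Hypothetical service that may only be started once.
    static final class Service {
        private boolean started;

        void start() {
            if (started) {
                throw new IllegalStateException("already started");
            }
            started = true;
        }
    }

    // Local stand-in for JUnit's Assert.fail.
    static void fail(final String message) {
        throw new AssertionError(message);
    }

    // Returns the exception message so a caller can inspect it.
    static String cannotStartTwice() {
        final Service service = new Service();
        service.start(); // an exception here is not swallowed; it fails the check
        try {
            service.start();
            fail("should have thrown IllegalStateException on second start");
        } catch (final IllegalStateException expected) {
            return expected.getMessage();
        }
        return null; // not reached in practice: fail always throws
    }

    public static void main(final String[] args) {
        System.out.println(cannotStartTwice()); // prints already started
    }
}
```

Compared with `@Test(expected = ...)`, this form only passes when the exception comes from the intended call, and it leaves room to assert on the exception itself.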

@@ -305,4 +327,24 @@ public ProcessorNode currentNode() {
public void close() {
metrics.close();
}

private static final class NoOpRestoreListener implements StateRestoreListener {
Member

As above.

@bbejeck
Contributor Author

bbejeck commented Jul 26, 2017

rebasing now

@bbejeck bbejeck force-pushed the KAFKA-5363_add_ability_to_batch_restore branch from 00d98c0 to 9b60972 Compare July 26, 2017 21:22
@bbejeck
Contributor Author

bbejeck commented Jul 26, 2017

@guozhangwang @mjsax
rebased and comments addressed

@asfgit

asfgit commented Jul 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6369/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jul 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6372/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jul 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6354/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit

asfgit commented Jul 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6357/
Test PASSed (JDK 8 and Scala 2.12).

@bbejeck
Contributor Author

bbejeck commented Jul 27, 2017

ping @guozhangwang for final review and merge

Contributor

@guozhangwang guozhangwang left a comment

@bbejeck Some more minor comments about code styles.

@@ -51,14 +57,30 @@ long checkpoint() {
return checkpoint == null ? NO_CHECKPOINT : checkpoint;
}

void restore(final byte[] key, final byte[] value) {
stateRestoreCallback.restore(key, value);
void notifyRestoreStarted(long startingOffset, long endingOffset) {
Contributor

nit: rename this function to restoreStarted to be consistent with the other names. That will help readers see that these functions share the same granularity and semantics.

Contributor Author

Ack

if (stateRestoreCallback instanceof StateRestoreListener) {
storeRestoreListener = (StateRestoreListener) stateRestoreCallback;
} else {
storeRestoreListener = new NoOpStateRestoreListener();
Contributor

Is this comment missed somehow? I think line 42 above could be storeRestoreListener = NO_OP_STATE_RESTORE_LISTENER.

Contributor Author

Ack, must have overlooked
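
The suggestion above, sharing one no-op listener constant instead of allocating a new instance per fallback, can be sketched as follows. The interfaces are hypothetical simplified stand-ins, not the Kafka Streams types, and `NO_OP_LISTENER` only mirrors the spirit of the `NO_OP_STATE_RESTORE_LISTENER` name mentioned in the comment.

```java
final class NoOpListenerSketch {

    // Hypothetical, simplified listener; NOT the Kafka Streams interface.
    interface RestoreListener {
        void onRestoreStart(String storeName);
    }

    // Hypothetical callback type that carries no notifications.
    interface RestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    // Stateless, so a single shared instance can serve every caller.
    static final RestoreListener NO_OP_LISTENER = new RestoreListener() {
        @Override
        public void onRestoreStart(final String storeName) {
            // intentionally empty
        }
    };

    // Use the callback itself when it also listens; otherwise fall back
    // to the shared no-op instead of allocating a new object.
    static RestoreListener listenerFor(final RestoreCallback callback) {
        if (callback instanceof RestoreListener) {
            return (RestoreListener) callback;
        }
        return NO_OP_LISTENER;
    }

    public static void main(final String[] args) {
        final RestoreCallback plain = (key, value) -> { };
        System.out.println(listenerFor(plain) == NO_OP_LISTENER); // prints true
    }
}
```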


public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {

//Verifies store name called for each state
Contributor

nit: add a space after //, and we do not need to capitalize in-function comments.

Contributor Author

Ack

@@ -198,6 +220,11 @@ private RocksDB openDB(File dir, Options options, int ttl) throws IOException {
}
}

//Visible for testing
Contributor

Ditto for in-function and simple top function comments.

Contributor Author

Ack

private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
private final StoreChangelogReader
changelogReader =
Contributor

nit: line breaks between an object's type declaration and its name are generally not recommended. For this specific line I think we can still fit it on one line.

Contributor Author

Ack, need to adjust IntelliJ settings

@@ -56,7 +71,8 @@ public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Excep
throw new TimeoutException("KABOOM!");
}
};
final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0);
final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new
MockTime(), 0, stateRestoreListener);
Contributor

Ditto: a newline after a keyword is generally not recommended.

Contributor Author

Ack, same as above

private final byte[] value = "value".getBytes(Charset.forName("UTF-8"));
private final Collection<KeyValue<byte[], byte[]>> records = Collections.singletonList(KeyValue.pair(key, value));
private final BatchingStateRestoreCallback wrappedBatchingStateRestoreCallback = new
WrappedBatchingStateRestoreCallback(mockRestoreCallback);
Contributor

Ditto for new line rules. Could you make a pass over all the newlines and see if they can be improved?

Contributor Author

Ack

final StateRestoreCallback restoreCallback = restoreFuncs.get(storeName);
for (final KeyValue<byte[], byte[]> entry : changeLog) {
restoreCallback.restore(entry.key, entry.value);
final StateRestoreListener restoreListener = (restoreCallback instanceof StateRestoreListener) ?
Contributor

We can use the WrappedBatchingStateRestoreCallback here?

Contributor Author

@bbejeck bbejeck Jul 27, 2017

Ack
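
Judging by its name and its use in the test code quoted earlier, WrappedBatchingStateRestoreCallback presumably adapts a single-record callback to the batching interface. A minimal sketch of that adapter idea follows; the interfaces and the `String[]` record shape are hypothetical simplifications, not the types from this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

final class WrappedBatchingSketch {

    // Hypothetical single-record callback.
    interface RecordCallback {
        void restore(String key, String value);
    }

    // Hypothetical batch callback; each record is a {key, value} pair.
    interface BatchCallback {
        void restoreAll(Collection<String[]> records);
    }

    // Adapter: presents a single-record callback as a batch callback
    // by replaying the batch one record at a time, in order.
    static final class WrappedBatchCallback implements BatchCallback {
        private final RecordCallback delegate;

        WrappedBatchCallback(final RecordCallback delegate) {
            this.delegate = delegate;
        }

        @Override
        public void restoreAll(final Collection<String[]> records) {
            for (final String[] record : records) {
                delegate.restore(record[0], record[1]);
            }
        }
    }

    static List<String> demo() {
        final List<String> seen = new ArrayList<>();
        final BatchCallback batch = new WrappedBatchCallback((key, value) -> seen.add(key + "=" + value));
        batch.restoreAll(Arrays.asList(new String[] {"a", "1"}, new String[] {"b", "2"}));
        return seen;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // prints [a=1, b=2]
    }
}
```

With such a wrapper the calling code can always go through the batch path, rather than branching on which callback type it was given.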

@@ -305,4 +328,12 @@ public ProcessorNode currentNode() {
public void close() {
metrics.close();
}

private static final class NoOpRestoreListener extends AbstractNotifyingBatchingRestoreCallback {
Contributor

org.apache.kafka.streams.processor.internals.NoOpStateRestoreListener can be used here?

Contributor Author

Ack

@bbejeck
Contributor Author

bbejeck commented Jul 27, 2017

@guozhangwang @mjsax updates per comments

@asfgit

asfgit commented Jul 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6402/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jul 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6387/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor

Merged to trunk. Thanks @bbejeck!

@asfgit asfgit closed this in c50c941 Jul 28, 2017