KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask #2212
Conversation
@guozhangwang @mjsax @enothereska - I found this today while I was trying to run the smoke test. If we have a standby task that is updating a store backed by an internal changelog (i.e., logging is enabled), then it will throw an `UnsupportedOperationException`.
@@ -143,6 +143,7 @@ public void before() throws IOException {
    streamsConfiguration
        .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
Is this change part of this PR?
Strictly speaking, no, but...
One of the tests was occasionally hitting its 30-second timeout, and this was happening because the default commit interval is also 30 seconds. When caching is enabled there is no flushing of state for 30 seconds, so the test either takes at least that long to complete successfully, or it takes that long and then fails.
With the commit interval lowered to one second, the test is much more reliable and completes within a few seconds.
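For context, here is a sketch of the kind of configuration change being discussed. It is a self-contained illustration using plain `java.util.Properties` with the literal config key; the actual test uses the `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` constant shown in the diff.

```java
import java.util.Properties;

public class CommitIntervalExample {
    public static void main(String[] args) {
        // Kafka Streams reads "commit.interval.ms" from its configuration.
        // The default is 30000 ms; with caching enabled, cached state is only
        // flushed on commit, so a test waiting on flushed output can block for
        // up to 30 seconds. Lowering the interval to 1000 ms avoids that.
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("commit.interval.ms", 1000);
        System.out.println(streamsConfiguration.get("commit.interval.ms"));
    }
}
```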
@@ -21,13 +21,13 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
nit: let's group all o.a.k.s.p.internals imports together, before StateRestoreCallback, for alphabetical ordering.
LGTM, just two clarification comments.
Thanks @guozhangwang, updated
LGTM
Unit test passed locally.
Merged to trunk, cherry-picked to 0.10.1.
 *
 * @return the map from TopicPartition to offset
 */
Map<TopicPartition, Long> offsets() {
Why is this removed from the interface? As RecordCollector is internal, there will be no collector that does not deal with offsets, will there?
One comment. Otherwise LGTM.
KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask

Instead of throwing `UnsupportedOperationException` from `StandbyTask.recordCollector()`, return a no-op implementation of `RecordCollector`. Refactored `RecordCollector` to have an interface and impl.

Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes apache#2212 from dguy/standby-task
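The shape of the fix can be sketched as follows. This is a simplified, self-contained illustration, not the actual Kafka Streams code: the real `RecordCollector` interface has more methods (serializers, partitioning, etc.) and keys its offsets map by `TopicPartition`, and the class name `NoOpRecordCollector` here is an assumption for the sketch.

```java
import java.util.Collections;
import java.util.Map;

public class NoOpCollectorSketch {

    // Simplified stand-in for the refactored RecordCollector: what was a
    // concrete class becomes an interface with a default implementation.
    interface RecordCollector {
        void send(String topic, String key, String value);
        Map<String, Long> offsets(); // real code keys by TopicPartition
        void flush();
    }

    // No-op implementation returned by StandbyTask.recordCollector() instead
    // of throwing UnsupportedOperationException: a standby task only restores
    // state from the changelog, so it never needs to produce records.
    static class NoOpRecordCollector implements RecordCollector {
        @Override public void send(String topic, String key, String value) { /* drop */ }
        @Override public Map<String, Long> offsets() { return Collections.emptyMap(); }
        @Override public void flush() { /* nothing to flush */ }
    }

    public static void main(String[] args) {
        RecordCollector collector = new NoOpRecordCollector();
        collector.send("changelog-topic", "key", "value"); // safe: no exception
        System.out.println(collector.offsets().isEmpty());
    }
}
```

The design point is that a no-op object keeps the `StandbyTask` code path uniform with `StreamTask`, rather than forcing callers to guard against an exception.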