New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-2811: add standby tasks #526
Conversation
@@ -54,6 +54,10 @@ | |||
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; | |||
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; | |||
|
|||
/** <code>num.stream.threads</code> */ | |||
public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; | |||
private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"... for each stream task"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
String storeName = entry.getKey(); | ||
TopicPartition storePartition = new TopicPartition(storeName, partition); | ||
|
||
long offset = -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offset is not assigned after initialized to -1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops. I will fix it.
changed changeLogOffsets() -> checkpointedOffsets() which returns all change log partitions of persistent store and their checkpointed offsets. If a checkpoint offset is missing in the checkpoint file, -1L is used. The restored offsets are no longer consulted. |
} | ||
} | ||
|
||
/** | ||
* Commit the state of a task | ||
* Commit the state of an task |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"an task?"
LGTM, thanks. |
guozhangwang * added a new config param "num.standby.replicas" (the default value is 0). * added a new abstract class AbstractTask * added StandbyTask as a subclass of AbstractTask * modified StreamTask to a subclass of AbstractTask * StreamThread * standby tasks are created by calling StreamThread.addStandbyTask() from onPartitionsAssigned() * standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionRevoked() * In addStandbyTasks(), change log partitions are assigned to restoreConsumer. * In removeStandByTasks(), change log partitions are removed from restoreConsumer. * StreamThread polls change log records using restoreConsumer in the runLoop with timeout=0. * If records are returned, StreamThread calls StandbyTask.update and pass records to each standby tasks. Author: Yasuhiro Matsuda <yasuhiro@confluent.io> Reviewers: Guozhang Wang Closes #526 from ymatsuda/standby_task
@guozhangwang