KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState #15920
This PR implements read-only container classes for ApplicationState and KafkaStreamsState, and initializes those within StreamsPartitionAssignor#assign. New internal methods were also added to the ClientState to easily pass this data through to the KafkaStreamsState.
streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java
@@ -459,6 +468,38 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
        }
    }

    private ApplicationState getApplicationState(
nit: we don't use "get" in Streams getter names. I guess this isn't exactly a pure getter, but still. On that note, perhaps a better name would be buildApplicationState? 🤔
Also: even though it's all internal, I've been on a crusade to get everyone to write javadocs for methods in the StreamsPartitionAssignor with at least a brief explanation of what it does.
It's just a super complicated class that does a lot and often mutates things in a way that isn't obvious, so every little bit of documentation helps
...ams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

        // compute the assignment of tasks to threads within each client and build the final group assignment

        getApplicationState(
I know we're just throwing away the return value for now but I'd still do this:
- getApplicationState(
+ final ApplicationState applicationState = getApplicationState(
Otherwise it kind of seems like this method is supposed to be mutating the input parameters (many of the StreamsPartitionAssignor methods work this way so it's good to distinguish when we're just building something vs operating on the passed in structures)
SpotBugs was complaining about unused variables when I had that, so I will simply leave the invocation of that new method out entirely.
ah, I see. Well there's no concern about forgetting to put this back in since we'll eventually need the result lol. All good 👍
    public ApplicationStateImpl(
        final AssignmentConfigs assignmentConfigs,
        final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
        final Set<TaskId> statefulTasks,
        final Set<TaskId> statelessTasks
    ) {
KafkaStreams formatting for long signatures is (unfortunately) done like this:
- public ApplicationStateImpl(
-     final AssignmentConfigs assignmentConfigs,
-     final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
-     final Set<TaskId> statefulTasks,
-     final Set<TaskId> statelessTasks
- ) {
+ public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
+                             final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
+                             final Set<TaskId> statefulTasks,
+                             final Set<TaskId> statelessTasks) {
It's annoying, I know. Can you make a pass over all the methods and make sure they follow the AK style?
        final Set<TaskId> union = new HashSet<>(statefulTasks);
        union.addAll(statelessTasks);
nit: this could in theory be called multiple times, so we probably want to cache the result instead of building up a new set each time. We can still do it lazily, but I'd say just build the set in the constructor so we can make everything final (and unmodifiable)
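A minimal sketch of the caching suggestion, under stated assumptions: `CachedTaskSets` is a hypothetical stand-in for the implementation class, and task ids are plain `String`s rather than the real `TaskId` type. It only illustrates building the union once in the constructor and exposing it as an unmodifiable, final field; it is not the code that was merged.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the class under review.
// Task ids are Strings here only to keep the sketch self-contained.
final class CachedTaskSets {
    private final Set<String> statefulTasks;
    private final Set<String> statelessTasks;
    private final Set<String> allTasks;

    CachedTaskSets(final Set<String> statefulTasks,
                   final Set<String> statelessTasks) {
        // Defensive copies so later changes to the inputs cannot leak in.
        this.statefulTasks = Collections.unmodifiableSet(new HashSet<>(statefulTasks));
        this.statelessTasks = Collections.unmodifiableSet(new HashSet<>(statelessTasks));

        // Build the union exactly once instead of on every call.
        final Set<String> union = new HashSet<>(statefulTasks);
        union.addAll(statelessTasks);
        this.allTasks = Collections.unmodifiableSet(union);
    }

    // Returns the same cached, read-only view on every call.
    Set<String> allTasks() {
        return allTasks;
    }
}
```

Building eagerly costs one extra set per instance, but every field can then be final and callers can never observe a half-built or mutable result.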
    @Override
    public long lagFor(final TaskId task) {
        final Long totalLag = taskLagTotals.get(task);
We should also throw if the KafkaStreamsState was built without requesting the task lags be computed, right? (IIRC we decided on UnsupportedOperationException in the last PR)
Same for the other lag-related methods here. I guess the safest thing to do is wrap the taskLagTotals field in an Optional and make it empty when the user passed in computeTaskLags=false to the ApplicationState#kafkaStreamsStates API?
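One way the Optional-wrapping idea could look, as a sketch only: `LagAwareState`, its two factory methods, and the `String` task ids are all hypothetical names introduced for illustration. The only parts taken from the discussion are the `taskLagTotals` field, the `lagFor` method, and the choice of `UnsupportedOperationException` when lags were never computed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the state class under review.
final class LagAwareState {
    // Empty when the caller did not ask for task lags to be computed.
    private final Optional<Map<String, Long>> taskLagTotals;

    private LagAwareState(final Optional<Map<String, Long>> taskLagTotals) {
        this.taskLagTotals = taskLagTotals;
    }

    // Hypothetical factory for the computeTaskLags=true case.
    static LagAwareState withLags(final Map<String, Long> lags) {
        final Map<String, Long> copy = Collections.unmodifiableMap(new HashMap<>(lags));
        return new LagAwareState(Optional.of(copy));
    }

    // Hypothetical factory for the computeTaskLags=false case.
    static LagAwareState withoutLags() {
        return new LagAwareState(Optional.empty());
    }

    long lagFor(final String task) {
        // Fail fast if lags were never requested.
        final Map<String, Long> lags = taskLagTotals.orElseThrow(
            () -> new UnsupportedOperationException(
                "Task lags were not computed for this state"));
        final Long totalLag = lags.get(task);
        if (totalLag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }
}
```

With this shape, every lag-related method can share the same `orElseThrow` guard, so "lags not requested" and "task unknown" surface as two distinct exceptions.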
    public long lagFor(final TaskId task) {
        final Long totalLag = taskLagTotals.get(task);
        if (totalLag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
As a general rule that we clearly don't always follow in Streams, but should/are actively getting better about, we should make sure to always log an error before throwing a (new) exception. Sometimes the error handling can make it difficult to trace back an exception to the original source of error, and error logs help with that. Doesn't need to be too complicated -- although in this case, perhaps it would make sense to include the keySet of taskLagTotals in the error log (I'm thinking to help differentiate the case where this specific task isn't included vs the entire map being empty)
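The log-before-throw pattern might be sketched as follows. This is an illustration under assumptions: `LoggedLagLookup` is a hypothetical class, task ids are `String`s, and `java.util.logging` is used only so the example runs without dependencies (the real code base would use its own logger). The log line includes the key set so an empty map can be told apart from a single missing task.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in; java.util.logging is an assumption
// made to keep the sketch dependency-free.
final class LoggedLagLookup {
    private static final Logger LOG = Logger.getLogger(LoggedLagLookup.class.getName());

    private final Map<String, Long> taskLagTotals;

    LoggedLagLookup(final Map<String, Long> taskLagTotals) {
        this.taskLagTotals = Collections.unmodifiableMap(new HashMap<>(taskLagTotals));
    }

    long lagFor(final String task) {
        final Long totalLag = taskLagTotals.get(task);
        if (totalLag == null) {
            // Log first, so the origin of the failure is visible even if the
            // exception is later wrapped or swallowed by error handling.
            LOG.severe("Task lag lookup failed: task " + task
                + " not found among known tasks " + taskLagTotals.keySet());
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }
}
```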
    @Override
    public Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(final boolean computeTaskLags) {
        return kafkaStreamsStates;
We should make sure to respect the computeTaskLags param here. There are two things we can do: wait until this method and then call some KafkaStreamsStateImpl#computeTaskLags method, or just wait to construct the KafkaStreamsStateImpls at all until we know whether or not we should compute the task lags, and then pass the computeTaskLags flag into the KafkaStreamsStateImpl constructor.
I personally prefer the latter since that way there's no partially-initialized classes floating around and we don't have to keep track of when the task lags are computed/initialized.
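The preferred option, deferring construction until the flag is known, could be sketched like this. All names here (`ClientStateSketch`, `ApplicationStateSketch`, `hasTaskLags`, `String` keys for process and task ids) are hypothetical; the sketch only shows the flag being passed into the constructor so that no partially initialized state object ever exists.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the per-client state implementation.
final class ClientStateSketch {
    private final Optional<Map<String, Long>> taskLagTotals;

    // The flag is consumed at construction time, so the object is
    // fully initialized (with or without lags) from the start.
    ClientStateSketch(final Map<String, Long> rawLags, final boolean computeTaskLags) {
        if (computeTaskLags) {
            final Map<String, Long> copy = Collections.unmodifiableMap(new HashMap<>(rawLags));
            this.taskLagTotals = Optional.of(copy);
        } else {
            this.taskLagTotals = Optional.empty();
        }
    }

    boolean hasTaskLags() {
        return taskLagTotals.isPresent();
    }
}

// Hypothetical stand-in for the application-level container.
final class ApplicationStateSketch {
    private final Map<String, Map<String, Long>> rawLagsByProcess;

    ApplicationStateSketch(final Map<String, Map<String, Long>> rawLagsByProcess) {
        this.rawLagsByProcess = new HashMap<>(rawLagsByProcess);
    }

    // Client states are built only once the caller's choice is known.
    Map<String, ClientStateSketch> kafkaStreamsStates(final boolean computeTaskLags) {
        final Map<String, ClientStateSketch> states = new HashMap<>();
        for (final Map.Entry<String, Map<String, Long>> entry : rawLagsByProcess.entrySet()) {
            states.put(entry.getKey(), new ClientStateSketch(entry.getValue(), computeTaskLags));
        }
        return Collections.unmodifiableMap(states);
    }
}
```

A real implementation would likely cache the result per flag value rather than rebuilding on each call; that is left out here to keep the sketch short.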
        return new TreeSet<>(previousStandbyTasks.taskIds());
    }

    public SortedMap<String, Set<TaskId>> taskIdsByConsumer() {
nit: call this taskIdsByPreviousConsumer or something to that effect (ie include the word "previous")
It's so hard to keep track of what pertains to the previous assignment vs the new assignment in this mess of a class lol. That's one of the biggest improvements in KIP-924
LGTM, ping me when the build finishes running and I'll merge
The test env seems pretty unstable right now but we have at least one clean build for each java version if you look at the two latest runs. All test failures are unrelated as well. Seems safe to merge
Merged to trunk
…amsState (apache#15920) This PR implements read-only container classes for ApplicationState and KafkaStreamsState, and initializes those within StreamsPartitionAssignor#assign. New internal methods were also added to the ClientState to easily pass this data through to the KafkaStreamsState. One test was added to check the lag sorting within the implementation of KafkaStreamsState, which is the counterpart to the test that existed for the ClientState class. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>