KAFKA-20106 [2/2]: Ensure reconciled assignment updated within poll KS#21813
KAFKA-20106 [2/2]: Ensure reconciled assignment updated within poll KS#21813lianetm wants to merge 7 commits intoapache:trunkfrom
Conversation
| event.assignedPartitions(), event.addedPartitions()); | ||
| } else if (requestManagers.streamsMembershipManager.isPresent()) { | ||
| requestManagers.streamsMembershipManager.get().applyAssignment( | ||
| (SortedSet<TopicPartition>) event.assignedPartitions(), event.addedPartitions()); |
There was a problem hiding this comment.
ApplyAssignmentEvent wraps assignedPartitions with Collections.unmodifiableSet() in its constructor, which does not preserve the SortedSet interface (unlike Collections.unmodifiableSortedSet()). So the cast (SortedSet<TopicPartition>) event.assignedPartitions() will throw ClassCastException at runtime.
The unit test masks this because completeAssignment() calls membershipManager.applyAssignment() directly, bypassing the ApplyAssignmentEvent round-trip.
There was a problem hiding this comment.
good call (the gap didn't exits in this PR though, but it is indeed clashing now with the recent fix wrapping into unmodifiable). Will fix it here to align. Thanks!
There was a problem hiding this comment.
fixed by removing the casting and updating the applyAssignment params (we don't really need a SortedSet at that point)
| PARTITIONS_ASSIGNED, | ||
| PARTITIONS_REMOVED, | ||
| STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED, | ||
| STREAMS_PARTITIONS_ASSIGNED, |
There was a problem hiding this comment.
Nit: the class is StreamsTasksAssignedEvent but the enum is STREAMS_PARTITIONS_ASSIGNED. Consider STREAMS_TASKS_ASSIGNED to keep them aligned.
| */ | ||
| public void applyAssignment(SortedSet<TopicPartition> assignedPartitions, SortedSet<TopicPartition> addedPartitions) { | ||
| subscriptionState.assignFromSubscribedAwaitingCallback(assignedPartitions, addedPartitions); | ||
| notifyAssignmentChange(assignedPartitions); |
There was a problem hiding this comment.
Could this be package-private? It's only called from ApplicationEventProcessor and tests.
There was a problem hiding this comment.
we can't. AppEventProcessor is in a diff package than the managers.
| assignment | ||
| ); | ||
| backgroundEventHandler.add(event); | ||
| log.debug("Enqueued StreamsPartitionsAssignedEvent to apply assignment and trigger onTasksAssigned callback"); |
There was a problem hiding this comment.
Nit: log says StreamsPartitionsAssignedEvent but the class is StreamsTasksAssignedEvent.
Apply same fix from #21495 to the
Streams manager.
This ensures that assignment updates happen within a call to
consumer.poll (even when reconciliations may complete async in the
background). Done by piggybacking the assignment update the in the
existing hop that was made to the app thread when a reconciliation
completed (previously used to trigger callback only, now used to update
assignment and run callback)