[Event Hubs] Partition load balancer implementation in Event Processor #4839
Conversation
…to implement_loadbalancer
@@ -203,6 +206,26 @@ export class EventProcessor {
    this._partitionManager = partitionManager;
    this._processorOptions = options;
    this._pumpManager = new PumpManager(this._id, options);
    const inactiveTimeLimitInMS = 60000; // ownership expiration time (1 minute)
Just curious, is this the same for all SDKs?
All the SDKs are using different times: Java uses 5 minutes, Python uses 20 seconds.
Can you start a discussion with the other language SDKs to pin down a value to use for this? As long as it's longer than the loop which runs every 10 seconds I'm not too concerned.
Every SDK uses a time longer than the loop, which runs every 10 seconds. I'll start a discussion with the other language SDKs to make this consistent across all the languages.
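As a minimal sketch of the idea being discussed, assuming an ownership record carries a lastModifiedTimeInMS timestamp (the names below are illustrative, not taken from this PR's diff): an ownership is honored only while it was renewed within the expiration window, which is why the window must be longer than the 10-second load-balancing loop.

```typescript
// Hypothetical sketch of an ownership-expiration check; field and
// function names are assumed for illustration.
interface PartitionOwnership {
  partitionId: string;
  ownerId: string;
  lastModifiedTimeInMS: number; // last time this ownership record was renewed
}

const inactiveTimeLimitInMS = 60000; // ownership expiration time (1 minute)

// An ownership counts as active only if it was renewed within the
// expiration window; otherwise any processor in the group may claim it.
function isOwnershipActive(ownership: PartitionOwnership, nowInMS: number): boolean {
  return nowInMS - ownership.lastModifiedTimeInMS < inactiveTimeLimitInMS;
}
```

Because the owning processor renews its claims on every 10-second loop iteration, a 60-second window gives it several chances to renew before the ownership is considered abandoned.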
One question: did we reach a consensus on whether to include partitions we already own when claiming partitions? I recall discussions around this to prevent the case where no checkpointing has occurred for a long time due to a lack of events being received.
If so, this would require more changes, but I'm also fine with that being in a separate PR since I don't think it would require any major refactoring of what you have here.
}
return (
  numberOfPartitionsOwned < minPartitionsPerEventProcessor ||
  numberOfPartitionsOwned === leastPartitionsOwnedByAnyEventProcessor
I'm not sure this second condition makes sense.
Scenario:
There are 6 partitions total and 4 EventProcessors.
The expected distribution would be 2 EventProcessors owning 2 partitions each and 2 EventProcessors owning 1 partition each.
Let's say our EventProcessor currently owns 1 partition. In this case, numberOfPartitionsOwned === leastPartitionsOwnedByAnyEventProcessor will be true, indicating that the EventProcessor should own more partitions. This would be incorrect, though, since based on the scenario we know we've already reached an optimal load balance.
I think the EventProcessor should own more partitions if:
- numberOfPartitionsOwned is less than the minimum partitions per event processor, OR
- the sum of partitions actively owned by any processor is less than the total number of partitions AND numberOfPartitionsOwned is less than minPartitionsPerEventProcessor + 1.
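The suggested condition could be sketched roughly as below (a standalone illustration under assumed parameter names, not the PR's actual method, which takes minPartitionsPerEventProcessor and ownerPartitionMap):

```typescript
// Hypothetical sketch of the reviewer's suggested condition.
// activelyOwnedPartitions = total partitions currently owned by any
// processor; totalPartitions = partition count of the Event Hub.
function shouldOwnMorePartitions(
  numberOfPartitionsOwned: number,
  minPartitionsPerEventProcessor: number,
  activelyOwnedPartitions: number,
  totalPartitions: number
): boolean {
  // Below the guaranteed minimum share: always claim more.
  if (numberOfPartitionsOwned < minPartitionsPerEventProcessor) {
    return true;
  }
  // Some partitions are still unowned, and this processor has not yet
  // taken on its share of the remainder (min + 1).
  return (
    activelyOwnedPartitions < totalPartitions &&
    numberOfPartitionsOwned < minPartitionsPerEventProcessor + 1
  );
}
```

In the 6-partition / 4-processor scenario (min = 1), a processor owning 1 partition while all 6 are owned would no longer try to claim more, which is the case the original condition got wrong.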
We won't reach this code if the load is already balanced. On line 185, there's a check to see if the load is already balanced. This method gets called only when the load is not balanced.
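A balanced-load check like the one this reply refers to could look roughly like this (an assumed sketch, not the PR's actual _isLoadBalanced implementation): the load is balanced when every active processor owns between min and min + 1 partitions.

```typescript
// Hypothetical sketch of a balanced-load check. Each entry in
// partitionsOwnedByEachProcessor is one active processor's owned count.
function isLoadBalanced(
  partitionsOwnedByEachProcessor: number[],
  minPartitionsPerEventProcessor: number
): boolean {
  return partitionsOwnedByEachProcessor.every(
    (count) =>
      count >= minPartitionsPerEventProcessor &&
      count <= minPartitionsPerEventProcessor + 1
  );
}
```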
That's true, but I still think it was right to change the logic since the function should still act as expected in isolation. (If we changed the code such that we didn't check _isLoadBalanced first this method should still return the expected result)
I think what you said would be true only if _shouldOwnMorePartitions()
were public; I don't think a private method has to work in isolation. Anyone changing the code should check the logic before using a class's private methods.
if (!this._shouldOwnMorePartitions(minPartitionsPerEventProcessor, ownerPartitionMap)) {
  log.partitionLoadBalancer(
    `[${this._ownerId}] This event processor owns ${ownerPartitionMap.get(
Do you mean to print the number of partitions owned, or the array? Right now I think it will do the array and not the length.
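The fix the reviewer is pointing at could be sketched as follows (a self-contained illustration with an assumed helper name; the real code logs inline inside the class):

```typescript
// Hypothetical sketch: log the number of owned partitions, not the array.
// ownerPartitionMap maps an owner id to the partition ids it owns.
function describeOwnership(
  ownerId: string,
  ownerPartitionMap: Map<string, string[]>
): string {
  const owned = ownerPartitionMap.get(ownerId) ?? [];
  // Interpolating `owned` directly would print the array ("0,1");
  // `.length` prints the count the log message intends.
  return `[${ownerId}] This event processor owns ${owned.length} partition(s).`;
}
```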
I think we didn't reach any conclusion that day. I'll check with the other languages' SDKs and create a separate PR for that.
/azp run js - eventhubs-client - tests
Azure Pipelines successfully started running 1 pipeline(s).
My only other comment would be to add tests around load balancing for the cases mentioned in the PR; otherwise LGTM.
Implement a partition load balancer that (almost) evenly distributes ownership of Event Hub partitions among all the active Event Processors registered for a given Event Hub + consumer group combination. Each event processor is responsible for processing events from the partitions it owns.
This PR implements load balancing of Event Processors as per the algorithm described at https://gist.github.com/srnagar/7ef8566cfef0673288275c450dc99590
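The "(almost) evenly distribute" target in the description can be sketched numerically (an illustration of the arithmetic, not code from the linked algorithm): each active processor ends up owning either floor(partitions / processors) or one more than that.

```typescript
// Hypothetical sketch of the even-distribution target: with P partitions
// and N processors, (P % N) processors own floor(P / N) + 1 partitions
// and the rest own floor(P / N).
function expectedOwnershipCounts(
  totalPartitions: number,
  activeProcessors: number
): number[] {
  const min = Math.floor(totalPartitions / activeProcessors);
  const remainder = totalPartitions % activeProcessors;
  return Array.from({ length: activeProcessors }, (_, i) =>
    i < remainder ? min + 1 : min
  );
}
```

For the 6-partition / 4-processor scenario discussed above, this yields the distribution 2, 2, 1, 1 from the review comment.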