KCL consumer does not take all shards while consuming multiple streams #415

Closed
saeedansari opened this issue Sep 21, 2018 · 10 comments

@saeedansari

My consumer application consumes two streams: it creates a Worker for each stream, and each stream has more than one shard. Across different executions, the workers take a different number of shards from each stream and do not take all of the shards. I am running only one instance of the application, so I expect it to consume all shards.

I even put a delay between starting the workers to give takeLeases enough time to take the shards, but it is still not working.

@sahilpalvia
Contributor

The KCL (versions 1.x and 2.x) currently doesn't support running multiple instances of the Worker/Scheduler under the same application. Secondly, are you using the same application name for the 2 streams? If you are, that would be an issue: the KCL currently works per stream, so all the information in a lease table belongs to only one stream, and it will fail if used with another stream.
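A minimal sketch of the per-stream naming this implies, assuming the KCL's default of using the application name as the name of its DynamoDB lease table. The class and method names and the `-` separator are hypothetical, not KCL API; the validation pattern reflects DynamoDB's table-naming rules (3 to 255 characters of letters, digits, `_`, `-` and `.`):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the KCL API.
class PerStreamAppNames {

    // DynamoDB table names: 3 to 255 characters from [A-Za-z0-9_.-].
    private static final Pattern TABLE_NAME = Pattern.compile("[A-Za-z0-9_.-]{3,255}");

    // Derive "<base>-<stream>" and check that it is usable as a lease table name.
    static String applicationNameFor(String baseAppName, String streamName) {
        String appName = baseAppName + "-" + streamName;
        if (!TABLE_NAME.matcher(appName).matches()) {
            throw new IllegalArgumentException("Not a valid DynamoDB table name: " + appName);
        }
        return appName;
    }

    // One application name (and therefore one lease table) per stream.
    static Map<String, String> assignNames(String baseAppName, List<String> streams) {
        Map<String, String> byStream = new LinkedHashMap<>();
        for (String stream : streams) {
            String appName = applicationNameFor(baseAppName, stream);
            if (byStream.putIfAbsent(stream, appName) != null) {
                // Listing a stream twice would start two Workers on one lease table in one process.
                throw new IllegalArgumentException("Stream listed twice: " + stream);
            }
        }
        return byStream;
    }

    public static void main(String[] args) {
        Map<String, String> names = assignNames("my-consumer", List.of("stream-a", "stream-b"));
        // Prints "stream-a -> my-consumer-stream-a" and "stream-b -> my-consumer-stream-b".
        names.forEach((stream, app) -> System.out.println(stream + " -> " + app));
    }
}
```

Each derived name would then be used as the application name when building that stream's Worker/Scheduler, so the two streams never share a lease table.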

@saeedansari
Author

Thanks for your quick response. How about setting the application name per stream, like appName+streamName? Would that work, or would I still encounter other issues?

@sahilpalvia
Contributor

The KCL would still have issues running multiple instances of the Worker/Scheduler under the same JVM. This is due to the current limitations of the ShardSyncer.

@saeedansari
Author

Great! Thank you!

@sahilpalvia
Contributor

Correction: you should be able to run multiple instances of the KCL under the same JVM. However, due to a limitation of the ShardSyncer class, this can slow things down, because it has static methods with locks. You still need to make sure that the application names are different.

There is a PR open to fix this issue: #395. Note that this fix will be available only in the 2.x version of the KCL.
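To illustrate why static methods with locks cause contention, here is a toy Java example. It is not the ShardSyncer code; `StaticSyncer` and `InstanceSyncer` are hypothetical stand-ins. A `static synchronized` method locks on the class object, so calls from every Worker/Scheduler in the JVM are serialized, while an instance-level `synchronized` method only serializes calls on the same object:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StaticLockDemo {

    // Counts threads currently inside the "sync" section and records the peak.
    static final class Gauge {
        final AtomicInteger inside = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();

        // Simulated shard-sync work: wait up to 1 s for the other worker to arrive too.
        void enter(CountDownLatch bothArrived) throws InterruptedException {
            peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
            try {
                bothArrived.countDown();
                bothArrived.await(1, TimeUnit.SECONDS);
            } finally {
                inside.decrementAndGet();
            }
        }
    }

    // Hypothetical syncer with a JVM-wide lock (locks on StaticSyncer.class).
    static final class StaticSyncer {
        static synchronized void sync(Gauge g, CountDownLatch l) throws InterruptedException {
            g.enter(l);
        }
    }

    // Hypothetical syncer whose lock belongs to each instance.
    static final class InstanceSyncer {
        synchronized void sync(Gauge g, CountDownLatch l) throws InterruptedException {
            g.enter(l);
        }
    }

    interface SyncCall {
        void run(Gauge g, CountDownLatch l) throws InterruptedException;
    }

    // Runs two "workers" on separate threads and returns the peak concurrency observed.
    static int peakConcurrency(SyncCall worker1, SyncCall worker2) throws InterruptedException {
        Gauge gauge = new Gauge();
        CountDownLatch bothArrived = new CountDownLatch(2);
        Thread t1 = new Thread(() -> call(worker1, gauge, bothArrived));
        Thread t2 = new Thread(() -> call(worker2, gauge, bothArrived));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return gauge.peak.get();
    }

    private static void call(SyncCall c, Gauge g, CountDownLatch l) {
        try {
            c.run(g, l);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Both workers share the class-level lock, so only one is ever inside.
        System.out.println("static lock peak:   " + peakConcurrency(StaticSyncer::sync, StaticSyncer::sync));
        // Each worker has its own syncer instance, so both can be inside at once.
        InstanceSyncer a = new InstanceSyncer();
        InstanceSyncer b = new InstanceSyncer();
        System.out.println("instance lock peak: " + peakConcurrency(a::sync, b::sync));
    }
}
```

With the static lock, the first worker waits out its 1-second timeout while the second is blocked, so the peak should be 1 and the run takes noticeably longer; with per-instance locks, the peak should be 2 under normal scheduling.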

@saeedansari
Author

OK, so for now, to consume multiple streams, the rule of thumb for scaling per shard is (# of instances = # of streams * # of shards). For example, to consume two streams with two shards each, we should run 4 instances of the same application, and the application name should be set per stream.

@sahilpalvia
Contributor

I am not sure I follow you; you don't need an instance of the KCL per shard. KCL Workers/Schedulers can handle multiple shards. When more KCL Workers/Schedulers are added, the shards get distributed among them as evenly as possible.

The application name for a KCL Worker/Scheduler is a kind of grouping: KCL Workers/Schedulers that share an application name together perform a particular set of actions on all the shards of a stream.

@saeedansari
Author

Right! I mean that if we want to distribute shards evenly among Workers, then we need to run a Worker per shard, which is obvious, and on top of that each instance of the KCL can consume from only one stream. I just repeated it so it is clear for others :)

@sahilpalvia
Contributor

The KCL Workers/Schedulers distribute leases among Workers/Schedulers of the same application. So if you have an application running 2 Workers/Schedulers reading from 2 streams with 2 shards each, you will need to run 2 instances of your application. That way the first application instance holds 1 lease from each stream, and the second application instance does the same.

If you want all 4 shards to be handled by a single instance, I would recommend having a single KCL Worker/Scheduler per application.

The DynamoDB implementation of the lease management provided by the KCL is per application per stream.
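A toy model of that arrangement, assuming one application name (and so one lease table) per stream and two application instances that each run one Scheduler per application name. The class, method and shard names are hypothetical, and the round-robin split only illustrates "as evenly as possible"; it is not how the KCL's lease taker actually assigns leases:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LeaseDistributionModel {

    // Spread one lease table's shard leases as evenly as possible (round-robin).
    static Map<String, List<String>> distribute(List<String> shardIds, List<String> workerIds) {
        if (workerIds.isEmpty()) {
            throw new IllegalArgumentException("need at least one worker");
        }
        Map<String, List<String>> byWorker = new TreeMap<>();
        for (String worker : workerIds) {
            byWorker.put(worker, new ArrayList<>());
        }
        for (int i = 0; i < shardIds.size(); i++) {
            byWorker.get(workerIds.get(i % workerIds.size())).add(shardIds.get(i));
        }
        return byWorker;
    }

    // shardsByApp: application name -> shards of the single stream it consumes.
    // Every instance runs one Scheduler per application name, so each lease
    // table is split among all instances independently of the others.
    static Map<String, List<String>> leasesHeldPerInstance(
            Map<String, List<String>> shardsByApp, List<String> instances) {
        Map<String, List<String>> held = new TreeMap<>();
        for (List<String> shards : shardsByApp.values()) {
            distribute(shards, instances).forEach((instance, leases) ->
                    held.computeIfAbsent(instance, k -> new ArrayList<>()).addAll(leases));
        }
        return held;
    }

    public static void main(String[] args) {
        Map<String, List<String>> shardsByApp = new TreeMap<>();
        shardsByApp.put("my-consumer-stream-a", List.of("a-shard-0", "a-shard-1"));
        shardsByApp.put("my-consumer-stream-b", List.of("b-shard-0", "b-shard-1"));
        leasesHeldPerInstance(shardsByApp, List.of("instance-1", "instance-2"))
                .forEach((instance, leases) -> System.out.println(instance + " -> " + leases));
        // instance-1 -> [a-shard-0, b-shard-0]
        // instance-2 -> [a-shard-1, b-shard-1]
    }
}
```

The output matches the scenario above: with 2 instances, each one ends up holding 1 lease from each stream, rather than needing one instance per shard.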

@sahilpalvia
Contributor

Version 2.0.4 is now available. Closing this issue. Feel free to reopen the issue if the problem persists.

Version 2.0.4 removes the contention between multiple instances of the Scheduler when running under a single JVM.


2 participants