
Cut off buffering from ConsumerStage #35

Closed
IgorFedchenko opened this issue Aug 12, 2019 · 2 comments
Comments

@IgorFedchenko
Contributor

Currently, we have the consumer constantly polling Kafka and storing all messages in a buffer:

private void PullQueue()
{
    _consumer.Poll(_settings.PollTimeout);
    if (!_isPaused && _buffer.Count > _settings.BufferSize)
    {
        Log.Debug("Polling paused, buffer is full");
        _consumer.Pause(_assignedPartitions);
        _isPaused = true;
    }
}

Now we need to consume messages from Kafka topics on demand instead. This will also improve back-pressure support for our provider.
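A minimal sketch of what on-demand consumption could look like (the `OnPull` override and `ScheduleOnce` follow Akka.Streams `GraphStage` conventions, `_consumer.Consume` is the blocking Confluent.Kafka call; the field names, timer key, and overall shape here are assumptions for illustration, not the final implementation):

```csharp
// Hypothetical sketch: fetch a single message only when the downstream
// signals demand, instead of polling on a timer into a buffer.
private const string PollKey = "poll"; // illustrative timer key

public override void OnPull()
{
    // Consume blocks for up to the timeout and returns null when no
    // message arrives within it (Confluent.Kafka semantics).
    var result = _consumer.Consume(_settings.PollTimeout);
    if (result != null)
        Push(_out, result);                            // satisfy demand immediately
    else
        ScheduleOnce(PollKey, _settings.PollInterval); // retry later; demand is still pending
}
```

With this shape there is no buffer at all: Kafka is only asked for a message when the sink has signalled demand, so back-pressure falls out naturally.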

I am going to submit a PR for this this week.

@IgorFedchenko
Contributor Author

One interesting thing is that the Scala sources also use buffering, but for a completely different reason.

The structure of their package is a bit more complex, with more abstractions, because they have many stages implemented (and I guess we will need to re-implement these in #36). The interesting part is contained in this file:
https://github.com/akka/alpakka-kafka/blob/master/core/src/main/scala/akka/kafka/internal/BaseSingleSourceLogic.scala

What they do is this:
In the onPull method, they try to get a message from the buffer, as we do. But if the buffer is empty, they send a request to a special consumer actor to fetch one for them. That actor acts as a request aggregator for multiple stages: it collects message requests from them and, once the partitions have been consumed, sends the messages back to the requesters. While a stage is waiting for a message from the actor, onPull may be called multiple times, so multiple consume requests can be sent to the actor. In that case, when messages are finally received from Kafka by the aggregator actor, it sends several messages back (multiple requests - multiple messages). All of them are stored in a buffer and pushed to the sink one by one, and then the next messages are requested again.

In our implementation, instead of a consumer actor aggregating requests, we currently have a timer that requests messages from Kafka and populates the buffer, even when there is no demand for messages.

So, what I want to say is that we indeed do not need buffering right now, nor the timer. We will consume messages from Kafka directly on demand. But we may consider keeping a queue of requests populated in the onPull method and continuing to consume messages from Kafka (for example, if a partition is empty at the moment but we have a request from the sink, we keep trying to consume the next message). Also, should we implement that consumer-actor mechanism the way the Scala version does?

@Aaronontheweb what do you think about it? I know we need to stay as close to the Scala implementation as possible, but this is more complicated than just removing timers and buffers, so I need your approval.

So far, I am going to remove all buffering anyway.

@IgorFedchenko
Contributor Author

I will close this issue, because it is actually fixed in #40. Further discussion of the other stage internals will continue in #36.
