Graceful shutdown in KafkaActorPublisher #45

Closed
juanjovazquez opened this issue Oct 22, 2015 · 5 comments

Comments

@juanjovazquez

As mentioned in Handling errors, it's possible to have a publisher or consumer actor supervised by its parent. I'm doing that as follows:

val publisherActorProps = kafka.consumerActorProps(consumerProps)
 .withDispatcher(ReactiveKafka.ConsumerDefaultDispatcher)
val publisherActor = context.actorOf(publisherActorProps)
val kafkaPublisher: Publisher[KafkaMessage[Payload]] = ActorPublisher(publisherActor)
Source(kafkaPublisher) runForeach (context.self ! _.message())

Unfortunately, if the parent actor is stopped, its child actors are stopped first, so the publisher actor is not shut down gracefully and the Kafka consumer is not closed either. For now, my only way to shut down gracefully is to send a cancellation message beforehand, as follows:

publisherActor ! ActorPublisherMessage.Cancel

But what happens if the parent actor is stopped unexpectedly? I think the Kafka consumer won't be stopped properly and will keep running, trying to read from the Kafka topic.

As far as I understand, one way to solve this would be to override postStop() in KafkaActorPublisher and send the cancellation message there.
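To make the idea concrete, here is a minimal sketch of a classic Akka actor that releases a resource in postStop(). `ClosableConsumer` and `ResourceOwningActor` are hypothetical names used only for illustration and are not part of reactive-kafka. The sketch closes the resource directly instead of sending a message to itself, on the assumption that a message sent from postStop() would no longer be processed by the stopping actor.

```scala
import akka.actor.Actor

// Hypothetical stand-in for the Kafka consumer owned by the publisher actor.
trait ClosableConsumer {
  def close(): Unit
}

// Hypothetical actor that owns a consumer and releases it when it stops.
class ResourceOwningActor(consumer: ClosableConsumer) extends Actor {

  def receive: Receive = {
    case _ => () // message handling omitted in this sketch
  }

  // Invoked when the actor stops, including when it is stopped
  // because its parent was stopped.
  // Caveat: Akka's default preRestart also calls postStop(), so the
  // consumer is closed on a restart as well.
  override def postStop(): Unit = {
    consumer.close()
    super.postStop()
  }
}
```

With this shape, stopping the parent (or terminating the actor system) should still trigger `close()`, because postStop() runs for every stopped child.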

WDYT?

Thanks in advance.

@kciesielski
Contributor

That was my idea a while ago, but unfortunately postStop() also gets called on actor restart, and after a KafkaActorPublisher gets restarted it is impossible to revive its inner consumer. However, I'm also quite bothered by the way we need to handle shutdowns with the current API (the way you mentioned, or by DeathWatch). Let's keep this issue open and look for a solution.
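For reference, one possible reading of the DeathWatch approach is sketched below: a small helper actor watches the owning actor and sends ActorPublisherMessage.Cancel to the publisher once the owner terminates. `CancelOnTermination` is a hypothetical name, and the sketch assumes the publisher actor is not a child of the owner (otherwise it would already be stopped by the time Terminated arrives).

```scala
import akka.actor.{Actor, ActorRef, Terminated}
import akka.stream.actor.ActorPublisherMessage

// Hypothetical helper: cancels `publisher` when `owner` terminates.
class CancelOnTermination(owner: ActorRef, publisher: ActorRef) extends Actor {

  // Register for a Terminated message about the owner.
  override def preStart(): Unit = {
    context.watch(owner)
    ()
  }

  def receive: Receive = {
    case Terminated(`owner`) =>
      // Same cancellation message that is otherwise sent by hand.
      publisher ! ActorPublisherMessage.Cancel
      context.stop(self)
  }
}
```

It could be started next to the publisher with something like `system.actorOf(Props(new CancelOnTermination(owner, publisherActor)))`. The cancellation then happens whether the owner stops normally or unexpectedly.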

@juanjovazquez
Author

We'll keep working on this. By the way, I hadn't thought of using DeathWatch for this, and it might work better than cancelling in our case. Thanks for that!

@algermissen

Hi,

Are there any new approaches to gracefully shutting down the publisher?

Jan

@kciesielski
Contributor

@algermissen 0.8.3 uses onErrorThenStop(ex), so in case of an exception the publisher will be closed automatically. I'm working on the README to explain this. I guess this issue can be closed.
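As a rough illustration of that pattern (not the actual KafkaActorPublisher code), the sketch below shows an ActorPublisher that signals a failure downstream with onErrorThenStop and releases its resource in postStop(). `PollingPublisher`, `poll` and `close` are hypothetical names standing in for the consumer's read and close operations.

```scala
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
import scala.util.control.NonFatal

// Hypothetical publisher: `poll` reads the next element, `close` releases
// the underlying resource (for example, a Kafka consumer).
class PollingPublisher(poll: () => String, close: () => Unit)
    extends ActorPublisher[String] {

  def receive: Receive = {
    case ActorPublisherMessage.Request(_) => deliver()
    case ActorPublisherMessage.Cancel     => context.stop(self)
  }

  // Emit elements while the subscriber has outstanding demand.
  private def deliver(): Unit =
    try {
      while (isActive && totalDemand > 0) onNext(poll())
    } catch {
      // Fail the stream and stop this actor, which in turn runs postStop().
      case NonFatal(ex) => onErrorThenStop(ex)
    }

  // Runs on stop, whether caused by Cancel, an error, or the parent stopping.
  override def postStop(): Unit = {
    close()
    super.postStop()
  }
}
```

Because the actor stops itself on error instead of being restarted, the resource is closed exactly when the stream fails, and no attempt is made to revive the consumer.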

@juanjovazquez
Author

Thanks @kciesielski, I'll check this out. Closing the ticket.

@ennru added this to the invalid milestone on Jun 7, 2018
4 participants