how to shut down a stream cleanly #20

Closed
xelax opened this issue Jul 17, 2015 · 4 comments

@xelax

xelax commented Jul 17, 2015

I would like to see an example of how to shut down a Kafka source cleanly and how to detect errors (for example, if the Kafka server goes down or the connection fails).

@kciesielski
Contributor

I'm working on this now as part of #21.
As I mentioned there:
Underneath the subscriber is an Akka actor, so errors can be handled with custom behavior using DeathWatch. I'm checking how more details about exceptions can be passed along with that.
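
A minimal sketch of the DeathWatch idea, assuming you already hold the `ActorRef` of the subscriber actor. The `SubscriberWatcher` class and its logging are hypothetical names used for illustration only; they are not part of reactive-kafka. Note that Akka's `Terminated` message does not carry the failure cause, which is why passing exception details needs something extra.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}

// Hypothetical watcher actor (illustrative, not part of reactive-kafka).
class SubscriberWatcher(subscriber: ActorRef) extends Actor with ActorLogging {
  // Ask Akka to deliver a Terminated message when the watched actor stops.
  context.watch(subscriber)

  def receive: Receive = {
    case Terminated(ref) if ref == subscriber =>
      // Terminated only says that the actor stopped, not why.
      log.warning("Kafka subscriber actor {} stopped", ref)
      // Custom handling (alerting, rebuilding the stream, ...) would go here.
      context.stop(self)
  }
}

object SubscriberWatcher {
  def props(subscriber: ActorRef): Props = Props(new SubscriberWatcher(subscriber))
}
```

The watcher could then be started with something like `system.actorOf(SubscriberWatcher.props(subscriberRef))`, where `subscriberRef` is whatever reference to the subscriber actor you have available.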

@kciesielski
Contributor

@xelax Error handling can be done through the API, as in the following example:
https://github.com/softwaremill/reactive-kafka/blob/master/src/test/scala/examples/examples.scala#L40
You can shut down the source by sending an ActorSubscriberMessage.OnComplete to a KafkaActorSubscriber actor.
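
A rough sketch of that shutdown call, assuming an Akka Streams version that still ships `akka.stream.actor.ActorSubscriberMessage` and that you hold the `ActorRef` of the KafkaActorSubscriber. The `StreamShutdown` helper is a hypothetical name used only for illustration; how the reference is obtained depends on how the stream was built.

```scala
import akka.actor.ActorRef
import akka.stream.actor.ActorSubscriberMessage

// Hypothetical helper (illustrative, not part of reactive-kafka).
object StreamShutdown {
  // `subscriberActor` is assumed to be the actor backing the Kafka subscriber.
  // Sending OnComplete signals that no more elements will arrive.
  def complete(subscriberActor: ActorRef): Unit =
    subscriberActor ! ActorSubscriberMessage.OnComplete
}
```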

@kciesielski
Contributor

@xelax you can also use a more idiomatic way to handle errors. I'm hoping to add some documentation soon. Here's a quick example:

      val decider: Supervision.Decider = {
        case _ => Supervision.Resume
      }

      Source(kafkaPublisher)
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
        .runWith(someSink)

@kciesielski
Contributor

@xelax looks like I gave a wrong example. A source will not restart on errors; it will just propagate the error further by calling onError() on its subscribers. Here's a better example of custom error handling, which is meaningful for a Sink:

val decider: Supervision.Decider = {
  case _ => Supervision.Resume // Your custom handling
}
Source(publisher)
  .map(_.message().toUpperCase)
  .to(Sink(kafkaSubscribe).withAttributes(ActorAttributes.supervisionStrategy(decider)))
  .run()

@kciesielski kciesielski added this to the 0.8.0 milestone Aug 26, 2015