Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exception is not propagated if there is an uncaught error in Sender #227

Closed
nemzsom opened this issue Sep 12, 2016 · 0 comments
Closed

Exception is not propagated if there is an uncaught error in Sender #227

nemzsom opened this issue Sep 12, 2016 · 0 comments
Labels
Milestone

Comments

@nemzsom
Copy link

nemzsom commented Sep 12, 2016

Hi!
I found an issue when tried to use the library (version 0.11) with a kafka 0.9.x server. I got this exception:

14:24:13.262 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 423023, only 34 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
    at java.lang.Thread.run(Thread.java:745)

I know this is caused by the incompatibility between the client (0.10) and the server (0.9) but the issue is that the stream was not terminated. It just hangs. I would expect an exception that a supervisor can handle.

When I run this code:

implicit val system = ActorSystem("TEST")
  implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system))

  val supervisor: Supervision.Decider = {
    case e =>
      println(s"Exception in supervisor: ${e.getMessage}")
      e.printStackTrace()
      Supervision.stop
  }

  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  val done = Source(1 to 100)
    .map(_.toString)
    .map { elem =>
      new ProducerRecord[String, String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings).withAttributes(ActorAttributes.supervisionStrategy(supervisor)))

  done.onComplete {
    case _ => println("Stream done")
  }

neither the Exception in supervisor nor the Stream done line is printed.

If I switch to a kafka 0.10 server the code works, but the issue is if an error happens in the communication I would like to get an exception.
I attached a little sbt project to reproduce the issue.

Thank you for this great library!

reactiveKafkaIssue.zip

@nemzsom nemzsom changed the title Exception is not propagatied Exception is not propagated if there is an uncaught error in Sender Sep 12, 2016
kciesielski pushed a commit to kciesielski/reactive-kafka that referenced this issue Sep 16, 2016
- WakeupException is swallowed until imit is exceeded, then actor stops
- Any other exception stops the actor immediately
- Refs akka#227 akka#224
kciesielski pushed a commit to kciesielski/reactive-kafka that referenced this issue Sep 16, 2016
- WakeupException is swallowed until imit is exceeded, then actor stops
- Any other exception stops the actor immediately
- Refs akka#227 akka#224
patriknw pushed a commit that referenced this issue Sep 22, 2016
* Handle poll() exceptions in consumer actor
- WakeupException is swallowed until imit is exceeded, then actor stops
- Any other exception stops the actor immediately
- Refs #227 #224
@patriknw patriknw added this to the 0.12 milestone Sep 23, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants