
Consumer doesn't shutdown when using Stage that terminates stream #166

Closed
l15k4 opened this issue Jun 30, 2016 · 24 comments
@l15k4

l15k4 commented Jun 30, 2016

Hey, I believe there is a bug in the current master branch related to stream termination. This stream:

      Consumer.committableSource(consumerSettings("client1"), Subscriptions.topics("topic1"))
        .mapAsync(1)( msg => msg.committableOffset.commitScaladsl().map(_ => msg.value) )
        .take(5)
        .runWith(Sink.seq)

yields an akka.kafka.KafkaConsumerActor$Internal$Stop$ dead letter, which IMHO leaves the KafkaConsumerActor hanging when it would otherwise have been stopped. As a result, if you want to compose streams like this:

    val sf =
      Consumer.committableSource(consumerSettings("client1"), Subscriptions.topics("topic1"))
        .mapAsync(1)( msg => msg.committableOffset.commitScaladsl().map(_ => msg.value) )
        .take(5)
        .runWith(Sink.seq)
        .flatMap { result1 =>
          Thread.sleep(1000)
          Consumer.committableSource(consumerSettings("client1"), Subscriptions.topics("topic1"))
            .mapAsync(1)( msg => msg.committableOffset.commitScaladsl().map(_ => msg.value) )
            .take(5)
            .runWith(Sink.seq).map(result2 => result1 -> result2)
        }

It gets stuck indefinitely in the second stream for some reason.

@13h3r 13h3r added the bug label Jun 30, 2016
@patriknw
Member

patriknw commented Jul 1, 2016

thanks for reporting, @l15k4

@l15k4
Author

l15k4 commented Jul 1, 2016

Then there is an open question of whether to explicitly stop and shut down the consumer stream when the actor system terminates. My guess is that it shuts down implicitly on actor system termination, but I noticed hanging Kafka client-server connections when I didn't do this in my actors:

  override def postStop() = {
    consumerControlOpt.foreach( control => control.stop.flatMap(_ => control.shutdown())(SameThreadExeC) )
  }

Wdyt?

@13h3r
Member

13h3r commented Jul 2, 2016

@l15k4 you can call shutdown directly. stop just stops emitting messages, and if your stream has completed (no transaction in progress) the consumer will be closed automatically.
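A minimal sketch of the difference, assuming the Consumer.Control API discussed in this thread (consumerSettings is the helper from the original report; this needs a running Kafka broker to actually execute):

```scala
import akka.actor.ActorSystem
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}

implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()
import system.dispatcher

// Keep.both gives access to the Control alongside the stream's result.
val (control, done) =
  Consumer.committableSource(consumerSettings("client1"), Subscriptions.topics("topic1"))
    .mapAsync(1)(msg => msg.committableOffset.commitScaladsl().map(_ => msg.value))
    .take(5)
    .toMat(Sink.seq)(Keep.both)
    .run()

// stop() only stops emitting messages; shutdown() also closes the
// underlying KafkaConsumer, so it can be called on its own.
done.flatMap(_ => control.shutdown())
```

This avoids chaining stop() before shutdown() as in the postStop hook shown earlier in the thread.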

@sasajib

sasajib commented Jul 17, 2016

Hello, is there any update? I am also facing this problem.

@13h3r
Member

13h3r commented Jul 18, 2016

@sasajib I am not sure what the expected behavior is in this scenario.

Current behavior is: we cancel Consumer stage and close the consumer. This behavior is consistent.

The other one is to keep Consumer alive. But how long? Until all messages committed? But what if we filter some? Any ideas?

@drewhk
Member

drewhk commented Jul 18, 2016

Last time I recommended adding a configurable timeout, after which the consumer stops:

  • if all messages have been committed
  • or if the timeout fires before the above happens

The timeout = 0 setting would reproduce the current behavior.
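As a hypothetical sketch of what such a setting could look like in reference.conf (the section and value are illustrative; a stop-timeout setting does appear later in the thread):

```
akka.kafka.consumer {
  # Wait this long for outstanding commits before closing the consumer.
  # A value of 0 would reproduce the current behavior of closing immediately.
  stop-timeout = 30s
}
```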

@13h3r
Member

13h3r commented Jul 18, 2016

@drewhk sounds like a plan

@cchantep

cchantep commented Mar 8, 2017

Seems the issue is still there.

@asarkar

asarkar commented Mar 15, 2017

Confirmed that this issue exists with v0.14. stop-timeout and close-timeout are set to 5s each, poll-interval = 1s, no messages are pending. Processing stream:

val ctl = akka.kafka.scaladsl.Consumer.committableSource(consumerSettings, Subscriptions.topics("ufo"))
    .log(s"${getClass.getName} - Consuming")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
    .map { msg =>
      updateAnalytics(msg.record.value)
      msg.committableOffset
    }
    .batch(max = batchSize.toLong, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
      batch.updated(elem)
    }
    .mapAsync(parallelism)(offsets => {
      offsets.commitScaladsl()
    })
    .toMat(Sink.ignore)(Keep.left)
    .run

In another class:

Await.result(ctl.get.shutdown.flatMap(_ => system.terminate),
    1.second)

Produces dead letter:

2017-03-14 23:43:51.127 [ufo-sightings-akka.actor.default-dispatcher-21] [INFO ] a.a.RepointableActorRef - 
Message [akka.kafka.KafkaConsumerActor$Internal$Stop$] from Actor[akka://ufo-sightings/deadLetters] 
to Actor[akka://ufo-sightings/system/kafka-consumer-1#1896610594] was not delivered. [1] dead letters encountered. 

@jaksky

jaksky commented Mar 23, 2017

Hello, I suppose this is related to this issue: a non-terminating Consumer stream. How do we handle testing, e.g. producer -> topic -> consumer?

val source = Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(testingTopic))
  .map(_.value())
val value = source.map { r =>
    println("Received message " + r)
    r
  }
  .take(1)
  .runWith(Sink.seq)
value.futureValue mustBe Seq(testingMSG)

This results in the future timing out because the consumer source does not terminate. Is there any workaround? Or how should we test this, as I suppose it is a common scenario?
Currently using v0.14.
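One workaround sketch for such tests, assuming the Consumer.Control API: materialize the control with Keep.both and shut the consumer down explicitly once the expected elements have arrived, instead of relying on the source completing on its own (consumerSettings, testingTopic, and testingMSG are from the snippet above; a running broker is needed to execute this).

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{Keep, Sink}
import system.dispatcher

val (control, eventualMsgs) =
  Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(testingTopic))
    .map(_.value())
    .take(1)
    .toMat(Sink.seq)(Keep.both)
    .run()

// Close the KafkaConsumer ourselves so the test future can complete
// even if the internal Stop message ends up as a dead letter.
val result = eventualMsgs.flatMap(msgs => control.shutdown().map(_ => msgs))
result.futureValue mustBe Seq(testingMSG)
```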

@TeraWilliam

Experiencing the dead letter problem too here.

@pkoryzna

I believe I am experiencing a related problem: when a graph stage (such as mapAsync) inside a RunnableGraph fails with an exception for which the supervision decision should be Stop, the failure does not seem to be propagated, and the consumer keeps running instead of shutting down.

@ksilin

ksilin commented Aug 9, 2017

Experiencing the issue with Consumer.committableSource(...).take(n) as described above: the source does not terminate after n messages. After it is killed by the timeout, akka.kafka.KafkaConsumerActor$Internal$Commit and akka.kafka.KafkaConsumerActor$Internal$Stop$ messages fail to be delivered.

Consumer.plainSource(...).take(n) does not seem to have this issue and terminates cleanly after n messages.

akka-stream-kafka 0.16

@rabzu

rabzu commented May 17, 2018

I'm facing this problem too. Are there any workarounds, or some dirty way of killing the consumer? I really need to kill it.

@ennru
Member

ennru commented Jun 12, 2018

@rabzu Would you have the chance to see if the problem is still there in 0.21?

@2m
Member

2m commented Jun 12, 2018

@laymain

laymain commented Aug 22, 2018

Same issue here in 0.22 using committableSource

@laymain

laymain commented Aug 23, 2018

This is a critical issue: we're not able to use the Supervision.stop directive, as the application will not terminate.

public class Main {
  private static final String BOOTSTRAP_SERVERS = "kafka-server:9092";
  private static final String IN_TOPIC = "in-topic";
  private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("test");
    final ActorMaterializer materializer = ActorMaterializer.create(
      ActorMaterializerSettings.create(system).withSupervisionStrategy((Function<Throwable, Supervision.Directive>)(throwable -> {
        LOGGER.error("Unexpected error in stream", throwable);
        return Supervision.stop();
      })),
      system
    );

    final ConsumerSettings<byte[], byte[]> consumerSettings = ConsumerSettings
      .create(consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
      .withBootstrapServers(BOOTSTRAP_SERVERS)
      .withGroupId("test-" + UUID.randomUUID())
      .withClientId("test-" + UUID.randomUUID())
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    KillSwitch killSwitch = Consumer.committableSource(consumerSettings, Subscriptions.topics(IN_TOPIC))
      .viaMat(KillSwitches.single(), Keep.right())
      .mapAsync(30, msg -> {
        // Simulate error
        CompletableFuture<ConsumerMessage.CommittableMessage<byte[], byte[]>> future = new CompletableFuture<>();
        future.completeExceptionally(new Exception("Error in mapAsync"));
        return future;
      })
      .toMat(Sink.ignore(), Keep.left())
      .run(materializer);

    Runtime.getRuntime().addShutdownHook(new Thread(killSwitch::shutdown));

    LOGGER.info("Await stream termination...");
    try {
      system.getWhenTerminated().toCompletableFuture().join();
      // !! System never gets terminated
    } catch (Exception ex) {
      LOGGER.error("Unexpected error in stream", ex);
    }
    
    LOGGER.info("Shutting down stream...");
  }
}
[ERROR] [18/08/23 10:23:05] [Main] Unexpected error in stream: java.lang.Exception: Error in mapAsync
[DEBUG] [18/08/23 10:23:05] [internals.AbstractCoordinator] [Consumer clientId=..., groupId=...] Heartbeat thread has closed
[DEBUG] [18/08/23 10:23:05] [internals.AbstractCoordinator] [Consumer clientId=..., groupId=...] Sending LeaveGroup request to coordinator x.x.x.x:9092 (id: 2147483643 rack: null)
[DEBUG] [18/08/23 10:23:05] [internals.AbstractCoordinator] [Consumer clientId=..., groupId=...] LeaveGroup request returned successfully
[DEBUG] [18/08/23 10:23:05] [consumer.KafkaConsumer] [Consumer clientId=..., groupId=...] Kafka consumer has been closed
[INFO] [08/23/2018 10:23:05.307] [test-akka.actor.default-dispatcher-2] [akka://test/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Stop$] without sender to Actor[akka://test/system/kafka-consumer-1#-1029846368] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test/system/kafka-consumer-1#-1029846368]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

The process never terminates, same issue if I call killSwitch.shutdown().


The only ugly workaround I've found to handle the Supervision.stop directive together with the KillSwitch is to halt the application abruptly by:

  • calling System.exit() within the SupervisionStrategy, which triggers the shutdown hooks
  • calling Runtime.getRuntime().halt() within the shutdown hook to force thread termination (and avoid deadlocks by not calling System.exit() again)

  private static final AtomicInteger EXIT_STATUS = new AtomicInteger(0);

  ...

  final ActorMaterializer materializer = ActorMaterializer.create(
    ActorMaterializerSettings.create(system).withSupervisionStrategy((Function<Throwable, Supervision.Directive>)(throwable -> {
      LOGGER.error("Unexpected error in stream", throwable);
      EXIT_STATUS.set(1);
      System.exit(EXIT_STATUS.get());
      return Supervision.stop();
    })),
    system
  );


  ...

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    LOGGER.info("Shutting down stream...");
    killSwitch.shutdown();
    Runtime.getRuntime().halt(EXIT_STATUS.get());
  }));
[ERROR] [18/08/23 10:42:55] [Main] Unexpected error in stream: java.lang.Exception: Error in mapAsync
[INFO] [18/08/23 10:42:55] [Main] Shutting down stream...

Process finished with exit code 1

@ennru
Member

ennru commented Aug 24, 2018

The stream shuts down properly, including the KafkaConsumerActor as your logging shows (Kafka consumer has been closed).
To shut down the whole application you need to terminate the Actor system.
If you want to shut down the Actor system on stream failure, I recommend keeping the CompletionStage from the Sink.ignore and adding an exceptionally handler to it.

@laymain

laymain commented Aug 24, 2018

OK, I had misunderstood what the KillSwitch does: it ends the stream but not the system.
As advised, I have kept the CompletionStage and terminated the Actor system once it completes (exceptionally or not).

Pair<UniqueKillSwitch, CompletionStage<Done>> pair = Consumer.committableSource(consumerSettings, Subscriptions.topics(IN_TOPIC))
  .viaMat(KillSwitches.single(), Keep.right())
  .mapAsync(30, msg -> {
    // Simulate error
    CompletableFuture<ConsumerMessage.CommittableMessage<byte[], byte[]>> future = new CompletableFuture<>();
    future.completeExceptionally(new Exception("Error in mapAsync"));
    return future;
  })
  .toMat(Sink.ignore(), Keep.both())
  .run(materializer);
KillSwitch killSwitch = pair.first();
CompletionStage<Done> stage = pair.second();

Runtime.getRuntime().addShutdownHook(new Thread(killSwitch::shutdown));

LOGGER.info("Await stream termination...");
stage.whenComplete((done, throwable) -> {
  if (throwable != null) {
    LOGGER.error("Stream completed with an error", throwable);
  }
  LOGGER.info("Shutting down stream...");
  system.terminate();
});
[ERROR] [18/08/24 14:18:48] [Main] Unexpected error in stream: java.lang.Exception: Error in mapAsync
[ERROR] [18/08/24 14:18:48] [Main] Stream completed with an error: java.lang.Exception: Error in mapAsync
 [INFO] [18/08/24 14:18:48] [Main] Shutting down actor system...
[DEBUG] [18/08/24 14:18:48] [internals.AbstractCoordinator] [Consumer clientId=..., groupId=...] Heartbeat thread has closed
[DEBUG] [18/08/24 14:18:48] [internals.AbstractCoordinator] [Consumer clientId=..., groupId=...] Sending LeaveGroup request to coordinator x.x.x.x:9092 (id: 2147483643 rack: null)
[DEBUG] [18/08/24 14:18:48] [internals.AbstractCoordinator] [Consumer clientId=..., groupId=...] LeaveGroup request returned successfully
[DEBUG] [18/08/24 14:18:48] [consumer.KafkaConsumer] [Consumer clientId=..., groupId=...] Kafka consumer has been closed

Process finished with exit code 0

Application now terminates smoothly as expected, thank you for the help.

@ennru
Member

ennru commented Sep 7, 2018

There hasn't been a documented case of the Actor not shutting down since v0.21.
I've reported #526, which covers the Stop messages showing up as dead letters.
Please report details in case you suspect the consumer stays alive after the stage has terminated.

@robertotena

Hi, I think I am having the same issue with v1.0.

I wrote a small piece of code that reproduces the issue:

object TestStream extends App with LazyLogging {

  implicit val system = ActorSystem("test-actor-system")
  implicit val materializer = ActorMaterializer()

  logger.info(s"System starting up...")

  val sinkActor = system.actorOf(Props(new SinkActor()), "sink-actor")

  val runningStream =
    Consumer.plainSource(
      ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer),
      Subscriptions.topics("test-topic")
    )
      .map(_.value)
      .to(Sink.actorRefWithAck(sinkActor, Init, Ack, Done))
      .run

  sys.addShutdownHook {
    logger.info("System shutting down...")
    Await.ready(runningStream.shutdown, 10 seconds)
    Await.ready({
      materializer.shutdown
      system.terminate
    }, 10 seconds)
  }

  logger.info("System initialized.")

}

case object Ack
case object Init

class SinkActor extends Actor with ActorLogging {

  override def preStart(): Unit = log.info(s"SinkActor started.")

  override def postStop(): Unit = log.info(s"SinkActor stopped.")

  override def receive: Receive = {

    case Init =>
      log.info(s"Init message received")
      sender ! Ack

    case Done =>
      log.info(s"Done message received")

    case s: String =>
      log.info(s"Stream message received ${s}")
      sender ! Ack

    case default =>
      log.info(s"Unknown message received ${default}")

  }

}

I am just sending 5 string messages to that Kafka topic, and when I stop the application, I get 2 different results:

  1. If the await time of the stream (10 seconds) is lower than the stop-timeout, it throws an exception:
2019-04-03 18:41:36.048 [main] INFO  com.test.TestStream$ - System starting up...
[INFO] [04/03/2019 18:41:36.056] [test-actor-system-akka.actor.default-dispatcher-4] [akka://test-actor-system/user/sink-actor] SinkActor started.
2019-04-03 18:41:36.185 [main] INFO  com.test.TestStream$ - System initialized.
[INFO] [04/03/2019 18:41:36.195] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Init message received
[INFO] [04/03/2019 18:41:40.315] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Stream message received hello
[INFO] [04/03/2019 18:41:40.315] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Stream message received this
[INFO] [04/03/2019 18:41:40.315] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Stream message received is
[INFO] [04/03/2019 18:41:40.315] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Stream message received a
[INFO] [04/03/2019 18:41:40.315] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Stream message received test
2019-04-03 18:41:45.083 [shutdownHook1] INFO  com.test.TestStream$ - System shutting down...
[INFO] [04/03/2019 18:41:45.084] [test-actor-system-akka.actor.default-dispatcher-3] [akka://test-actor-system/user/sink-actor] Done message received
Exception in thread "shutdownHook1" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:255)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:183)
	at scala.concurrent.Await$.$anonfun$ready$1(package.scala:191)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.ready(package.scala:142)
	at com.test.TestStream$.$anonfun$new$1(TestStream.scala:35)  --> the await of runningStream
	at scala.sys.ShutdownHookThread$$anon$1.run(ShutdownHookThread.scala:33)
  2. Otherwise it stops with a dead letter:
2019-04-03 18:40:39.160 [main] INFO  com.test.TestStream$ - System starting up...
[INFO] [04/03/2019 18:40:39.166] [test-actor-system-akka.actor.default-dispatcher-4] [akka://test-actor-system/user/sink-actor] SinkActor started.
2019-04-03 18:40:39.318 [main] INFO  com.test.TestStream$ - System initialized.
[INFO] [04/03/2019 18:40:39.329] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Init message received
[INFO] [04/03/2019 18:40:43.425] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Stream message received hello
[INFO] [04/03/2019 18:40:43.425] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Stream message received this
[INFO] [04/03/2019 18:40:43.425] [test-actor-system-akka.actor.default-dispatcher-4] [akka://test-actor-system/user/sink-actor] Stream message received is
[INFO] [04/03/2019 18:40:43.426] [test-actor-system-akka.actor.default-dispatcher-4] [akka://test-actor-system/user/sink-actor] Stream message received a
[INFO] [04/03/2019 18:40:43.426] [test-actor-system-akka.actor.default-dispatcher-4] [akka://test-actor-system/user/sink-actor] Stream message received test
2019-04-03 18:40:46.201 [shutdownHook1] INFO  com.test.TestStream$ - System shutting down...
[INFO] [04/03/2019 18:40:46.205] [test-actor-system-akka.actor.default-dispatcher-2] [akka://test-actor-system/user/sink-actor] Done message received
[INFO] [04/03/2019 18:40:51.246] [test-actor-system-akka.actor.default-dispatcher-4] [akka://test-actor-system/user/sink-actor] SinkActor stopped.
[INFO] [04/03/2019 18:40:51.251] [test-actor-system-akka.actor.default-dispatcher-3] [akka://test-actor-system/system/kafka-consumer-1] Message [akka.kafka.KafkaConsumerActor$Stop$] from Actor[akka://test-actor-system/system/StreamSupervisor-0/$$b#-752541566] to Actor[akka://test-actor-system/system/kafka-consumer-1#-843673753] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://test-actor-system/system/kafka-consumer-1#-843673753]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

I am assuming that stop-timeout is a parameter meant for the case where the consumer actor is not stopped properly, and not the parameter we should use to stop the stream. Is that correct?

@ennru
Member

ennru commented Apr 3, 2019

No, stop-timeout applies even for a regular shutdown.
The dead letter is expected; we might want to remove the second send when we know it was already sent.
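For readers who find a regular shutdown too slow, the stop-timeout can be lowered; a sketch, assuming an implicit ActorSystem named system is in scope and that withStopTimeout exists on ConsumerSettings in the version you use (verify against your version's API):

```scala
import scala.concurrent.duration._
import akka.kafka.ConsumerSettings
import org.apache.kafka.common.serialization.StringDeserializer

val settings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    // default stop-timeout is 30s; lower it if shutdown feels sluggish
    .withStopTimeout(1.second)
```

The equivalent config setting would be akka.kafka.consumer.stop-timeout = 1s.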

@ennru
Member

ennru commented Nov 28, 2019

I'll close this now. The shutdown process has been reworked quite a bit since this was reported.
I acknowledge that people tend to stumble over the stop-timeout, which delays the shutdown a bit too much in most cases. But that is a different issue.

Open a new issue if you find shutdown behaviour that is broken.

@ennru ennru closed this as completed Nov 28, 2019
@ennru ennru added this to the invalid/not release-bound milestone Dec 2, 2019