SAMZA-2073: Do not commit the task offsets when shutting down the SamzaContainer. #884

Closed

@shanthoosh shanthoosh commented Jan 16, 2019

SAMZA-1489 added support for committing the offsets of all the tasks when shutting down a SamzaContainer. Other components in Samza, such as CheckpointListener implementations, are not designed to handle a commit that happens after the consumers have been stopped.

This unnecessarily results in an unclean shutdown of a Samza standalone processor during the rebalancing phase. Here are the sample logs:

apache.samza.container.SamzaContainer@129d533c to shutdown.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down SamzaContainer.
2019/01/15 02:38:09.738 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down consumer multiplexer.
2019/01/15 02:38:09.741 INFO [KafkaProducer] [hello-brooklin-task-i001-auditor] [hello-brooklin-task] [] [Producer clientId=hello-brooklin-task-i001-auditor, transactionalId=nullClosing the Kafka producer with timeoutMillis = 9223370489334886066 ms.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down task instance stream tasks.
2019/01/15 02:38:09.748 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Shutting down timer executor
2019/01/15 02:38:09.749 INFO [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Committing offsets for all task instances
2019/01/15 02:38:09.753 ERROR [SamzaContainer] [Samza StreamProcessor Container Thread-0] [hello-brooklin-task] [] Caught exception/error while shutting down container.
java.lang.IllegalStateException: This consumer has already been closed.
 at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1735)
 at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1214)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitOffsets(LiKafkaConsumerImpl.java:311)
 at com.linkedin.kafka.liclients.consumer.LiKafkaConsumerImpl.commitSync(LiKafkaConsumerImpl.java:282)
 at com.linkedin.brooklin.client.BaseConsumerImpl.commit(BaseConsumerImpl.java:136)

Since the final commit is not critical, it is better to skip it during the SamzaContainer shutdown sequence.
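
To make the ordering problem concrete, here is a minimal sketch in plain Java. It is not Samza code: `ToyConsumer`, `ToyContainer`, and the `commitOnShutdown` flag are hypothetical names invented for illustration, and the only behaviour borrowed from the logs above is that committing on a closed consumer throws `IllegalStateException`.

```java
// Hypothetical stand-in for a consumer that rejects commits once closed,
// mirroring the IllegalStateException in the stack trace above.
final class ToyConsumer {
    private boolean closed = false;
    private long committedOffset = -1L;

    void close() {
        closed = true;
    }

    void commit(long offset) {
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
        committedOffset = offset;
    }

    long committedOffset() {
        return committedOffset;
    }
}

// Hypothetical container: stops its consumer first, then optionally commits.
final class ToyContainer {
    private final ToyConsumer consumer;
    private long lastProcessedOffset = -1L;

    ToyContainer(ToyConsumer consumer) {
        this.consumer = consumer;
    }

    void process(long offset) {
        lastProcessedOffset = offset;
    }

    // Regular commit while the container is still running.
    void commit() {
        consumer.commit(lastProcessedOffset);
    }

    // Returns true when shutdown finishes without an exception.
    boolean shutdown(boolean commitOnShutdown) {
        try {
            consumer.close();   // consumers are stopped first
            if (commitOnShutdown) {
                commit();       // too late: the consumer is already closed
            }
            return true;
        } catch (IllegalStateException e) {
            return false;       // corresponds to the unclean shutdown in the logs
        }
    }
}

final class ShutdownOrderSketch {
    public static void main(String[] args) {
        ToyContainer withFinalCommit = new ToyContainer(new ToyConsumer());
        withFinalCommit.process(5L);
        withFinalCommit.commit();
        System.out.println("clean with final commit: " + withFinalCommit.shutdown(true));

        ToyContainer withoutFinalCommit = new ToyContainer(new ToyConsumer());
        withoutFinalCommit.process(5L);
        withoutFinalCommit.commit();
        System.out.println("clean without final commit: " + withoutFinalCommit.shutdown(false));
    }
}
```

In this sketch, skipping the final commit leaves the last regular commit in place, so only messages processed after that commit would be replayed on restart.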

@shanthoosh

@prateekm Can you take a look, since you have more context?

@shanthoosh shanthoosh force-pushed the remove_task_commit_during_shutdown branch from 5cb3faf to 3f37ecd Compare January 16, 2019 02:37

@prateekm prateekm left a comment

Minor suggestion. Fix it and ship it.

@shanthoosh

@prateekm Thanks for the quick review.

@asfgit asfgit closed this in 5ff7f23 Jan 16, 2019
Zhangyx39 pushed a commit to Zhangyx39/samza that referenced this pull request Apr 3, 2019
…zaContainer.

Author: Shanthoosh Venkataraman <spvenkat@usc.edu>

Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>

Closes apache#884 from shanthoosh/remove_task_commit_during_shutdown