standalone SPL apps using Kafka consumer op with java8 don't terminate #117

Closed
dlaboss opened this issue Aug 12, 2015 · 7 comments
@dlaboss
Member

dlaboss commented Aug 12, 2015

When running a standalone SPL app that uses the Kafka operators with java8,
pressing ^C leaves the standalone process orphaned: it doesn't terminate.

Furthermore, it continues to be the lead consumer for the group/topic, thereby
preventing subsequent consumer instances from receiving any msgs.

All is fine with java7.

To reproduce using the KafkaSample in this repository:

cd KafkaSample
export STREAMS_INSTALL=.../InfoSphere_Streams/4.0.1.0  # use 4.0.1 which has java8

demo good case w/java7

export JAVA_HOME=/opt/ibm/java-x86_64-71    # or your path to a java7
make standalone
$JAVA_HOME/bin/java -jar output/com.ibm.streamsx.messaging.sample.kafka.KafkaSample.sab
^C
# verify standalone terminates
ps -fu$USER -www | grep standalone 

demo failure case - use java8 (present in 4.0.1 install)

(unset JAVA_HOME; $STREAMS_INSTALL/bin/sc -a -t ../../com.ibm.streamsx.messaging:$STREAMS_INSTALL/toolkits/com.ibm.streamsx.messaging -T -M com.ibm.streamsx.messaging.sample.kafka::KafkaSample  --output-dir ./NO_JAVA_HOME)
$JAVA_HOME/bin/java -jar NO_JAVA_HOME/com.ibm.streamsx.messaging.sample.kafka.KafkaSample.sab
^C
# if you were to rerun the java7 (or 8) .sab at this point, it will not report any msgs because...
# show that standalone DOES NOT terminate
ps -fu$USER -www | grep standalone 
kill -9 ...  # the standalone process

The standalone process receives the SIGINT and initiates shutdown processing
but it never completes. If you run with tracing (-t 4) Kafka reports it
is "Stopping all fetchers" but it doesn't report "All connections stopped".

Hence it appears that the kafka consumer op's thread-factory created
thread doesn't return from the KafkaStream.iterator().hasNext(),
hence that thread factory created thread never completes,
hence the consumer op never completes its shutdown processing,
hence the "pe" never completes -- "Exit PEImpl::shutdown()" doesn't occur,
hence the process never terminates.
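
The chain above can be illustrated with a minimal plain-JDK sketch. This is not Kafka's or the operator's code: the class `BlockedConsumerSketch`, the `SHUTDOWN` sentinel, and the queue standing in for the blocking iterator are all hypothetical. It only shows the suspected symptom, a non-daemon thread parked in a blocking wait that ignores interrupts and therefore keeps the JVM alive until something explicitly unblocks it. Whether the real consumer thread behaves this way is an assumption, not something the trace confirms.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockedConsumerSketch {
    // Hypothetical sentinel that tells the consumer loop to stop.
    public static final String SHUTDOWN = "__shutdown__";

    // Stand-in for a thread waiting in a blocking hasNext()-style call.
    public static Thread startConsumer(BlockingQueue<String> queue) {
        Thread t = new Thread(() -> {
            while (true) {
                String msg;
                try {
                    msg = queue.take(); // blocks until a message arrives
                } catch (InterruptedException e) {
                    continue; // interrupt swallowed: the loop goes back to waiting
                }
                if (SHUTDOWN.equals(msg)) {
                    return; // only an explicit sentinel ends the thread
                }
            }
        }, "consumer-sketch");
        t.start();
        return t;
    }

    // Returns true if the thread is still alive shortly after an interrupt.
    public static boolean survivesInterrupt(Thread t) throws InterruptedException {
        t.interrupt();
        t.join(200);
        return t.isAlive();
    }

    // Returns true if the thread exits once the sentinel is queued.
    public static boolean stopsOnSentinel(Thread t, BlockingQueue<String> queue)
            throws InterruptedException {
        queue.put(SHUTDOWN);
        t.join(2000);
        return !t.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread t = startConsumer(queue);
        System.out.println("alive after interrupt: " + survivesInterrupt(t));
        System.out.println("stopped after sentinel: " + stopsOnSentinel(t, queue));
    }
}
```

If the real thread is stuck in a similar way, an interrupt alone would not be enough and the shutdown path would have to release the blocking call itself.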

@chanskw chanskw added the kafka label Aug 12, 2015
@dlaboss
Member Author

dlaboss commented Aug 14, 2015

also, canceljob does not leave residual processes but it is very slow w/4.0.1/java8. i suspect that op shutdown is misbehaving just as described for the standalone case, but eventually the hc (or pec) times out awaiting shutdown and then forcibly kills the process.

@Alex-Cook4
Collaborator

I have made sure I'm sending thread interrupts to the consumer op's thread, but that didn't fix the problem. I'm continuing to try more things, but if you have any ideas that would be great!

@dlaboss
Member Author

dlaboss commented Aug 27, 2015

maybe get a java coredump of a stranded standalone process to definitively identify what threads are still hanging around. if it's a kafka consumer thread as suspected, then see exactly what it's blocked in and go from there. e.g., is it a nio method... whose doc indicates it should terminate with the interrupt? and/or are there rumblings about java7/8 diffs there?
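
As a rough sketch of what to look for, the snippet below lists live non-daemon threads with their state and top stack frame, using only `Thread.getAllStackTraces()` from the JDK. The class name `LiveThreadReport` is hypothetical, and the code would have to run inside the affected JVM (for example from a shutdown hook or a debug hook), which is an assumption about how it could be wired in. For a process that is already stranded, an external thread dump (for example `kill -3 <pid>`, which produces a javacore on IBM JVMs) gives the same kind of information.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LiveThreadReport {
    // Collects one line per live non-daemon thread: name, state, top stack frame.
    // Non-daemon threads are the ones that keep a JVM from exiting.
    public static List<String> nonDaemonThreads() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (t.isDaemon()) {
                continue;
            }
            StackTraceElement[] stack = e.getValue();
            String top = stack.length > 0 ? stack[0].toString() : "(no frames)";
            lines.add(t.getName() + " [" + t.getState() + "] at " + top);
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : nonDaemonThreads()) {
            System.out.println(line);
        }
    }
}
```

A thread that stays in `WAITING`, `TIMED_WAITING`, or `BLOCKED` with a Kafka or NIO frame on top after shutdown has been requested would be the first candidate to examine.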

@Alex-Cook4
Collaborator

I've also tried thread.destroy() just to try, but also with no luck. I got a coredump, but I don't really know what I'm looking for (I have never analyzed this kind of coredump before). I think I will need help figuring this one out.

@Alex-Cook4
Collaborator

@dlaboss I upgraded my kafka servers to kafka_2.10-0.8.2.0. This no longer happens. My Streams jobs terminate much more quickly and my standalone applications terminate just fine. I wasn't able to solve this problem with the older servers. Do you think that upgrading to newer servers is a reasonable solution to this issue?

^CThe SIGINT signal occurred during the application execution. The application is shutting down.
21 Sep 2015 14:22:09.016 [5183] INFO spl_pe M[PEImpl.cpp:shutdown:1084]  - Shutdown request received by PE...
21 Sep 2015 14:22:09.016 [5183] INFO spl_pe M[PEImpl.cpp:shutdown:1092]  - isJobCancellation is set to 0
21 Sep 2015 14:22:09.017 [5183] INFO spl_pe M[PEImpl.cpp:shutdown:1099]  - shutdownRequested set to true...
21 Sep 2015 14:22:09.017 [5183] INFO spl_pe M[PEImpl.cpp:prepareOperatorsForTermination:981]  - Notifying operators of termination...
21 Sep 2015 14:22:09.018 [5183] INFO #splapptrc,J[0],P[0],KafkaProducer_5,spl_javaop M[?:com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.shutdown:-1]  - Operator.shutdown() : com.ibm.streamsx.messaging.kafka.KafkaSink
21 Sep 2015 14:22:09.020 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3,spl_javaop M[?:com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.shutdown:-1]  - Operator.shutdown() : com.ibm.streamsx.messaging.kafka.KafkaSource
21 Sep 2015 14:22:09.022 [5154] INFO spl_pe M[PEImpl.cpp:joinOperatorThreads:994]  - Joined all operator threads...
21 Sep 2015 14:22:09.022 [5154] INFO spl_pe M[PEImpl.cpp:joinWindowThreads:1008]  - Joining window threads...
21 Sep 2015 14:22:09.023 [5154] INFO spl_pe M[PEImpl.cpp:joinWindowThreads:1012]  - Joined all window threads.
21 Sep 2015 14:22:09.023 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e], ZKConsumerConnector shutting down
21 Sep 2015 14:22:09.023 [5154] INFO spl_pe M[PEImpl.cpp:joinActiveQueues:999]  - Joining active queues...
21 Sep 2015 14:22:09.024 [5154] INFO spl_pe M[PEImpl.cpp:joinActiveQueues:1003]  - Joined active queues.
21 Sep 2015 14:22:09.024 [5154] INFO spl_pe M[PECleaners.cpp:finalizeImpl:116]  - Closing ports...
21 Sep 2015 14:22:09.024 [5198] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[?:com.ibm.streamsx.messaging.kafka.KafkaClient$KafkaConsumer.run:-1]  -  Topic[millionTupleTopic3] Thread[0] Thread Stopping
21 Sep 2015 14:22:09.025 [5154] INFO spl_pe M[PECleaners.cpp:finalizeImpl:118]  - Closed all ports...
[14:22]cooka@g0601b01(48):KafkaSpeedTest1[130]$
21 Sep 2015 14:22:09.061 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherManager-1442859710977] Stopping leader finder thread
21 Sep 2015 14:22:09.062 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-leader-finder-thread], Shutting down
21 Sep 2015 14:22:09.063 [5197] INFO #splapptrc,J[0],P[0],KafkaProducer_5 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-leader-finder-thread], Stopped 
21 Sep 2015 14:22:09.064 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-leader-finder-thread], Shutdown completed
21 Sep 2015 14:22:09.064 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherManager-1442859710977] Stopping all fetchers
21 Sep 2015 14:22:09.065 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-2], Shutting down
21 Sep 2015 14:22:09.066 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-2], Shutdown completed
21 Sep 2015 14:22:09.067 [5200] INFO #splapptrc,J[0],P[0],KafkaProducer_5 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-2], Stopped 
21 Sep 2015 14:22:09.067 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-1], Shutting down
21 Sep 2015 14:22:09.068 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-1], Shutdown completed
21 Sep 2015 14:22:09.069 [5201] INFO #splapptrc,J[0],P[0],KafkaProducer_5 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-1], Stopped 
21 Sep 2015 14:22:09.070 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-0], Shutting down
21 Sep 2015 14:22:09.071 [5202] INFO #splapptrc,J[0],P[0],KafkaProducer_5 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-0], Stopped 
21 Sep 2015 14:22:09.072 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherThread-mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e-0-0], Shutdown completed
21 Sep 2015 14:22:09.072 [5196] INFO #splapptrc,J[0],P[0],KafkaProducer_5 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e], stopping watcher executor thread for consumer mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e
21 Sep 2015 14:22:09.073 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [ConsumerFetcherManager-1442859710977] All connections stopped
21 Sep 2015 14:22:09.081 [5188] INFO #splapptrc,J[0],P[0],KafkaProducer_5 M[ZkEventThread.java:org.I0Itec.zkclient.ZkEventThread.run:82]  - Terminate ZkClient event thread.
21 Sep 2015 14:22:09.084 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[ZooKeeper.java:org.apache.zookeeper.ZooKeeper.close:684]  - Session: 0x24fb3026acf1600 closed
21 Sep 2015 14:22:09.085 [5190] INFO #splapptrc,J[0],P[0],KafkaProducer_5 M[ClientCnxn.java:org.apache.zookeeper.ClientCnxn$EventThread.run:512]  - EventThread shut down
21 Sep 2015 14:22:09.086 [5183] INFO #splapptrc,J[0],P[0],KafkaConsumer_3 M[Logging.scala:kafka.utils.Logging$class.info:68]  - [mygroup4_g0601b01.pok.hpc-ng.ibm.com-1442859710893-9b18f89e], ZKConsumerConnector shutdown completed in 61 ms
21 Sep 2015 14:22:09.086 [5183] INFO spl_pe M[PEImpl.cpp:prepareOperatorsForTermination:985]  - Notified all operators of termination...
21 Sep 2015 14:22:09.087 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1017]  - Flushing operator profile metrics...
21 Sep 2015 14:22:09.087 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1021]  - Flushed all operator profile metrics...
21 Sep 2015 14:22:09.087 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1023]  - Deleting active queues...
21 Sep 2015 14:22:09.088 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1027]  - Deleted active queues.
21 Sep 2015 14:22:09.088 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1029]  - Deleting input port tuple cache...
21 Sep 2015 14:22:09.088 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1033]  - Deleted input port tuple cache.
21 Sep 2015 14:22:09.089 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1038]  - Deleting all operators...
21 Sep 2015 14:22:09.089 [5154] INFO spl_pe M[PEImpl.cpp:deleteOperators:1045]  - Deleted all operators.
21 Sep 2015 14:22:09.089 [5154] INFO spl_pe M[PEImpl.cpp:process:875]  - Terminating...
21 Sep 2015 14:22:09.089 [5154] INFO spl_app M[StandaloneApplicationImpl.cpp:run:201]  - Completed the standalone application processing
21 Sep 2015 14:22:09.090 [5154] INFO spl_app M[StandaloneApplicationImpl.cpp:run:203]  - Leaving MyApplication::run()
21 Sep 2015 14:22:09.104 [5183] INFO spl_pe M[PEImpl.cpp:shutdown:1111]  - Exit PEImpl::shutdown()

@dlaboss
Member Author

dlaboss commented Sep 21, 2015

@Alex-Cook4 sounds reasonable to me. Does #118 also go away with the new kafka?

@Alex-Cook4
Collaborator

@dlaboss Unfortunately #118 does not go away with the new servers. It actually seems worse and consistently fails with "failed to rebalance" errors that look the same as #125.

The work-around for #118 is still to make sure you have different group IDs.

kafkaProperty : "group.id=myOtherGroupName";
