stopping tomcat container does not kill kinesis consumer connection #167

Closed
prayagupa opened this issue May 9, 2017 · 5 comments

@prayagupa

prayagupa commented May 9, 2017

I have a stream consumer app running inside a Tomcat container.

When I stop the Tomcat container, it is supposed to stop the Kinesis stream consumer app's resources as well, but it does not.

Even after running /usr/local/apache-tomcat-8.0.42/bin/shutdown.sh, the Tomcat process is still running:

[ec2-user@ip-172-21-5-105 ~]$ ps aux | grep tomcat
root     13904  0.3 10.5 4611864 816080 ?      Sl   May08   1:55 /usr/java/jdk1.8.0_60//bin/java -Djava.util.logging.config.file=/usr/local/apache-tomcat-8.5.12/conf/logging.properties -Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager -Djdk.tls.ephemeralDHKeySize=2048 -Djava.protocol.handler.pkgs=org.apache.catalina.webresources -classpath /usr/local/apache-tomcat-8.5.12/bin/bootstrap.jar:/usr/local/apache-tomcat-8.5.12/bin/tomcat-juli.jar -Dcatalina.base=/usr/local/apache-tomcat-8.5.12 -Dcatalina.home=/usr/local/apache-tomcat-8.5.12 -Djava.io.tmpdir=/usr/local/apache-tomcat-8.5.12/temp org.apache.catalina.startup.Bootstrap start
ec2-user 15408  0.0  0.0 112648   964 pts/0    S+   00:14   0:00 grep --color=auto tomcat

As a result the consumer keeps running: the leaseCounter on the consumer's lease keeps increasing between two reads of the lease record.

{
  "checkpoint": "49572589993860134938465509123149151340420296711845445634",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 394714,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "customerorder-e2e_172.21.5.105",
  "ownerSwitchesSinceCheckpoint": 0
}
{
  "checkpoint": "49572589993860134938465509123149151340420296711845445634",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 394720,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "customerorder-e2e_172.21.5.105",
  "ownerSwitchesSinceCheckpoint": 0
}

I see the following error while stopping the consumer resources.

It happens on nativeWorker.requestShutdown();. I'm running nativeWorker on my own thread, created as consumerThread = new Thread(nativeWorker);

{
  "timeMillis": 1494303061730,
  "thread": "localhost-startStop-2",
  "level": "INFO",
  "loggerName": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
  "message": "Shutting down consumer instance customerorder-e2e_172.21.5.105",
  "endOfBatch": false,
  "loggerFqcn": "org.apache.logging.log4j.spi.AbstractLogger",
  "threadId": 177,
  "threadPriority": 5
}
{
  "timeMillis": 1494303061732,
  "thread": "Thread-8",
  "level": "ERROR",
  "loggerName": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
  "message": "Error starting a consumer customerorder-e2e",
  "thrown": {
    "commonElementCount": 0,
    "name": "java.lang.ThreadDeath",
    "extendedStackTrace": [
      {
        "class": "java.lang.Thread",
        "method": "stop",
        "file": "Thread.java",
        "line": 850,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
        "method": "shutdown",
        "file": "KinesisEventStreamConsumer.java",
        "line": 202,
        "exact": false,
        "location": "stream-driver-1.0-SNAPSHOT.jar",
        "version": "?"
      },
      {
        "class": "sun.reflect.NativeMethodAccessorImpl",
        "method": "invoke0",
        "file": "NativeMethodAccessorImpl.java",
        "line": -2,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "sun.reflect.NativeMethodAccessorImpl",
        "method": "invoke",
        "file": "NativeMethodAccessorImpl.java",
        "line": 62,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "sun.reflect.DelegatingMethodAccessorImpl",
        "method": "invoke",
        "file": "DelegatingMethodAccessorImpl.java",
        "line": 43,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.lang.reflect.Method",
        "method": "invoke",
        "file": "Method.java",
        "line": 497,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "org.springframework.beans.factory.support.DisposableBeanAdapter",
        "method": "invokeCustomDestroyMethod",
        "file": "DisposableBeanAdapter.java",
        "line": 364,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DisposableBeanAdapter",
        "method": "destroy",
        "file": "DisposableBeanAdapter.java",
        "line": 287,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultSingletonBeanRegistry",
        "method": "destroyBean",
        "file": "DefaultSingletonBeanRegistry.java",
        "line": 578,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultSingletonBeanRegistry",
        "method": "destroySingleton",
        "file": "DefaultSingletonBeanRegistry.java",
        "line": 554,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultListableBeanFactory",
        "method": "destroySingleton",
        "file": "DefaultListableBeanFactory.java",
        "line": 961,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultSingletonBeanRegistry",
        "method": "destroySingletons",
        "file": "DefaultSingletonBeanRegistry.java",
        "line": 523,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.beans.factory.support.DefaultListableBeanFactory",
        "method": "destroySingletons",
        "file": "DefaultListableBeanFactory.java",
        "line": 968,
        "exact": false,
        "location": "spring-beans-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.context.support.AbstractApplicationContext",
        "method": "destroyBeans",
        "file": "AbstractApplicationContext.java",
        "line": 1033,
        "exact": false,
        "location": "spring-context-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.context.support.AbstractApplicationContext",
        "method": "doClose",
        "file": "AbstractApplicationContext.java",
        "line": 1009,
        "exact": false,
        "location": "spring-context-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.context.support.AbstractApplicationContext",
        "method": "close",
        "file": "AbstractApplicationContext.java",
        "line": 961,
        "exact": false,
        "location": "spring-context-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.web.context.ContextLoader",
        "method": "closeWebApplicationContext",
        "file": "ContextLoader.java",
        "line": 581,
        "exact": false,
        "location": "spring-web-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.springframework.web.context.ContextLoaderListener",
        "method": "contextDestroyed",
        "file": "ContextLoaderListener.java",
        "line": 116,
        "exact": false,
        "location": "spring-web-4.3.6.RELEASE.jar",
        "version": "4.3.6.RELEASE"
      },
      {
        "class": "org.apache.catalina.core.StandardContext",
        "method": "listenerStop",
        "file": "StandardContext.java",
        "line": 4799,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.core.StandardContext",
        "method": "stopInternal",
        "file": "StandardContext.java",
        "line": 5438,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.util.LifecycleBase",
        "method": "stop",
        "file": "LifecycleBase.java",
        "line": 226,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.core.ContainerBase$StopChild",
        "method": "call",
        "file": "ContainerBase.java",
        "line": 1435,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "org.apache.catalina.core.ContainerBase$StopChild",
        "method": "call",
        "file": "ContainerBase.java",
        "line": 1424,
        "exact": false,
        "location": "catalina.jar",
        "version": "8.5.12"
      },
      {
        "class": "java.util.concurrent.FutureTask",
        "method": "run",
        "file": "FutureTask.java",
        "line": 266,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1142,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 617,
        "exact": false,
        "location": "?",
        "version": "1.8.0_60"
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 745,
        "exact": true,
        "location": "?",
        "version": "1.8.0_60"
      }
    ]
  },
  "endOfBatch": false,
  "loggerFqcn": "org.apache.logging.log4j.spi.AbstractLogger",
  "threadId": 36,
  "threadPriority": 5
}

I'm wrapping the nativeWorker in my StreamConsumer#consumeOnce - https://github.com/nihil-os/stream-driver/blob/kinesis-stream/src/main/java/com/eventstream/consumer/kinesis/KinesisEventStreamConsumer.java#L102

For now, my workaround is to stop the Tomcat process with Unix kill.

@pfifer
Contributor

pfifer commented May 16, 2017

The KCL isn't really designed to be managed by a container. I suspect the problem might be related to KinesisEventStreamConsumer.java:203, since the method returns a future that you're not waiting on.

@prayagupa
Author

prayagupa commented May 16, 2017

@pfifer you are right. I will definitely look into waiting on nativeWorker.requestShutdown().

It might look like:

            while(!nativeConsumer.requestShutdown().isDone()) {}
            consumerThread.stop();

Thanks for your input.

@pfifer pfifer added the bug label May 16, 2017
@pfifer
Contributor

pfifer commented May 16, 2017

This is a bug as you shouldn't have to request the value of a future to trigger the completion of it. Thanks for reporting it.

For the time being you can do something like:

nativeConsumer.requestShutdown().get()

or if you don't want to wait forever do

nativeConsumer.requestShutdown().get(5, TimeUnit.SECONDS)

with whatever timeout you want to use. You will need to handle the exceptions these methods can throw though.
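
As a rough sketch of the exception handling mentioned above (this is not KCL code): `Future.get(long, TimeUnit)` can throw `TimeoutException`, `ExecutionException`, and `InterruptedException`. The names `ShutdownWait`, `Outcome`, and `awaitShutdown` are hypothetical, and a `CompletableFuture` stands in for the future returned by `requestShutdown()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ShutdownWait {

    /** Hypothetical result type: how the wait on the shutdown future ended. */
    public enum Outcome { COMPLETED, TIMED_OUT, FAILED, INTERRUPTED }

    /** Waits on a shutdown future and maps each checked exception to an outcome. */
    public static Outcome awaitShutdown(Future<Void> shutdown, long timeout, TimeUnit unit) {
        try {
            shutdown.get(timeout, unit);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            // Shutdown did not finish in time; the caller decides whether to retry or give up.
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The shutdown task itself failed; e.getCause() holds the original error.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the container's stop thread can still see it.
            Thread.currentThread().interrupt();
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for worker.requestShutdown(); assumptions for the demo only.
        Future<Void> finished = CompletableFuture.completedFuture(null);
        Future<Void> neverFinishes = new CompletableFuture<>();

        System.out.println(awaitShutdown(finished, 5, TimeUnit.SECONDS));             // prints COMPLETED
        System.out.println(awaitShutdown(neverFinishes, 100, TimeUnit.MILLISECONDS)); // prints TIMED_OUT
    }
}
```

In a container's destroy callback, the `TIMED_OUT` and `FAILED` cases would probably just be logged, since there is little else to do at that point.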

@prayagupa
Author

com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker has

    public Future<Void> requestShutdown() {}
    public void shutdown() {}

I believe requestShutdown was designed for graceful shutdown, as I see shutdownCompleteLatch being used in the implementation.

But even if I wait on requestShutdown().get() before stopping the thread that wraps com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker, I still get a java.lang.ThreadDeath error. I need to dig more into Java threads. (The error is raised on wrapperConsumerThread#stop().)

{
  "commonElementCount": 0,
  "name": "java.lang.ThreadDeath",
  "extendedStackTrace": [
    {
      "class": "java.lang.Thread",
      "method": "stop",
      "file": "Thread.java",
      "line": 850,
      "exact": true,
      "location": "?",
      "version": "1.8.0_111"
    },
    {
      "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumer",
      "method": "shutdown",
      "file": "KinesisEventStreamConsumer.java",
      "line": 210,
      "exact": false,
      "location": "classes/",
      "version": "?"
    },
    {
      "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumerComponentSpecs$$anonfun$1$$anonfun$apply$mcV$sp$2",
      "method": "apply",
      "file": "KinesisEventStreamConsumerComponentSpecs.scala",
      "line": 176,
      "exact": false,
      "location": "test-classes/",
      "version": "?"
    },
    {
      "class": "com.eventstream.consumer.kinesis.KinesisEventStreamConsumerComponentSpecs$$anonfun$1$$anonfun$apply$mcV$sp$2",
      "method": "apply",
      "file": "KinesisEventStreamConsumerComponentSpecs.scala",
      "line": 129,
      "exact": false,
      "location": "test-classes/",
      "version": "?"
    }
  ]
}

I get the same ThreadDeath error intermittently (not always) with the following very simple thread, and the thread still reports itself as alive:

    val processor = new Thread()

    processor.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = e.printStackTrace()
    })

    println(processor.isAlive)
    processor.start()

    processor.stop()
    println(processor.isAlive)
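
A hedged note on the snippet above, written in Java for consistency with the rest of the thread: as far as I understand, the deprecated `Thread.stop()` returns without waiting for the target thread to terminate, so an `isAlive` check right after it can still see the thread as alive. `join()` is the call that actually waits for termination. The class name `JoinBeforeIsAlive` and the method `aliveAfterJoin` are made up for this sketch.

```java
public class JoinBeforeIsAlive {

    /** Starts an empty thread, waits for it with join(), then reports isAlive(). */
    public static boolean aliveAfterJoin() {
        // Empty body, mirroring the bare `new Thread()` in the Scala snippet.
        Thread processor = new Thread(() -> { });
        processor.start();
        try {
            // Blocks until the thread has terminated; no stop() involved.
            processor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processor.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(aliveAfterJoin()); // prints false
    }
}
```

This only demonstrates the ordering guarantee of `join()`; it does not reproduce the ThreadDeath itself.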

@prayagupa
Author

Never mind, I should not be trying to stop consumerThread at all (bad practice), since the worker already handles its own shutdown. So I am simply calling requestShutdown on the Kinesis worker and waiting on the result.

nativeConsumer.requestShutdown().get(5, TimeUnit.SECONDS)

That way my container would be fine as well.
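
For anyone landing here later, a minimal sketch of that overall shape, under stated assumptions: `LoopingWorker` is a hypothetical stand-in for the KCL `Worker` (its internals are invented for the demo), and `stopGracefully` is a made-up helper. The point is only the order of operations: request shutdown, wait for the worker to finish, then `join()` the wrapper thread instead of calling `stop()` on it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CooperativeStop {

    /** Hypothetical stand-in for the worker: loops until shutdown is requested. */
    public static final class LoopingWorker implements Runnable {
        private volatile boolean shutdownRequested = false;
        private final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                while (!shutdownRequested) {
                    Thread.sleep(10); // placeholder for fetching and processing records
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown(); // signal that run() is about to return
            }
        }

        void requestShutdown() {
            shutdownRequested = true;
        }

        boolean awaitFinished(long timeout, TimeUnit unit) throws InterruptedException {
            return finished.await(timeout, unit);
        }
    }

    /** Returns true if the wrapper thread terminated within the timeouts. */
    public static boolean stopGracefully(LoopingWorker worker, Thread wrapper, long timeoutMillis) {
        worker.requestShutdown();
        try {
            worker.awaitFinished(timeoutMillis, TimeUnit.MILLISECONDS);
            wrapper.join(timeoutMillis); // wait for the thread; never Thread.stop()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !wrapper.isAlive();
    }

    public static void main(String[] args) {
        LoopingWorker worker = new LoopingWorker();
        Thread consumerThread = new Thread(worker);
        consumerThread.start();
        System.out.println(stopGracefully(worker, consumerThread, 2000)); // prints true
    }
}
```

Because the thread ends by returning from `run()`, no ThreadDeath is thrown and the container has nothing left to wait for.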

@pfifer pfifer added this to the v1.8.0 milestone Jun 6, 2017
pfifer added a commit to pfifer/amazon-kinesis-client that referenced this issue Jun 12, 2017
pfifer added a commit that referenced this issue Jul 21, 2017
* Initial start of fix for requested shutdown

* Execute the requested shutdown in a separate thread.

Fix for Issue #167

* Reworked some of the shutdown logic to make the relationships clearer.

* Added/Updated Copyright Statements

* Add Missing License Statements
pfifer added a commit to pfifer/amazon-kinesis-client that referenced this issue Jul 25, 2017
* Execute graceful shutdown on its own thread
  * PR awslabs#191
  * Issue awslabs#167
* Added support for controlling the size of the lease renewer thread pool
  * PR awslabs#177
  * Issue awslabs#171
* Require Java 8 and later
  Java 8 is now required for versions 1.8.0 of the amazon-kinesis-client and later.
  * PR awslabs#176
@pfifer pfifer mentioned this issue Jul 25, 2017
pfifer added a commit that referenced this issue Jul 25, 2017
* Execute graceful shutdown on its own thread
  * PR #191
  * Issue #167
* Added support for controlling the size of the lease renewer thread pool
  * PR #177
  * Issue #171
* Require Java 8 and later
  Java 8 is now required for versions 1.8.0 of the amazon-kinesis-client and later.
  * PR #176