
java.lang.NullPointerException at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.requestShutdown(Worker.java:558) #129

Closed
slam opened this issue Jan 18, 2017 · 3 comments

@slam
Contributor

slam commented Jan 18, 2017

I upgraded the KCL to 1.7.2 and changed my code to call Worker.requestShutdown() for a clean shutdown.

With this change, my integration test now crashes with:

java.lang.NullPointerException
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.requestShutdown(Worker.java:558)
	at com.zynga.analytics.KinesisExecutor.shutdown(KinesisExecutor.java:50)
	at com.zynga.analytics.kinesis.s3.S3ConsumerApplicationIntegrationTest.tearDown(S3ConsumerApplicationIntegrationTest.java:132)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
	at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
	at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

A breakpoint right before the crash shows 3 leases but only 1 entry in shardInfoShardConsumerMap:

leases = {HashMap$Values@6053}  size = 3
 0 = {KinesisClientLease@6056} "{\n  "leaseKey" : "shardId-000000000012",\n  "leaseOwner" : "20958db3cafdc283:c9f780a:159af1ab926:-8000",\n  "leaseCounter" : 25,\n  "concurrencyToken" : "ba712794-ab53-4f63-92af-7ab40c3c15af",\n  "lastCounterIncrementNanos" : 383997591929910,\n  "checkpoint" : {\n    "sequenceNumber" : "TRIM_HORIZON",\n    "subSequenceNumber" : 0\n  },\n  "ownerSwitchesSinceCheckpoint" : 0,\n  "parentShardIds" : [ "shardId-000000000011", "shardId-000000000010" ]\n}"
 1 = {KinesisClientLease@6076} "{\n  "leaseKey" : "shardId-000000000011",\n  "leaseOwner" : "20958db3cafdc283:c9f780a:159af1ab926:-8000",\n  "leaseCounter" : 38,\n  "concurrencyToken" : "dbb51590-f3f7-4dd8-a7a9-ce3f9b1a3192",\n  "lastCounterIncrementNanos" : 383997588740565,\n  "checkpoint" : {\n    "sequenceNumber" : "SHARD_END",\n    "subSequenceNumber" : 0\n  },\n  "ownerSwitchesSinceCheckpoint" : 0,\n  "parentShardIds" : [ "shardId-000000000009", "shardId-000000000007" ]\n}"
 2 = {KinesisClientLease@6077} "{\n  "leaseKey" : "shardId-000000000010",\n  "leaseOwner" : "20958db3cafdc283:c9f780a:159af1ab926:-8000",\n  "leaseCounter" : 39,\n  "concurrencyToken" : "551edf69-75c9-4487-aa41-787d66ff9793",\n  "lastCounterIncrementNanos" : 383997592292027,\n  "checkpoint" : {\n    "sequenceNumber" : "SHARD_END",\n    "subSequenceNumber" : 0\n  },\n  "ownerSwitchesSinceCheckpoint" : 0,\n  "parentShardIds" : [ "shardId-000000000008" ]\n}"
shardInfoShardConsumerMap = {ConcurrentHashMap@6059}  size = 1
 0 = {ConcurrentHashMap$MapEntry@6088} "ShardInfo [shardId=shardId-000000000012, concurrencyToken=ba712794-ab53-4f63-92af-7ab40c3c15af, parentShardIds=[shardId-000000000010, shardId-000000000011], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]" -> 
  key = {ShardInfo@6089} "ShardInfo [shardId=shardId-000000000012, concurrencyToken=ba712794-ab53-4f63-92af-7ab40c3c15af, parentShardIds=[shardId-000000000010, shardId-000000000011], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]"
  value = {ShardConsumer@6090} 

I could only reproduce this with my integration test, which continuously reshards the stream up and down while verifying the data flowing through it.

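One interesting thing to note: the two shards that appear in the leases but not in shardInfoShardConsumerMap (shardId-000000000010 and shardId-000000000011) were checkpointed at SHARD_END, and both are parents of shardId-000000000012, the only shard that still has a consumer. I suspect the resharding activity has something to do with the crash.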
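To make the failure mode concrete, here is a minimal, self-contained sketch of the unguarded lookup at Worker.java:558: it walks every held lease and calls the consumer for each one without a null check, so a lease whose consumer is gone makes `get()` return null. `FakeConsumer`, `requestShutdownUnsafe` and the plain String keys are hypothetical stand-ins for ShardConsumer, the KCL loop and ShardInfo; this is an illustration, not the library's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LeaseConsumerMismatch {

    // Hypothetical stand-in for ShardConsumer: it only records that it was notified.
    static final class FakeConsumer {
        boolean shutdownRequested = false;

        void notifyShutdownRequested() {
            shutdownRequested = true;
        }
    }

    // Mirrors the unguarded lookup: get() returns null for a lease with no consumer,
    // and dereferencing that null throws NullPointerException.
    static void requestShutdownUnsafe(List<String> leases, Map<String, FakeConsumer> consumers) {
        for (String lease : leases) {
            consumers.get(lease).notifyShutdownRequested();
        }
    }

    public static void main(String[] args) {
        // Three leases are held, matching the debugger output in the report...
        List<String> leases = Arrays.asList(
                "shardId-000000000012", "shardId-000000000011", "shardId-000000000010");

        // ...but only the open child shard still has a consumer; both parents are at SHARD_END.
        Map<String, FakeConsumer> consumers = new ConcurrentHashMap<>();
        consumers.put("shardId-000000000012", new FakeConsumer());

        try {
            requestShutdownUnsafe(leases, consumers);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the reported stack trace");
        }
    }
}
```

The child shard is notified first; the loop then fails on shardId-000000000011, the first lease without a consumer.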

@slam
Contributor Author

slam commented Jan 18, 2017

Here is my attempt at fixing the NPE, though it probably doesn't address the root cause:

diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index 2a1e548..e825d53 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -555,7 +555,10 @@ public class Worker implements Runnable {
             ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
                     notificationCompleteLatch, shutdownCompleteLatch);
             ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
-            shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification);
+            ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
+            if (consumer != null) {
+                consumer.notifyShutdownRequested(shutdownNotification);
+            }
         }
 
         return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
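The null check above avoids the NPE, but simply skipping a lease may have a side effect worth checking. The sketch below assumes (the report does not show this) that notificationCompleteLatch and shutdownCompleteLatch are each created with one count per lease, so the returned ShutdownFuture completes only after every lease has counted down. Under that assumption, a lease whose consumer has disappeared must still count down on its behalf, or the future waits forever. `FakeConsumer`, `requestShutdown` and the String keys are hypothetical simplifications, and this is not necessarily the change that shipped in 1.7.4 (PR #139).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class NullSafeShutdownSketch {

    // Hypothetical stand-in for ShardConsumer that finishes its shutdown work immediately.
    static final class FakeConsumer {
        void notifyShutdownRequested(CountDownLatch notificationComplete, CountDownLatch shutdownComplete) {
            notificationComplete.countDown();
            shutdownComplete.countDown();
        }
    }

    // Hypothetical simplification of Worker.requestShutdown().
    // Assumption: both latches start at one count per lease.
    static CountDownLatch requestShutdown(List<String> leases, Map<String, FakeConsumer> consumers) {
        CountDownLatch notificationComplete = new CountDownLatch(leases.size());
        CountDownLatch shutdownComplete = new CountDownLatch(leases.size());
        for (String lease : leases) {
            FakeConsumer consumer = consumers.get(lease);
            if (consumer != null) {
                consumer.notifyShutdownRequested(notificationComplete, shutdownComplete);
            } else {
                // No consumer for this lease (e.g. its shard reached SHARD_END after the lease
                // snapshot was taken). Count down on its behalf; skipping it would leave both
                // latches above zero forever.
                notificationComplete.countDown();
                shutdownComplete.countDown();
            }
        }
        return shutdownComplete;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> leases = Arrays.asList(
                "shardId-000000000012", "shardId-000000000011", "shardId-000000000010");
        Map<String, FakeConsumer> consumers = new ConcurrentHashMap<>();
        consumers.put("shardId-000000000012", new FakeConsumer());

        CountDownLatch done = requestShutdown(leases, consumers);
        // Every lease, with or without a consumer, counted down once, so this prints true.
        System.out.println(done.await(1, TimeUnit.SECONDS));
    }
}
```

Counting down on the missing consumer's behalf keeps the latch arithmetic simple: each latch is decremented exactly `leases.size()` times, however many consumers survived the resharding.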

@pfifer
Contributor

pfifer commented Jan 18, 2017

Thanks for reporting this. I'll look into getting a fix out in the near future.

@pfifer pfifer added the bug label Jan 18, 2017
@pfifer pfifer added this to the Release 1.7.4 milestone Jan 23, 2017
pfifer added a commit to pfifer/amazon-kinesis-client that referenced this issue Feb 27, 2017
* Fixed an issue building JavaDoc for Java 8.
  * [Issue awslabs#18](awslabs#18)
  * [PR awslabs#141](awslabs#141)
* Reduce Throttling Messages to WARN, unless throttling occurs 6 times consecutively.
  * [Issue awslabs#4](awslabs#4)
  * [PR awslabs#140](awslabs#140)
* Fixed two bugs occurring in requestShutdown.
  * Fixed a bug that prevented the worker from shutting down, via requestShutdown, when no leases were held.
    * [Issue awslabs#128](awslabs#128)
  * Fixed a bug that could trigger a NullPointerException if leases changed during requestShutdown.
    * [Issue awslabs#129](awslabs#129)
  * [PR awslabs#139](awslabs#139)
* Upgraded the AWS SDK Version to 1.11.91
  * [PR awslabs#138](awslabs#138)
* Use an executor returned from `ExecutorService.newFixedThreadPool` instead of constructing it by hand.
  * [PR awslabs#135](awslabs#135)
* Correctly initialize DynamoDB client, when endpoint is explicitly set.
  * [PR awslabs#142](awslabs#142)
pfifer added a commit that referenced this issue Feb 27, 2017
@pfifer
Contributor

pfifer commented Feb 27, 2017

This is fixed in release 1.7.4. Please reopen if you're still seeing this issue.

@pfifer pfifer closed this as completed Feb 27, 2017