Skip to content

Commit

Permalink
Handle No Lease, and Lease Loss in requestShutdown. (#139)
Browse files Browse the repository at this point in the history
* Handle No Lease, and Lease Loss in requestShutdown.

Ensure that the Worker is shutdown if there are no leases.  Handle the
case where a lease is lost during the notification process.

* Add some more comments around possible race conditions.
  • Loading branch information
pfifer committed Feb 17, 2017
1 parent bf71524 commit 31615a5
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ static class ShutdownNotificationState implements ConsumerState {
@Override
public ITask createTask(ShardConsumer consumer) {
return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(),
consumer.getShutdownNotification());
consumer.getShutdownNotification(), consumer.getShardInfo());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ class ShutdownNotificationTask implements ITask {
private final IRecordProcessor recordProcessor;
private final IRecordProcessorCheckpointer recordProcessorCheckpointer;
private final ShutdownNotification shutdownNotification;
private final ShardInfo shardInfo;

ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification) {
ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) {
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
this.shutdownNotification = shutdownNotification;
this.shardInfo = shardInfo;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,15 +538,18 @@ private List<ShardInfo> getShardInfoForAssignments() {
*/
public Future<Void> requestShutdown() {

leaseCoordinator.stopLeaseTaker();
//
// Stop accepting new leases
// Stop accepting new leases. Once we do this we can be sure that
// no more leases will be acquired.
//
leaseCoordinator.stopLeaseTaker();

Collection<KinesisClientLease> leases = leaseCoordinator.getAssignments();
if (leases == null || leases.isEmpty()) {
//
// If there are no leases shutdown is already completed.
// If there are no leases notification is already completed, but we still need to shutdown the worker.
//
this.shutdown();
return Futures.immediateFuture(null);
}
CountDownLatch shutdownCompleteLatch = new CountDownLatch(leases.size());
Expand All @@ -555,7 +558,18 @@ public Future<Void> requestShutdown() {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease,
notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease);
shardInfoShardConsumerMap.get(shardInfo).notifyShutdownRequested(shutdownNotification);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) {
consumer.notifyShutdownRequested(shutdownNotification);
} else {
//
// There is a race condition between retrieving the current assignments, and creating the
// notification. If the a lease is lost in between these two points, we explicitly decrement the
// notification latches to clear the shutdown.
//
notificationCompleteLatch.countDown();
shutdownCompleteLatch.countDown();
}
}

return new ShutdownFuture(shutdownCompleteLatch, notificationCompleteLatch, this);
Expand Down Expand Up @@ -622,9 +636,11 @@ private void finalShutdown() {
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
*
* @return Whether worker should shutdown immediately.
*/
private boolean shouldShutdown() {
@VisibleForTesting
boolean shouldShutdown() {
if (executorService.isShutdown()) {
LOG.error("Worker executor service has been shutdown, so record processors cannot be shutdown.");
return true;
Expand Down

0 comments on commit 31615a5

Please sign in to comment.