Skip to content

Commit

Permalink
Ensure ShardSyncTask invocation from ShardSyncTaskManager for pending…
Browse files Browse the repository at this point in the history
… shard sync requests
  • Loading branch information
parijatsinha committed Dec 3, 2019
1 parent 635a101 commit 2d6b92e
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

import com.amazonaws.services.kinesis.model.Shard;
import lombok.Getter;
Expand All @@ -35,13 +37,12 @@
* Kinesis shards, remove obsolete leases). We'll have at most one outstanding sync task at any time.
* Worker will use this class to kick off a sync task when it finds shards which have been completely processed.
*/
@Getter
class ShardSyncTaskManager {
@Getter class ShardSyncTaskManager {

private static final Log LOG = LogFactory.getLog(ShardSyncTaskManager.class);

private ITask currentTask;
private Future<TaskResult> future;
private CompletableFuture<TaskResult> future;
private final IKinesisProxy kinesisProxy;
private final ILeaseManager<KinesisClientLease> leaseManager;
private final IMetricsFactory metricsFactory;
Expand All @@ -51,30 +52,28 @@ class ShardSyncTaskManager {
private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;
private final ShardSyncer shardSyncer;
private final ReentrantLock lock;

private AtomicBoolean shardSyncRequestPending;

/**
* Constructor.
*
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream
* @param kinesisProxy Proxy used to fetch streamInfo (shards)
* @param leaseManager Lease manager (used to list and create leases for shards)
* @param initialPositionInStream Initial position in stream
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
* until they expire)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
* @param shardSyncer shardSyncer instance used to check and create new leases
* until they expire)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
ShardSyncTaskManager(final IKinesisProxy kinesisProxy,
final ILeaseManager<KinesisClientLease> leaseManager,
ShardSyncTaskManager(final IKinesisProxy kinesisProxy, final ILeaseManager<KinesisClientLease> leaseManager,
final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
ExecutorService executorService,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis, final IMetricsFactory metricsFactory, ExecutorService executorService,
ShardSyncer shardSyncer) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
Expand All @@ -85,13 +84,20 @@ class ShardSyncTaskManager {
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
this.shardSyncer = shardSyncer;
this.shardSyncRequestPending = new AtomicBoolean(false);
this.lock = new ReentrantLock(Boolean.TRUE);
}

synchronized Future<TaskResult> syncShardAndLeaseInfo(List<Shard> latestShards) {
return checkAndSubmitNextTask(latestShards);
Future<TaskResult> syncShardAndLeaseInfo(List<Shard> latestShards) {
try {
lock.lock();
return checkAndSubmitNextTask(latestShards);
} finally {
lock.unlock();
}
}

private synchronized Future<TaskResult> checkAndSubmitNextTask(List<Shard> latestShards) {
private Future<TaskResult> checkAndSubmitNextTask(List<Shard> latestShards) {
Future<TaskResult> submittedTaskFuture = null;
if ((future == null) || future.isCancelled() || future.isDone()) {
if ((future != null) && future.isDone()) {
Expand All @@ -106,24 +112,46 @@ private synchronized Future<TaskResult> checkAndSubmitNextTask(List<Shard> lates
}
}

currentTask =
new MetricsCollectingTaskDecorator(new ShardSyncTask(kinesisProxy,
leaseManager,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
currentTask = new MetricsCollectingTaskDecorator(
new ShardSyncTask(kinesisProxy, leaseManager, initialPositionInStream,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis,
shardSyncer, latestShards), metricsFactory);
future = executorService.submit(currentTask);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> {
if (exception != null) {
LOG.error("Caught exception running " + currentTask.getTaskType() + " task: ", exception);
}
// Acquire lock here. If shardSyncRequestPending is false in this completionStage and
// syncShardAndLeaseInfo is invoked, before completion stage exits (future completes)
// but right after the value of shardSyncRequestPending is checked, it will result in
// shardSyncRequestPending being set to true, but no pending futures to trigger the next
// ShardSyncTask. By executing this stage in a Reentrant lock, we ensure that if the
// previous task is in this completion stage, checkAndSubmitNextTask is not invoked
// until this completionStage exits.
try {
lock.lock();
if (shardSyncRequestPending.get()) {
shardSyncRequestPending.set(false);
// reset future to null, so next call creates a new one
// without trying to get results from the old future.
future = null;
checkAndSubmitNextTask(latestShards);
}
} finally {
lock.unlock();
}
});
if (LOG.isDebugEnabled()) {
LOG.debug("Submitted new " + currentTask.getTaskType() + " task.");
}
submittedTaskFuture = future;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task.");
LOG.debug("Previous " + currentTask.getTaskType() + " task still pending. Not submitting new task. "
+ "Enqueued a request that will be executed when the current request completes.");
}
shardSyncRequestPending.compareAndSet(false /*expected*/, true /*update*/);
}
return submittedTaskFuture;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.KinesisClientLibIOException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;

import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class ShardSyncTaskManagerTest {
private static final long SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS = 100;
private static final InitialPositionInStreamExtended INITIAL_POSITION_IN_STREAM_EXTENDED = InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final boolean CLEANUP_LEASES_SHARD_COMPLETION = Boolean.TRUE;
private static final boolean IGNORE_UNEXPECTED_CHILD_SHARDS = Boolean.TRUE;
private static final long SHARD_SYNC_IDLE_TIME_MILLIS = 0;

@Mock private IKinesisProxy mockKinesisProxy;
@Mock private ILeaseManager<KinesisClientLease> mockLeaseManager;
@Mock private IMetricsFactory mockMetricsFactory;
@Mock private IMetricsScope mockMetricsScope;

private ShardSyncTaskManager shardSyncTaskManager;
private ShardSyncer pausableNoOpShardSyncer;
private ShardSyncer mockShardSyncer;
private CountDownLatch countDownLatch;

@Before public void setup() {
MockitoAnnotations.initMocks(this);
when(mockMetricsFactory.createMetrics()).thenReturn(mockMetricsScope);
countDownLatch = new CountDownLatch(1);
pausableNoOpShardSyncer = new PausableNoOpShardSyncer(countDownLatch);
mockShardSyncer = mock(ShardSyncer.class, delegatesTo(pausableNoOpShardSyncer));
shardSyncTaskManager = new ShardSyncTaskManager(mockKinesisProxy, mockLeaseManager,
INITIAL_POSITION_IN_STREAM_EXTENDED, CLEANUP_LEASES_SHARD_COMPLETION, IGNORE_UNEXPECTED_CHILD_SHARDS,
SHARD_SYNC_IDLE_TIME_MILLIS, mockMetricsFactory, Executors.newSingleThreadExecutor(), mockShardSyncer);
}

@Test public void testShardSyncIdempotency() throws Exception {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations.
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
// Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager.
int count = 0;
while (count++ < 5) {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
}
// Since countDownLatch is still blocked, previous ShardSyncTask is still running, hence no new invocations.
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
}

@Test public void testShardSyncRerunsForPendingRequests() throws Exception {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations.
verify(mockShardSyncer, times(1))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
// Invoke a few more times. This would flip shardSyncRequestPending to true in ShardSyncTaskManager.
int count = 0;
while (count++ < 5) {
shardSyncTaskManager.syncShardAndLeaseInfo(new ArrayList<>());
}
countDownLatch.countDown(); // Will unblock pending shardSync and a new ShardSync should be triggered.
Thread.sleep(SHARDSYNCER_INVOCATION_WAIT_SLEEP_MILLIS); // small pause to wait for shardSyncer invocations.
// There should be 1 more shardSyncer invocation after the previous shardSync completes.
verify(mockShardSyncer, times(2))
.checkAndCreateLeasesForNewShards(Matchers.any(), Matchers.any(), Matchers.any(), anyBoolean(),
anyBoolean(), Matchers.any());
}

private static class PausableNoOpShardSyncer implements ShardSyncer {

private final CountDownLatch countDownLatch;

PausableNoOpShardSyncer(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
try {
countDownLatch.await();
} catch (InterruptedException e) {
// No-OP
}
}

@Override public void checkAndCreateLeasesForNewShards(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
this.checkAndCreateLeasesForNewShards(kinesisProxy, leaseManager, initialPositionInStream,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
}
}

}

0 comments on commit 2d6b92e

Please sign in to comment.