Skip to content

Commit

Permalink
Iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
jasontedor committed Sep 13, 2017
1 parent 16c6fa3 commit e69b8d9
Showing 1 changed file with 19 additions and 55 deletions.
Expand Up @@ -63,11 +63,7 @@
public class GlobalCheckpointTrackerTests extends ESTestCase {

public void testEmptyShards() {
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
AllocationId.newInitializing().getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(AllocationId.newInitializing());
assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
}

Expand Down Expand Up @@ -108,11 +104,7 @@ public void testGlobalCheckpointUpdate() {
// it is however nice not to assume this on this level and check we do the right thing.
final long minLocalCheckpoint = allocations.values().stream().min(Long::compare).orElse(UNASSIGNED_SEQ_NO);

final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
active.iterator().next().getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(active.iterator().next());
assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));

logger.info("--> using allocations");
Expand Down Expand Up @@ -176,11 +168,7 @@ public void testMissingActiveIdsPreventAdvance() {
assigned.putAll(active);
assigned.putAll(initializing);
AllocationId primary = active.keySet().iterator().next();
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
primary.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(primary);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
Expand All @@ -205,11 +193,7 @@ public void testMissingInSyncIdsPreventAdvance() {
logger.info("active: {}, initializing: {}", active, initializing);

AllocationId primary = active.keySet().iterator().next();
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
primary.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(primary);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
randomSubsetOf(randomIntBetween(1, initializing.size() - 1),
Expand All @@ -228,11 +212,7 @@ public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() {
final Map<AllocationId, Long> active = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<AllocationId, Long> initializing = randomAllocationsWithLocalCheckpoints(1, 5);
final Map<AllocationId, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
active.keySet().iterator().next().getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(active.keySet().iterator().next());
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet()), emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
Expand Down Expand Up @@ -263,11 +243,7 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() {
if (randomBoolean()) {
allocations.putAll(initializingToBeRemoved);
}
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
active.iterator().next().getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(active.iterator().next());
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing), emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
if (randomBoolean()) {
Expand Down Expand Up @@ -303,11 +279,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
final AtomicBoolean complete = new AtomicBoolean();
final AllocationId inSyncAllocationId = AllocationId.newInitializing();
final AllocationId trackingAllocationId = AllocationId.newInitializing();
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
inSyncAllocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId)), emptySet());
tracker.activatePrimaryMode(globalCheckpoint);
Expand Down Expand Up @@ -348,18 +320,22 @@ public void testWaitForAllocationIdToBeInSync() throws Exception {
thread.join();
}

private GlobalCheckpointTracker newTracker(final AllocationId allocationId) {
return new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
}

public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
final int localCheckpoint = randomIntBetween(1, 32);
final int globalCheckpoint = randomIntBetween(localCheckpoint + 1, 64);
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean interrupted = new AtomicBoolean();
final AllocationId inSyncAllocationId = AllocationId.newInitializing();
final AllocationId trackingAllocationId = AllocationId.newInitializing();
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
inSyncAllocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId)), emptySet());
tracker.activatePrimaryMode(globalCheckpoint);
Expand Down Expand Up @@ -408,11 +384,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception {
final Set<AllocationId> initializingIds = activeAndInitializingAllocationIds.v2();
IndexShardRoutingTable routingTable = routingTable(initializingIds);
AllocationId primaryId = activeAllocationIds.iterator().next();
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
primaryId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
Expand Down Expand Up @@ -561,11 +533,7 @@ public void testRaceUpdatingGlobalCheckpoint() throws InterruptedException, Brok
final CyclicBarrier barrier = new CyclicBarrier(4);

final int activeLocalCheckpoint = randomIntBetween(0, Integer.MAX_VALUE - 1);
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
active.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(active);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()),
routingTable(Collections.singleton(initializing)), emptySet());
tracker.activatePrimaryMode(activeLocalCheckpoint);
Expand Down Expand Up @@ -726,11 +694,7 @@ public void testPrimaryContextHandoff() throws IOException {
public void testIllegalStateExceptionIfUnknownAllocationId() {
final AllocationId active = AllocationId.newInitializing();
final AllocationId initializing = AllocationId.newInitializing();
final GlobalCheckpointTracker tracker = new GlobalCheckpointTracker(
new ShardId("test", "_na_", 0),
active.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
final GlobalCheckpointTracker tracker = newTracker(active);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()),
routingTable(Collections.singleton(initializing)), emptySet());
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
Expand Down

0 comments on commit e69b8d9

Please sign in to comment.