Skip to content

Commit

Permalink
IGNITE-802: reworked GridCachePartitionedQueueEntryMoveSelfTest.testQ…
Browse files Browse the repository at this point in the history
…ueue
  • Loading branch information
Denis Magda committed Sep 10, 2015
1 parent d96e0d2 commit ec5c795
Showing 1 changed file with 66 additions and 125 deletions.
Expand Up @@ -20,7 +20,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite; import org.apache.ignite.Ignite;
Expand All @@ -30,18 +29,15 @@
import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration; import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest; import org.apache.ignite.internal.processors.cache.datastructures.IgniteCollectionAbstractTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.GridTestUtils;


import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
Expand All @@ -52,11 +48,6 @@
* Cache queue test with changing topology. * Cache queue test with changing topology.
*/ */
public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollectionAbstractTest { public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollectionAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-802");
}

/** Queue capacity. */ /** Queue capacity. */
private static final int QUEUE_CAP = 5; private static final int QUEUE_CAP = 5;


Expand All @@ -66,9 +57,6 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
/** Backups count. */ /** Backups count. */
private static final int BACKUP_CNT = 1; private static final int BACKUP_CNT = 1;


/** Node ID to set manually on node startup. */
private UUID nodeId;

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected int gridCount() { @Override protected int gridCount() {
return GRID_CNT; return GRID_CNT;
Expand Down Expand Up @@ -98,116 +86,93 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection
return colCfg; return colCfg;
} }


/** {@inheritDoc} */
@SuppressWarnings("deprecation")
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);

if (nodeId != null) {
cfg.setNodeId(nodeId);

nodeId = null;
}

return cfg;
}

/** /**
* @throws Exception If failed. * @throws Exception If failed.
*/ */
public void testQueue() throws Exception { public void testQueue() throws Exception {
try { final String queueName = "queue-test-name";
startGrids(GRID_CNT);

final String queueName = "queue-name-" + UUID.randomUUID();


System.out.println(U.filler(20, '\n')); System.out.println(U.filler(20, '\n'));


final CountDownLatch latch1 = new CountDownLatch(1); final CountDownLatch latch1 = new CountDownLatch(1);
//final CountDownLatch latch2 = new CountDownLatch(1); final CountDownLatch latch2 = new CountDownLatch(1);


IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() { IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() { @Override public Void call() throws IgniteInterruptedCheckedException {
Ignite ignite = grid(0); Ignite ignite = grid(0);


IgniteQueue<Integer> queue = ignite.queue(queueName, IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true));
QUEUE_CAP,
config(true));


for (int i = 0; i < QUEUE_CAP * 2; i++) { for (int i = 0; i < QUEUE_CAP * 2; i++) {
if (i == QUEUE_CAP) { if (i == QUEUE_CAP) {
latch1.countDown(); latch1.countDown();


//U.await(latch2); U.await(latch2);
} }

try {
info(">>> Putting value: " + i);


queue.put(i); try {
info(">>> Putting value: " + i);


info(">>> Value is in queue: " + i); queue.put(i);
}
catch (Error | RuntimeException e) {
error("Failed to put value: " + i, e);


throw e; info(">>> Value is in queue: " + i);
}
} }
catch (Error | RuntimeException e) {
error("Failed to put value: " + i, e);


return null; throw e;
}
} }
});


latch1.await(); return null;
}
});


startAdditionalNodes(BACKUP_CNT + 2, queueName); latch1.await();


System.out.println(U.filler(20, '\n')); startAdditionalNodes(BACKUP_CNT + 2, queueName);


//latch2.countDown(); System.out.println(U.filler(20, '\n'));


IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() { latch2.countDown();
@Override public Void call() throws IgniteCheckedException {
Ignite ignite = grid(GRID_CNT);


IgniteQueue<Integer> queue = ignite.queue(queueName, Integer.MAX_VALUE, config(true)); IgniteInternalFuture<?> fut2 = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws IgniteCheckedException {
Ignite ignite = grid(GRID_CNT);


int cnt = 0; IgniteQueue<Integer> queue = ignite.queue(queueName, QUEUE_CAP, config(true));


do { int cnt = 0;
try {
Integer i = queue.poll();


if (i != null) { do {
info(">>> Polled value: " + cnt); try {
Integer i = queue.poll();


cnt++; if (i != null) {
} info(">>> Polled value: " + cnt);
else {
info(">>> Waiting for value...");


U.sleep(2000); cnt++;
}
} }
catch (Error | RuntimeException e) { else {
error("Failed to poll value.", e); info(">>> Waiting for value...");


throw e; U.sleep(2000);
} }
} }
while (cnt < QUEUE_CAP * 2); catch (Error | RuntimeException e) {
error("Failed to poll value.", e);


return null; throw e;
}
} }
}); while (cnt < QUEUE_CAP * 2);


fut1.get(); return null;
fut2.get(); }
} });
finally {
stopAllGrids(); fut1.get();
} fut2.get();
} }


/** /**
Expand All @@ -218,51 +183,27 @@ public void testQueue() throws Exception {
* @throws Exception If failed. * @throws Exception If failed.
*/ */
private void startAdditionalNodes(int cnt, String queueName) throws Exception { private void startAdditionalNodes(int cnt, String queueName) throws Exception {
AffinityFunction aff = jcache(0).getConfiguration(CacheConfiguration.class).getAffinity(); IgniteQueue queue = ignite(0).queue(queueName, 0, null);
AffinityKeyMapper mapper = jcache(0).getConfiguration(CacheConfiguration.class).getAffinityMapper();

assertNotNull(aff);
assertNotNull(mapper);

int part = aff.partition(mapper.affinityKey(queueName));


Collection<ClusterNode> nodes = grid(0).cluster().nodes(); CacheConfiguration cCfg = getQueueCache(queue);


Collection<ClusterNode> aff0 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName); Collection<ClusterNode> aff1 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName);
Collection<ClusterNode> aff1 = nodes(aff, part, nodes);


assertEquals(new ArrayList<>(aff0), new ArrayList<>(aff1)); for (int i = 0, id = GRID_CNT; i < cnt; i++) {
startGrid(id++);


Collection<ClusterNode> aff2; awaitPartitionMapExchange();
Collection<ClusterNode> tmpNodes;


int retries = 10000; Collection<ClusterNode> aff2 = ignite(0).affinity(cCfg.getName()).mapKeyToPrimaryAndBackups(queueName);


do { if (!aff1.iterator().next().equals(aff2.iterator().next())) {
tmpNodes = new ArrayList<>(cnt); info("Moved queue to new primary node [oldAff=" + aff1 + ", newAff=" + aff2 + ']');


for (int i = 0; i < cnt; i++) return;
tmpNodes.add(new GridTestNode(UUID.randomUUID())); }

aff2 = nodes(aff, part, F.concat(true, tmpNodes, nodes));

if (retries-- < 0)
throw new IgniteCheckedException("Failed to find node IDs to change current affinity mapping.");
} }
while (F.containsAny(aff1, aff2));

int i = GRID_CNT;

// Start several additional grids.
for (UUID id : F.nodeIds(tmpNodes)) {
nodeId = id;

startGrid(i++);
}

aff2 = ignite(0).affinity(null).mapKeyToPrimaryAndBackups(queueName);


assertFalse("Unexpected affinity [aff1=" + aff1 + ", aff2=" + aff2 + ']', F.containsAny(aff1, aff2)); throw new IgniteCheckedException("Unable to move the queue to a new primary node");
} }


/** /**
Expand Down

0 comments on commit ec5c795

Please sign in to comment.