Skip to content

Commit

Permalink
waitForRebalancing minor fix and relocation.
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov committed Mar 7, 2017
1 parent f794715 commit e960eb7
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 65 deletions.
Expand Up @@ -237,8 +237,8 @@ public void testSimpleRebalancing() throws Exception {


int waitMinorVer = ignite.configuration().isLateAffinityAssignment() ? 1 : 0; int waitMinorVer = ignite.configuration().isLateAffinityAssignment() ? 1 : 0;


waitForRebalancing(0, new AffinityTopologyVersion(2, waitMinorVer)); waitForRebalancing(0, 2, waitMinorVer);
waitForRebalancing(1, new AffinityTopologyVersion(2, waitMinorVer)); waitForRebalancing(1, 2, waitMinorVer);


awaitPartitionMapExchange(true, true, null); awaitPartitionMapExchange(true, true, null);


Expand All @@ -258,8 +258,8 @@ public void testSimpleRebalancing() throws Exception {


startGrid(2); startGrid(2);


waitForRebalancing(1, new AffinityTopologyVersion(4, waitMinorVer)); waitForRebalancing(1, 4, waitMinorVer);
waitForRebalancing(2, new AffinityTopologyVersion(4, waitMinorVer)); waitForRebalancing(2, 4, waitMinorVer);


awaitPartitionMapExchange(true, true, null); awaitPartitionMapExchange(true, true, null);


Expand Down Expand Up @@ -351,67 +351,6 @@ public void testLoadRebalancing() throws Exception {
info("Time to rebalance entries: " + spend); info("Time to rebalance entries: " + spend);
} }


/**
* @param id Node id.
* @param major Major ver.
* @param minor Minor ver.
* @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
waitForRebalancing(id, new AffinityTopologyVersion(major, minor));
}

/**
* @param id Node id.
* @param major Major ver.
* @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
waitForRebalancing(id, new AffinityTopologyVersion(major));
}

/**
* @param id Node id.
* @param top Topology version.
* @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, AffinityTopologyVersion top) throws IgniteCheckedException {
boolean finished = false;

long stopTime = System.currentTimeMillis() + 60_000;

while (!finished && (System.currentTimeMillis() < stopTime)) {
finished = true;

for (GridCacheAdapter c : grid(id).context().cache().internalCaches()) {
GridDhtPartitionDemander.RebalanceFuture fut = (GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();
if (fut.topologyVersion() == null || fut.topologyVersion().compareTo(top) < 0) {
finished = false;

log.info("Unexpected future version, will retry [futVer=" + fut.topologyVersion() +
", expVer=" + top + ']');

U.sleep(1000);

break;
}
else {
finished = fut.get();

if (!finished) {
log.warning("Rebalancing finished with missed partitions: " + fut.topologyVersion());

U.sleep(100);
}
else
break;
}
}
}

assertTrue(finished);
}

/** /**
* @throws Exception If failed. * @throws Exception If failed.
*/ */
Expand Down
Expand Up @@ -58,6 +58,7 @@
import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event; import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
Expand All @@ -71,6 +72,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
Expand Down Expand Up @@ -609,6 +611,81 @@ protected void awaitPartitionMapExchange(boolean waitEvicts,
} }
} }


/**
* @param id Node id.
* @param major Major ver.
* @param minor Minor ver.
* @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, int major, int minor) throws IgniteCheckedException {
waitForRebalancing(grid(id), new AffinityTopologyVersion(major, minor));
}

/**
* @param id Node id.
* @param major Major ver.
* @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(int id, int major) throws IgniteCheckedException {
waitForRebalancing(grid(id), new AffinityTopologyVersion(major));
}

/**
* @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing() throws IgniteCheckedException {
for (Ignite ignite : G.allGrids())
waitForRebalancing((IgniteEx)ignite, null);
}

/**
* @param ignite Node.
* @param top Topology version.
* @throws IgniteCheckedException If failed.
*/
protected void waitForRebalancing(IgniteEx ignite, AffinityTopologyVersion top) throws IgniteCheckedException {
if (ignite.configuration().isClientMode())
return;

boolean finished = false;

long stopTime = System.currentTimeMillis() + 60_000;

while (!finished && (System.currentTimeMillis() < stopTime)) {
finished = true;

if (top == null)
top = ignite.context().discovery().topologyVersionEx();

for (GridCacheAdapter c : ignite.context().cache().internalCaches()) {
GridDhtPartitionDemander.RebalanceFuture fut =
(GridDhtPartitionDemander.RebalanceFuture)c.preloader().rebalanceFuture();

if (fut.topologyVersion() == null || fut.topologyVersion().compareTo(top) < 0) {
finished = false;

log.info("Unexpected future version, will retry [futVer=" + fut.topologyVersion() +
", expVer=" + top + ']');

U.sleep(100);

break;
}
else if (!fut.get()) {
finished = false;

log.warning("Rebalancing finished with missed partitions.");

U.sleep(100);

break;
}
}
}

assertTrue(finished);
}

/** /**
* @param ignite Node. * @param ignite Node.
*/ */
Expand Down

0 comments on commit e960eb7

Please sign in to comment.