Commit

IGNITE-8339 Fixed partition state detection on activation - Fixes #3885.
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Jokser authored and agoncharuk committed Apr 24, 2018
1 parent 65a4256 commit ec04cd1
Showing 6 changed files with 180 additions and 61 deletions.
@@ -2072,7 +2072,7 @@ public boolean reserveForFastLocalGet(int part, AffinityTopologyVersion topVer)
topology().partitionState(localNodeId(), part) == OWNING :
"result=" + result + ", persistenceEnabled=" + group().persistenceEnabled() +
", partitionState=" + topology().partitionState(localNodeId(), part) +
", replicated=" + isReplicated();
", replicated=" + isReplicated() + ", part=" + part;

return result;
}
@@ -370,10 +370,7 @@ private boolean initPartitions(AffinityTopologyVersion affVer, List<List<Cluster
GridDhtLocalPartition locPart = getOrCreatePartition(p);

if (shouldOwn) {
boolean owned = locPart.own();

assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() +
", part=" + locPart + ']';
locPart.own();

if (log.isDebugEnabled())
log.debug("Owned partition for oldest node [grp=" + grp.cacheOrGroupName() +
@@ -466,8 +463,6 @@ else if (localNode(p, aff))
boolean affReady,
boolean updateMoving)
throws IgniteCheckedException {
ClusterNode loc = ctx.localNode();

ctx.database().checkpointReadLock();

try {
@@ -495,8 +490,6 @@ else if (localNode(p, aff))
lastTopChangeVer = readyTopVer = evts.topologyVersion();
}

ClusterNode oldest = discoCache.oldestAliveServerNode();

if (log.isDebugEnabled()) {
log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() +
", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
@@ -506,46 +499,9 @@ else if (localNode(p, aff))

cntrMap.clear();

boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());

// If this is the oldest node.
if (oldest != null && (loc.equals(oldest) || grpStarted)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);

if (log.isDebugEnabled()) {
log.debug("Created brand new full topology map on oldest node [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + fullMapString() + ']');
}
}
else if (!node2part.valid()) {
node2part = new GridDhtPartitionFullMap(oldest.id(),
oldest.order(),
updateSeq,
node2part,
false);

if (log.isDebugEnabled()) {
log.debug("Created new full topology map on oldest node [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + node2part + ']');
}
}
else if (!node2part.nodeId().equals(loc.id())) {
node2part = new GridDhtPartitionFullMap(oldest.id(),
oldest.order(),
updateSeq,
node2part,
false);
initializeFullMap(updateSeq);

if (log.isDebugEnabled()) {
log.debug("Copied old map into new map on oldest node (previous oldest node left) [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + fullMapString() + ']');
}
}
}
boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());

if (evts.hasServerLeft()) {
List<DiscoveryEvent> evts0 = evts.events();
@@ -611,6 +567,61 @@ else if (!node2part.nodeId().equals(loc.id())) {
}
}

/**
* Initializes the full map if the current full map is empty or invalid. This is done when the local node
* is the coordinator (oldest alive server node) or when the cache group has just been started.
*
* @param updateSeq Update sequence used to initialize the full map.
*/
private void initializeFullMap(long updateSeq) {
if (!(topReadyFut instanceof GridDhtPartitionsExchangeFuture))
return;

GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture) topReadyFut;

boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());

ClusterNode oldest = discoCache.oldestAliveServerNode();

// If this is the oldest node.
if (oldest != null && (ctx.localNode().equals(oldest) || grpStarted)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);

if (log.isDebugEnabled()) {
log.debug("Created brand new full topology map on oldest node [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + fullMapString() + ']');
}
}
else if (!node2part.valid()) {
node2part = new GridDhtPartitionFullMap(oldest.id(),
oldest.order(),
updateSeq,
node2part,
false);

if (log.isDebugEnabled()) {
log.debug("Created new full topology map on oldest node [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + node2part + ']');
}
}
else if (!node2part.nodeId().equals(ctx.localNode().id())) {
node2part = new GridDhtPartitionFullMap(oldest.id(),
oldest.order(),
updateSeq,
node2part,
false);

if (log.isDebugEnabled()) {
log.debug("Copied old map into new map on oldest node (previous oldest node left) [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + fullMapString() + ']');
}
}
}
}

/**
* @param p Partition number.
* @param topVer Topology version.
@@ -625,11 +636,10 @@ private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
lock.writeLock().lock();

try {
if (node2part == null)
return;

long updateSeq = this.updateSeq.incrementAndGet();

initializeFullMap(updateSeq);

for (int p = 0; p < grp.affinity().partitions(); p++) {
GridDhtLocalPartition locPart = locParts.get(p);

@@ -1127,6 +1127,12 @@ private void distributedExchange() throws IgniteCheckedException {
}
}

/* It is necessary to run the database callback before all topology callbacks.
When the persistent store is enabled, we first restore the partitions present on disk.
We must guarantee that no partition state changes are logged to the WAL before this callback
runs, so that the last actual partition states are restored correctly. */
cctx.database().beforeExchange(this);

if (!exchCtx.mergeExchanges()) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
@@ -1138,10 +1144,6 @@ private void distributedExchange() throws IgniteCheckedException {
}
}

// It is necessary to run database callback after all topology callbacks, so partition states could be
// correctly restored from the persistent store.
cctx.database().beforeExchange(this);

changeWalModeIfNeeded();

if (crd.isLocal()) {
@@ -20,18 +20,27 @@
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Assert;

/**
*
@@ -131,7 +140,7 @@ private void activateCachesRestore(int srvs, boolean withNewCaches) throws Excep

checkNoCaches(srvs);

srv.active(true);
srv.cluster().active(true);

final int CACHES = withNewCaches ? 4 : 2;

@@ -212,7 +221,7 @@ public void testClientJoinsWhenActivationIsInProgress() throws Exception {
}, "client-starter-thread");

clientStartLatch.countDown();
srv.active(true);
srv.cluster().active(true);

clStartFut.get();
}
@@ -242,7 +251,7 @@ public void testActivateCacheRestoreConfigurationConflict() throws Exception {

Ignite srv = startGrids(SRVS);

srv.active(true);
srv.cluster().active(true);

CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);

@@ -265,4 +274,102 @@ public void testActivateCacheRestoreConfigurationConflict() throws Exception {
assertTrue(X.getCause(e).getMessage().contains("Failed to start configured cache."));
}
}

/**
* Tests that all cache data is consistent after the cluster is deactivated during eviction and rebalance
* and then activated again.
*
* @throws Exception If failed.
*/
public void testDeactivateDuringEvictionAndRebalance() throws Exception {
IgniteEx srv = (IgniteEx) startGrids(3);

srv.cluster().active(true);

CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME)
.setBackups(1)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setIndexedTypes(Integer.class, Integer.class)
.setAffinity(new RendezvousAffinityFunction(false, 64));

IgniteCache cache = srv.createCache(ccfg);

// High number of keys triggers long partition eviction.
final int keysCount = 100_000;

try (IgniteDataStreamer ds = srv.dataStreamer(DEFAULT_CACHE_NAME)) {
log.info("Writing initial data...");

ds.allowOverwrite(true);
for (int k = 1; k <= keysCount; k++) {
ds.addData(k, k);

if (k % 50_000 == 0)
log.info("Written " + k + " entities.");
}

log.info("Writing initial data finished.");
}

AtomicInteger keyCounter = new AtomicInteger(keysCount);
AtomicBoolean stop = new AtomicBoolean(false);

Set<Integer> addedKeys = new GridConcurrentHashSet<>();

IgniteInternalFuture cacheLoadFuture = GridTestUtils.runMultiThreadedAsync(() -> {
while (!stop.get()) {
int key = keyCounter.incrementAndGet();
try {
cache.put(key, key);

addedKeys.add(key);

Thread.sleep(10);
}
catch (Exception ignored) { }
}
}, 2, "cache-load");

stopGrid(2);

// Wait for some data.
Thread.sleep(3000);

startGrid(2);

log.info("Stop load...");

stop.set(true);

cacheLoadFuture.get();

// Deactivate and activate again.
srv.cluster().active(false);

srv.cluster().active(true);

awaitPartitionMapExchange();

log.info("Checking data...");

for (Ignite ignite : G.allGrids()) {
IgniteCache cache1 = ignite.getOrCreateCache(DEFAULT_CACHE_NAME);

for (int k = 1; k <= keysCount; k++) {
Object val = cache1.get(k);

Assert.assertNotNull("node=" + ignite.name() + ", key=" + k, val);

Assert.assertTrue("node=" + ignite.name() + ", key=" + k + ", val=" + val, (int) val == k);
}

for (int k : addedKeys) {
Object val = cache1.get(k);

Assert.assertNotNull("node=" + ignite.name() + ", key=" + k, val);

Assert.assertTrue("node=" + ignite.name() + ", key=" + k + ", val=" + val, (int) val == k);
}
}
}
}
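
For reference, a minimal standalone sketch of the activation scenario these tests exercise (native persistence enabled, manual activation through the public cluster API). The class name and cache name are hypothetical and the snippet is illustrative only, not part of this commit:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

/** Sketch only: manual activation of a single persistence-enabled node. */
public class ActivationSketch {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration()
            .setDataStorageConfiguration(new DataStorageConfiguration()
                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                    .setPersistenceEnabled(true)));

        try (Ignite ignite = Ignition.start(cfg)) {
            // With native persistence the cluster starts inactive; partition states are
            // restored from disk during activation (the path this commit fixes).
            ignite.cluster().active(true);

            IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache("sketch-cache");

            cache.put(1, 1);

            // Deactivate before shutdown; persistent data survives restarts.
            ignite.cluster().active(false);
        }
    }
}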
@@ -186,7 +186,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {

DataStorageConfiguration cfg1 = new DataStorageConfiguration();

cfg1.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(150 * 1024 * 1024L));
cfg1.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setMaxSize(512 * 1024 * 1024L));

cfg.setDataStorageConfiguration(cfg1);

@@ -584,7 +584,7 @@ public void testPartitionCounterConsistencyOnUnstableTopology() throws Exception
try (IgniteDataStreamer ds = ig.dataStreamer(cacheName)) {
ds.allowOverwrite(true);

for (int k0 = k; k < k0 + 50_000; k++)
for (int k0 = k; k < k0 + 10_000; k++)
ds.addData(k, k);
}

@@ -631,7 +631,7 @@ public void testPartitionCounterConsistencyOnUnstableTopology() throws Exception
for (;k < k0 + 3; k++)
ds.addData(k, k);

U.sleep(1);
U.sleep(10);
}
}
catch (Exception e) {
