Skip to content

Commit

Permalink
ignite-1.5 Corrected fix for hang on metadata update. Fix for ignite-647
Browse files Browse the repository at this point in the history
 (issues with dynamic cache start when fair affinity is used).
  • Loading branch information
sboikov committed Dec 24, 2015
1 parent fe14099 commit 383f317
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 135 deletions.
Expand Up @@ -1961,7 +1961,8 @@ private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
if (req.initiatingNodeId() == null)
desc.staticallyConfigured(true);

desc.receivedOnDiscovery(true);
if (joiningNodeId.equals(ctx.localNodeId()))
desc.receivedOnDiscovery(true);

DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);

Expand Down
Expand Up @@ -491,6 +491,9 @@ public GridBinaryMarshaller marshaller() {

AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);

if (topVer == null)
topVer = ctx.cache().context().exchange().readyAffinityVersion();

BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta));

if (err != null)
Expand Down
Expand Up @@ -1290,59 +1290,48 @@ public void updateAllAsyncInternal0(

GridCacheReturn retVal = null;

IgniteTxManager tm = ctx.tm();
if (keys.size() > 1 && // Several keys ...
writeThrough() && !req.skipStore() && // and store is enabled ...
!ctx.store().isLocal() && // and this is not local store ...
!ctx.dr().receiveEnabled() // and no DR.
) {
// This method can only be used when there are no replicated entries in the batch.
UpdateBatchResult updRes = updateWithBatch(node,
hasNear,
req,
res,
locked,
ver,
dhtFut,
completionCb,
ctx.isDrEnabled(),
taskName,
expiry,
sndPrevVal);

// Needed for metadata cache transaction.
boolean set = tm.setTxTopologyHint(req.topologyVersion());
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();

try {
if (keys.size() > 1 && // Several keys ...
writeThrough() && !req.skipStore() && // and store is enabled ...
!ctx.store().isLocal() && // and this is not local store ...
!ctx.dr().receiveEnabled() // and no DR.
) {
// This method can only be used when there are no replicated entries in the batch.
UpdateBatchResult updRes = updateWithBatch(node,
hasNear,
req,
res,
locked,
ver,
dhtFut,
completionCb,
ctx.isDrEnabled(),
taskName,
expiry,
sndPrevVal);

deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();

if (req.operation() == TRANSFORM)
retVal = updRes.invokeResults();
}
else {
UpdateSingleResult updRes = updateSingle(node,
hasNear,
req,
res,
locked,
ver,
dhtFut,
completionCb,
ctx.isDrEnabled(),
taskName,
expiry,
sndPrevVal);

retVal = updRes.returnValue();
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
}
if (req.operation() == TRANSFORM)
retVal = updRes.invokeResults();
}
finally {
if (set)
tm.setTxTopologyHint(null);
else {
UpdateSingleResult updRes = updateSingle(node,
hasNear,
req,
res,
locked,
ver,
dhtFut,
completionCb,
ctx.isDrEnabled(),
taskName,
expiry,
sndPrevVal);

retVal = updRes.returnValue();
deleted = updRes.deleted();
dhtFut = updRes.dhtFuture();
}

if (retVal == null)
Expand Down
Expand Up @@ -329,6 +329,19 @@ public boolean dummyReassign() {
* @return {@code True} if cache was added during this exchange.
*/
public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
if (cacheStarted(cacheId))
return true;

GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);

return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
}

/**
* @param cacheId Cache ID.
* @return {@code True} if non-client cache was added during this exchange.
*/
private boolean cacheStarted(int cacheId) {
if (!F.isEmpty(reqs)) {
for (DynamicCacheChangeRequest req : reqs) {
if (req.start() && !req.clientStartOnly()) {
Expand All @@ -338,9 +351,7 @@ public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
}
}

GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);

return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
return false;
}

/**
Expand Down Expand Up @@ -419,7 +430,8 @@ private boolean canCalculateAffinity(GridCacheContext cacheCtx) {
// If local node did not initiate exchange or local node is the only cache node in grid.
Collection<ClusterNode> affNodes = CU.affinityNodes(cacheCtx, exchId.topologyVersion());

return !exchId.nodeId().equals(cctx.localNodeId()) ||
return cacheStarted(cacheCtx.cacheId()) ||
!exchId.nodeId().equals(cctx.localNodeId()) ||
(affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
}

Expand Down
Expand Up @@ -52,10 +52,16 @@ public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx,
// Obtain the topology version to use.
long threadId = Thread.currentThread().getId();

AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
AffinityTopologyVersion topVer = null;

if (tx.system())
topVer = tx.topologyVersionSnapshot();

if (topVer == null)
topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);

// If there is another system transaction in progress, use it's topology version to prevent deadlock.
if (topVer == null && tx != null && tx.system())
if (topVer == null && tx.system())
topVer = cctx.tm().lockedTopologyVersion(threadId, tx);

if (topVer != null) {
Expand Down
Expand Up @@ -92,17 +92,10 @@ public class IgniteCacheAffinitySelfTest extends IgniteCacheAbstractTest {
return new NearCacheConfiguration();
}

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
fail("Enable when https://issues.apache.org/jira/browse/IGNITE-647 is fixed.");
}

/**
* @throws Exception if failed.
*/
public void testAffinity() throws Exception {
fail("Enable when https://issues.apache.org/jira/browse/IGNITE-647 is fixed.");

checkAffinity();

stopGrid(gridCount() - 1);
Expand Down
Expand Up @@ -37,22 +37,11 @@ public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

/** */
public FairAffinityDynamicCacheSelfTest(){
super(false);
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);

TcpDiscoverySpi disco = new TcpDiscoverySpi();

disco.setIpFinder(IP_FINDER);

cfg.getTransactionConfiguration().setTxSerializableEnabled(true);

cfg.setDiscoverySpi(disco);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);

return cfg;
}
Expand All @@ -71,8 +60,6 @@ public FairAffinityDynamicCacheSelfTest(){
* @throws Exception If failed.
*/
public void testStartStopCache() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-647");

CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();

cacheCfg.setCacheMode(CacheMode.PARTITIONED);
Expand All @@ -94,6 +81,6 @@ public void testStartStopCache() throws Exception {
}
});

destFut.get(2000L);
destFut.get(5000L);
}
}
Expand Up @@ -126,8 +126,6 @@ public void testCrossCacheTxOperationsPrimarySync() throws Exception {
* @throws Exception If failed.
*/
public void testCrossCacheTxOperationsFairAffinity() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-647");

txOperations(PARTITIONED, FULL_SYNC, true, true);
}

Expand Down

0 comments on commit 383f317

Please sign in to comment.