Skip to content

Commit

Permalink
GG-11937 - Fixed checkpoint read lock release on exception
Browse files Browse the repository at this point in the history
  • Loading branch information
agoncharuk committed Feb 8, 2017
1 parent 9a8746e commit 7122256
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 37 deletions.
Expand Up @@ -116,6 +116,7 @@
import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;
Expand Down Expand Up @@ -904,7 +905,7 @@ private void addRemovedItemsCleanupTask(long timeout) {
private void checkConsistency() throws IgniteCheckedException { private void checkConsistency() throws IgniteCheckedException {
if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
for (ClusterNode n : ctx.discovery().remoteNodes()) { for (ClusterNode n : ctx.discovery().remoteNodes()) {
if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)) if (Boolean.TRUE.equals(n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)))
continue; continue;


checkTransactionConfiguration(n); checkTransactionConfiguration(n);
Expand Down Expand Up @@ -1264,16 +1265,6 @@ private void stopCache(GridCacheAdapter<?, ?> cache, boolean cancel, boolean des
if (log.isInfoEnabled()) if (log.isInfoEnabled())
log.info("Stopped cache: " + cache.name()); log.info("Stopped cache: " + cache.name());


if (sharedCtx.pageStore() != null) {
try {
sharedCtx.pageStore().shutdownForCache(ctx, destroy);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
"[cache=" + ctx.name() + "]", e);
}
}

cleanup(ctx); cleanup(ctx);
} }


Expand Down Expand Up @@ -1909,8 +1900,9 @@ private void stopGateway(DynamicCacheChangeRequest req) {


/** /**
* @param req Stop request. * @param req Stop request.
* @return Stopped cache context.
*/ */
private void prepareCacheStop(DynamicCacheChangeRequest req) { private GridCacheContext<?, ?> prepareCacheStop(DynamicCacheChangeRequest req) {
assert req.stop() || req.close() : req; assert req.stop() || req.close() : req;


GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName())); GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName()));
Expand All @@ -1926,7 +1918,11 @@ private void prepareCacheStop(DynamicCacheChangeRequest req) {
onKernalStop(cache, req.destroy()); onKernalStop(cache, req.destroy());


stopCache(cache, true, req.destroy()); stopCache(cache, true, req.destroy());

return ctx;
} }

return null;
} }


/** /**
Expand Down Expand Up @@ -1956,16 +1952,22 @@ public void onExchangeDone(
} }


if (!F.isEmpty(reqs) && err == null) { if (!F.isEmpty(reqs) && err == null) {
Collection<IgniteBiTuple<GridCacheContext, Boolean>> stopped = null;

for (DynamicCacheChangeRequest req : reqs) { for (DynamicCacheChangeRequest req : reqs) {
String masked = maskNull(req.cacheName()); String masked = maskNull(req.cacheName());


GridCacheContext<?, ?> stopCtx = null;
boolean destroy = false;

if (req.stop()) { if (req.stop()) {
stopGateway(req); stopGateway(req);


sharedCtx.database().checkpointReadLock(); sharedCtx.database().checkpointReadLock();


try { try {
prepareCacheStop(req); stopCtx = prepareCacheStop(req);
destroy = req.destroy();
} }
finally { finally {
sharedCtx.database().checkpointReadUnlock(); sharedCtx.database().checkpointReadUnlock();
Expand All @@ -1987,11 +1989,22 @@ else if (req.close() && req.initiatingNodeId().equals(ctx.localNodeId())) {


proxy.context().gate().onStopped(); proxy.context().gate().onStopped();


prepareCacheStop(req); stopCtx = prepareCacheStop(req);
destroy = req.destroy();
} }
} }
} }

if (stopCtx != null) {
if (stopped == null)
stopped = new ArrayList<>();

stopped.add(F.<GridCacheContext, Boolean>t(stopCtx, destroy));
}
} }

if (stopped != null)
sharedCtx.database().onCachesStopped(stopped);
} }
} }


Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


/** /**
Expand Down Expand Up @@ -192,9 +193,9 @@ public void beforeExchange(GridDhtPartitionsExchangeFuture discoEvt) throws Igni
} }


/** /**
* @throws IgniteCheckedException If failed. * @param stoppedCtxs A collection of tuples (cache context, destroy flag).
*/ */
public void beforeCachesStop() throws IgniteCheckedException { public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) {
// No-op. // No-op.
} }


Expand Down
Expand Up @@ -122,7 +122,7 @@
/** /**
* Non-transactional partitioned cache. * Non-transactional partitioned cache.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings({"unchecked", "TooBroadScope"})
@GridToStringExclude @GridToStringExclude
public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** */ /** */
Expand Down Expand Up @@ -1806,6 +1806,8 @@ private void updateAllAsyncInternal0(


IgniteCacheExpiryPolicy expiry = null; IgniteCacheExpiryPolicy expiry = null;


ctx.shared().database().checkpointReadLock();

try { try {
// If batch store update is enabled, we need to lock all entries. // If batch store update is enabled, we need to lock all entries.
// First, need to acquire locks on cache entries, then check filter. // First, need to acquire locks on cache entries, then check filter.
Expand Down Expand Up @@ -1978,6 +1980,9 @@ private void updateAllAsyncInternal0(


return; return;
} }
finally {
ctx.shared().database().checkpointReadUnlock();
}


if (remap) { if (remap) {
assert dhtFut == null; assert dhtFut == null;
Expand Down Expand Up @@ -2923,8 +2928,6 @@ else if (readers.contains(node.id())) // Reader became primary or backup.
@SuppressWarnings("ForLoopReplaceableByForEach") @SuppressWarnings("ForLoopReplaceableByForEach")
private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer)
throws GridDhtInvalidPartitionException { throws GridDhtInvalidPartitionException {
ctx.shared().database().checkpointReadLock();

if (req.size() == 1) { if (req.size() == 1) {
KeyCacheObject key = req.key(0); KeyCacheObject key = req.key(0);


Expand Down Expand Up @@ -3040,8 +3043,6 @@ private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopolog
entry.onUnlock(); entry.onUnlock();
} }


ctx.shared().database().checkpointReadUnlock();

if (skip != null && skip.size() == locked.size()) if (skip != null && skip.size() == locked.size())
// Optimization. // Optimization.
return; return;
Expand Down
Expand Up @@ -535,21 +535,6 @@ else if (msg instanceof StartFullSnapshotAckDiscoveryMessage)


updateTopologies(crdNode); updateTopologies(crdNode);


if (!F.isEmpty(reqs)) {
boolean hasStop = false;

for (DynamicCacheChangeRequest req : reqs) {
if (req.stop()) {
hasStop = true;

break;
}
}

if (hasStop)
cctx.cache().context().database().beforeCachesStop();
}

switch (exchange) { switch (exchange) {
case ALL: { case ALL: {
distributedExchange(); distributedExchange();
Expand Down
Expand Up @@ -354,7 +354,8 @@ CU.UTILITY_CACHE_NAME, new ServiceEntriesListener(), null, null


cancelFutures(depFuts, err); cancelFutures(depFuts, err);
cancelFutures(undepFuts, err); cancelFutures(undepFuts, err);
}finally { }
finally {
busyLock.unblock(); busyLock.unblock();
} }


Expand Down

0 comments on commit 7122256

Please sign in to comment.