Skip to content

Commit

Permalink
ignite-3116 Cancel force keys futures on node stop (cherry picked fro…
Browse files Browse the repository at this point in the history
…m commit 0428018)
  • Loading branch information
sboikov committed May 16, 2016
1 parent ea909bc commit db8a9b2
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 58 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
Expand All @@ -44,6 +45,7 @@
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
Expand Down Expand Up @@ -167,9 +169,44 @@ public GridDhtGetFuture(
* Initializes future.
*/
void init() {
map(keys);
GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);

if (fut != null) {
if (!F.isEmpty(fut.invalidPartitions())) {
if (retries == null)
retries = new HashSet<>();

retries.addAll(fut.invalidPartitions());
}

fut.listen(new CI1<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> fut) {
try {
fut.get();
}
catch (NodeStoppingException e) {
return;
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');

onDone(e);

return;
}

map0(keys);

markInitialized();
}
});
}
else {
map0(keys);

markInitialized();
markInitialized();
}
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -204,42 +241,6 @@ public GridCacheVersion version() {
return false;
}

/**
* @param keys Keys.
*/
private void map(final Map<KeyCacheObject, Boolean> keys) {
GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);

if (fut != null) {
if (!F.isEmpty(fut.invalidPartitions())) {
if (retries == null)
retries = new HashSet<>();

retries.addAll(fut.invalidPartitions());
}

add(new GridEmbeddedFuture<>(
new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
@Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
if (e != null) { // Check error first.
if (log.isDebugEnabled())
log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');

onDone(e);
}
else
map0(keys);

// Finish this one.
return Collections.emptyList();
}
},
fut));
}
else
map0(keys);
}

/**
* @param keys Keys to map.
*/
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
Expand Down Expand Up @@ -220,6 +221,9 @@ private void map() {
log.debug("Failed to request keys from preloader " +
"[keys=" + key + ", err=" + e + ']');

if (e instanceof NodeStoppingException)
return;

onDone(e);
}
else
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
Expand Down Expand Up @@ -378,17 +379,69 @@ protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockR
IgniteInternalFuture<Object> keyFut = F.isEmpty(req.keys()) ? null :
ctx.dht().dhtPreloader().request(req.keys(), req.topologyVersion());

if (keyFut == null || keyFut.isDone())
if (keyFut == null || keyFut.isDone()) {
if (keyFut != null) {
try {
keyFut.get();
}
catch (NodeStoppingException e) {
return;
}
catch (IgniteCheckedException e) {
onForceKeysError(nodeId, req, e);

return;
}
}

processDhtLockRequest0(nodeId, req);
}
else {
keyFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> t) {
@Override public void apply(IgniteInternalFuture<Object> fut) {
try {
fut.get();
}
catch (NodeStoppingException e) {
return;
}
catch (IgniteCheckedException e) {
onForceKeysError(nodeId, req, e);

return;
}

processDhtLockRequest0(nodeId, req);
}
});
}
}

/**
* @param nodeId Node ID.
* @param req Request.
* @param e Error.
*/
private void onForceKeysError(UUID nodeId, GridDhtLockRequest req, IgniteCheckedException e) {
GridDhtLockResponse res = new GridDhtLockResponse(ctx.cacheId(),
req.version(),
req.futureId(),
req.miniId(),
e,
ctx.deploymentEnabled());

try {
ctx.io().send(nodeId, res, ctx.ioPolicy());
}
catch (ClusterTopologyCheckedException e0) {
if (log.isDebugEnabled())
log.debug("Failed to send lock reply to remote node because it left grid: " + nodeId);
}
catch (IgniteCheckedException e0) {
U.error(log, "Failed to send lock reply to node: " + nodeId, e);
}
}

/**
* @param nodeId Node ID.
* @param req Request.
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
Expand Down Expand Up @@ -1466,17 +1467,64 @@ public void updateAllAsyncInternal(
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());

if (forceFut == null || forceFut.isDone())
if (forceFut == null || forceFut.isDone()) {
try {
if (forceFut != null)
forceFut.get();
}
catch (NodeStoppingException e) {
return;
}
catch (IgniteCheckedException e) {
onForceKeysError(nodeId, req, completionCb, e);

return;
}

updateAllAsyncInternal0(nodeId, req, completionCb);
}
else {
forceFut.listen(new CI1<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> t) {
@Override public void apply(IgniteInternalFuture<Object> fut) {
try {
fut.get();
}
catch (NodeStoppingException e) {
return;
}
catch (IgniteCheckedException e) {
onForceKeysError(nodeId, req, completionCb, e);

return;
}

updateAllAsyncInternal0(nodeId, req, completionCb);
}
});
}
}

/**
* @param nodeId Node ID.
* @param req Update request.
* @param completionCb Completion callback.
* @param e Error.
*/
private void onForceKeysError(final UUID nodeId,
final GridNearAtomicUpdateRequest req,
final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
IgniteCheckedException e
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
req.futureVersion(),
ctx.deploymentEnabled());

res.addFailedKeys(req.keys(), e);

completionCb.apply(req, res);
}

/**
* Executes local update after preloader fetched values.
*
Expand Down
Expand Up @@ -246,7 +246,11 @@ private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc)

int curTopVer = topCntr.get();

preloader.addFuture(this);
if (!preloader.addFuture(this)) {
assert isDone() : this;

return false;
}

trackable = true;

Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
Expand Down Expand Up @@ -115,6 +116,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** */
private final AtomicInteger partsEvictOwning = new AtomicInteger();

/** */
private volatile boolean stopping;

/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
Expand Down Expand Up @@ -210,6 +214,8 @@ public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
if (log.isDebugEnabled())
log.debug("DHT rebalancer onKernalStop callback.");

stopping = true;

cctx.events().removeListener(discoLsnr);

// Acquire write busy lock.
Expand All @@ -221,8 +227,19 @@ public GridDhtPreloader(GridCacheContext<?, ?> cctx) {
if (demander != null)
demander.stop();

IgniteCheckedException err = stopError();

for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
fut.onDone(err);

top = null;
}
/**
* @return Node stop exception.
*/
private IgniteCheckedException stopError() {
return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
}

/** {@inheritDoc} */
@Override public void onInitialExchangeComplete(@Nullable Throwable err) {
Expand Down Expand Up @@ -711,9 +728,18 @@ public void onPartitionEvicted(GridDhtLocalPartition part, boolean updateSeq) {
* Adds future to future map.
*
* @param fut Future to add.
* @return {@code False} if node cache is stopping and future was completed with error.
*/
void addFuture(GridDhtForceKeysFuture<?, ?> fut) {
boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
forceKeyFuts.put(fut.futureId(), fut);

if (stopping) {
fut.onDone(stopError());

return false;
}

return true;
}

/**
Expand Down

0 comments on commit db8a9b2

Please sign in to comment.