Commit fe8c746

GG-23153 Cross-cache tx is mapped on wrong primary when enlisted caches have incompatible assignments.

ascherbakoff committed Aug 27, 2019
1 parent 61c7192

Showing 17 changed files with 651 additions and 243 deletions.
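
The bug in a nutshell: a transaction that enlists several caches can get mapped on a topology version for which the enlisted caches no longer agree on primaries. The toy model below is illustrative only (the class, the list-based assignment model, and the node names are assumptions, not Ignite code); it shows why compatibility must hold for every enlisted cache before a client may keep its existing mapping.

import java.util.Arrays;
import java.util.List;

// Hypothetical model: two topology versions have "compatible" assignments for
// a cache when every partition keeps the same primary. A cross-cache tx mapped
// on an older version is only safe to keep if ALL enlisted caches are compatible.
final class AssignmentCompat {
    /** assignment.get(p) is the node id of partition p's primary. */
    static boolean compatible(List<String> oldAssignment, List<String> newAssignment) {
        return oldAssignment.equals(newAssignment);
    }

    public static void main(String[] args) {
        List<String> cacheAv1 = Arrays.asList("n1", "n2"), cacheAv2 = Arrays.asList("n1", "n2");
        List<String> cacheBv1 = Arrays.asList("n1", "n2"), cacheBv2 = Arrays.asList("n2", "n1");

        // Cache A alone would be fine, but the tx also enlists cache B,
        // whose primaries moved; the tx must therefore be remapped.
        boolean safe = compatible(cacheAv1, cacheAv2) && compatible(cacheBv1, cacheBv2);

        System.out.println("tx can keep its mapping: " + safe); // false
    }
}
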
@@ -236,7 +236,7 @@ else if (desc.receivedFromStartVersion() != null)
         else {
             AffinityTopologyVersion locAffVer = cctx.exchange().readyAffinityVersion();
 
-            if (locAffVer.compareTo(lastAffChangedVer) < 0) {
+            if (locAffVer.before(lastAffChangedVer)) {
                 IgniteLogger log = cacheMsg.messageLogger(cctx);
 
                 if (log.isDebugEnabled()) {
@@ -889,6 +889,7 @@ private void processFailedMessage(UUID nodeId,
                 0,
                 req.classError(),
                 null,
+                false,
                 false);
 
             sendResponseOnFailedMessage(nodeId, res, cctx, plc);
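
The hunk above swaps compareTo(...) < 0 for before(...). Presumably before() is a readability wrapper over compareTo(), as sketched here for a simplified two-field version type (an assumption; the real AffinityTopologyVersion may compare differently):

// Minimal sketch of a before() helper on a Comparable version type.
final class Version implements Comparable<Version> {
    private final long topVer;
    private final int minorTopVer;

    Version(long topVer, int minorTopVer) {
        this.topVer = topVer;
        this.minorTopVer = minorTopVer;
    }

    @Override public int compareTo(Version o) {
        int cmp = Long.compare(topVer, o.topVer);

        return cmp != 0 ? cmp : Integer.compare(minorTopVer, o.minorTopVer);
    }

    /** Same check as {@code compareTo(o) < 0}, but reads as intent. */
    boolean before(Version o) {
        return compareTo(o) < 0;
    }
}
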
@@ -218,8 +218,12 @@ private class CleanupWorker extends GridWorker {
                     return;
                 }
 
-                if (!(t instanceof IgniteInterruptedCheckedException || t instanceof InterruptedException))
+                if (!(t instanceof IgniteInterruptedCheckedException || t instanceof InterruptedException)) {
+                    if (isCancelled)
+                        return;
+
                     err = t;
+                }
 
                 throw t;
             }
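
The CleanupWorker change stops recording failures observed during cancellation as fatal node errors. A rough standalone sketch of that pattern, using a plain Runnable instead of GridWorker (class and field names are hypothetical):

// A worker being cancelled is expected to fail with arbitrary errors while
// resources shut down underneath it; only a failure during normal operation
// should be recorded as the exit cause.
final class CleanupLoop implements Runnable {
    private volatile boolean cancelled;

    /** Failure that should be escalated; {@code null} if none. */
    private volatile Throwable err;

    @Override public void run() {
        try {
            while (!cancelled) {
                Thread.sleep(1_000); // Placeholder for the periodic cleanup work.
            }
        }
        catch (Throwable t) {
            if (!(t instanceof InterruptedException)) {
                if (!cancelled)
                    err = t; // Record only if we were NOT cancelled.
            }

            sneakyThrow(t); // Re-throw so the caller still sees the exit cause.
        }
    }

    void cancel() {
        cancelled = true;
    }

    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E)t;
    }
}
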
@@ -1101,7 +1101,7 @@ public IgniteInternalFuture<GridNearLockResponse> lockAllAsync(
             if (tx == null) {
                 tx = new GridDhtTxLocal(
                     ctx.shared(),
-                    req.topologyVersion(),
+                    topology().readyTopologyVersion(),
                     nearNode.id(),
                     req.version(),
                     req.futureId(),
@@ -1350,7 +1350,8 @@ private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode,
             0,
             null,
             topVer,
-            ctx.deploymentEnabled());
+            ctx.deploymentEnabled(),
+            false);
 
         try {
             ctx.io().send(nearNode, res, ctx.ioPolicy());
@@ -1387,6 +1388,12 @@ private GridNearLockResponse createLockReply(
         assert tx == null || tx.xidVersion().equals(mappedVer);
 
         try {
+            // All subsequent lock requests must use actual topology version to avoid mapping on invalid primaries.
+            AffinityTopologyVersion clientRemapVer = req.firstClientRequest() &&
+                tx != null &&
+                topology().readyTopologyVersion().after(req.topologyVersion()) ?
+                topology().readyTopologyVersion() : null;
+
             // Send reply back to originating near node.
             GridNearLockResponse res = new GridNearLockResponse(ctx.cacheId(),
                 req.version(),
@@ -1395,8 +1402,9 @@ private GridNearLockResponse createLockReply(
                 tx != null && tx.onePhaseCommit(),
                 entries.size(),
                 err,
-                null,
-                ctx.deploymentEnabled());
+                clientRemapVer,
+                ctx.deploymentEnabled(),
+                clientRemapVer != null);
 
             if (err == null) {
                 res.pending(localDhtPendingVersions(entries, mappedVer));
@@ -1507,7 +1515,8 @@ private GridNearLockResponse createLockReply(
                 entries.size(),
                 e,
                 null,
-                ctx.deploymentEnabled());
+                ctx.deploymentEnabled(),
+                false);
         }
     }
 
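
The createLockReply() change above is the server half of the fix: a remap version travels back only on a client's first lock request, and only when this primary's ready topology is already ahead of the request's. A minimal sketch of that decision, assuming simplified long version numbers (the class, method name, and types are illustrative, not the Ignite signature):

final class RemapDecisionSketch {
    /**
     * @param firstClientRequest Whether this is the client's first lock request in the tx.
     * @param txExists Whether a tx was found or created for the request.
     * @param readyTopVer Topology version this primary has fully applied.
     * @param requestTopVer Topology version the client mapped the request on.
     * @return Version the client should re-stamp requests with, or {@code null} if none.
     */
    static Long clientRemapVersion(boolean firstClientRequest, boolean txExists,
        long readyTopVer, long requestTopVer) {
        // Only the first request can be cheaply redirected; later requests
        // must stay on the version the tx is already mapped on.
        return firstClientRequest && txExists && readyTopVer > requestTopVer
            ? readyTopVer
            : null; // null keeps the old wire behavior: no remap needed.
    }
}
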
@@ -1642,7 +1642,7 @@ private void map(IgniteTxEntry entry) {
         List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
 
         assert !dhtNodes.isEmpty() && dhtNodes.get(0).id().equals(cctx.localNodeId()) :
-            "localNode = " + cctx.localNodeId() + ", dhtNodes = " + dhtNodes;
+            "cacheId=" + cacheCtx.cacheId() + ", localNode = " + cctx.localNodeId() + ", dhtNodes = " + dhtNodes;
 
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
@@ -134,7 +134,7 @@ public GridDhtColocatedCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap
 
         ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
             @Override public void apply(UUID nodeId, GridNearLockResponse res) {
-                processLockResponse(nodeId, res);
+                processNearLockResponse(nodeId, res);
             }
         });
     }
@@ -1163,7 +1163,7 @@ else if (!b)
      * @param nodeId Node ID.
      * @param res Response.
      */
-    private void processLockResponse(UUID nodeId, GridNearLockResponse res) {
+    private void processNearLockResponse(UUID nodeId, GridNearLockResponse res) {
         if (txLockMsgLog.isDebugEnabled())
             txLockMsgLog.debug("Received near lock response [txId=" + res.version() + ", node=" + nodeId + ']');
 
@@ -829,7 +829,7 @@ private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
 
         final GridDhtTopologyFuture fut;
 
-        final boolean finish;
+        final boolean finished;
 
         try {
             if (cctx.topology().stopping()) {
@@ -843,9 +843,9 @@ private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
 
             fut = cctx.topologyVersionFuture();
 
-            finish = fut.isDone();
+            finished = fut.isDone();
 
-            if (finish) {
+            if (finished) {
                 Throwable err = fut.validateCache(cctx, recovery, read, null, keys);
 
                 if (err != null) {
@@ -858,7 +858,7 @@ private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
 
                 if (remap) {
                     if (tx != null)
-                        tx.onRemap(topVer);
+                        tx.onRemap(topVer, true);
 
                     synchronized (this) {
                         this.topVer = topVer;
@@ -882,7 +882,7 @@ private void mapOnTopology(final boolean remap, @Nullable final Runnable c) {
             cctx.topology().readUnlock();
         }
 
-        if (finish) {
+        if (finished) {
             map(keys, remap, false);
 
             if (c != null)
@@ -1702,94 +1702,117 @@ void onResult(GridNearLockResponse res) {
         if (res.clientRemapVersion() != null) {
             assert cctx.kernalContext().clientNode();
 
-            IgniteInternalFuture<?> affFut =
-                cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+            if (res.compatibleRemapVersion()) {
+                if (tx != null) {
+                    // Versions are compatible.
+                    cctx.shared().exchange().
+                        lastAffinityChangedTopologyVersion(res.clientRemapVersion(), tx.topologyVersionSnapshot());
 
-            cctx.time().waitAsync(affFut, tx == null ? 0 : tx.remainingTime(), (e, timedOut) -> {
-                if (errorOrTimeoutOnTopologyVersion(e, timedOut))
-                    return;
+                    tx.onRemap(res.clientRemapVersion(), false);
 
-                try {
-                    remap();
-                }
-                finally {
-                    cctx.shared().txContextReset();
-                }
-            });
-        }
-        else {
-            int i = 0;
+                    // Use remapped version for all subsequent mappings.
+                    synchronized (GridDhtColocatedLockFuture.this) {
+                        for (GridNearLockMapping mapping : mappings) {
+                            GridNearLockRequest req = mapping.request();
+
+                            assert req != null : mapping;
+
+                            req.topologyVersion(res.clientRemapVersion());
+                        }
+                    }
+                }
+            }
+            else {
+                IgniteInternalFuture<?> affFut =
+                    cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
+
+                cctx.time().waitAsync(affFut, tx == null ? 0 : tx.remainingTime(), (e, timedOut) -> {
+                    if (errorOrTimeoutOnTopologyVersion(e, timedOut))
+                        return;
+
+                    try {
+                        remap();
+                    }
+                    finally {
+                        cctx.shared().txContextReset();
+                    }
+                });
+
+                return;
+            }
+        }
+
+        int i = 0;
 
         for (KeyCacheObject k : keys) {
             IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(k);
 
             CacheObject newVal = res.value(i);
 
             GridCacheVersion dhtVer = res.dhtVersion(i);
 
             if (newVal == null) {
                 if (oldValTup != null) {
                     if (oldValTup.get1().equals(dhtVer))
                         newVal = oldValTup.get2();
                 }
             }
 
             if (inTx()) {
                 IgniteTxEntry txEntry = tx.entry(cctx.txKey(k));
 
                 // In colocated cache we must receive responses only for detached entries.
                 assert txEntry.cached().detached() : txEntry;
 
                 txEntry.markLocked();
 
                 GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
 
                 if (res.dhtVersion(i) == null) {
                     onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
                         "(will fail the lock): " + res));
 
                     return;
                 }
 
                 // Set value to detached entry.
                 entry.resetFromPrimary(newVal, dhtVer);
 
                 tx.hasRemoteLocks(true);
 
                 if (log.isDebugEnabled())
                     log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
             }
             else
                 cctx.mvcc().markExplicitOwner(cctx.txKey(k), threadId);
 
             if (retval && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
                 cctx.events().addEvent(cctx.affinity().partition(k),
                     k,
                     tx,
                     null,
                     EVT_CACHE_OBJECT_READ,
                     newVal,
                     newVal != null,
                     null,
                     false,
                     CU.subjectId(tx, cctx.shared()),
                     null,
                     tx == null ? null : tx.resolveTaskName(),
                     keepBinary);
             }
 
             i++;
         }
 
         try {
             proceedMapping();
         }
         catch (IgniteCheckedException e) {
             onDone(e);
         }
 
         onDone(true);
     }

/**
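
And the client half, sketched with hypothetical names (PendingLockRequest, affinityReady(), and remapFromScratch() are illustrative, not Ignite API): a compatible remap keeps the computed mappings and merely re-stamps queued requests, while an incompatible one waits for the new affinity and remaps from scratch.

import java.util.List;
import java.util.concurrent.CompletableFuture;

final class LockFutureSketch {
    static final class PendingLockRequest {
        long topVer;

        void topologyVersion(long v) {
            topVer = v;
        }
    }

    private final List<PendingLockRequest> pendingRequests;

    LockFutureSketch(List<PendingLockRequest> pendingRequests) {
        this.pendingRequests = pendingRequests;
    }

    void onRemapVersion(long remapVer, boolean compatible) {
        if (compatible) {
            // Reuse current mappings, just bump the version on queued requests.
            for (PendingLockRequest req : pendingRequests)
                req.topologyVersion(remapVer);
        }
        else {
            // Assignments diverged: wait for the new affinity, then redo mapping.
            affinityReady(remapVer).thenRun(this::remapFromScratch);
        }
    }

    private CompletableFuture<Void> affinityReady(long ver) {
        return CompletableFuture.completedFuture(null); // Stub for the sketch.
    }

    private void remapFromScratch() {
        // Recompute key-to-primary mappings on the new version.
    }
}
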
@@ -65,7 +65,6 @@
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1171,9 +1170,11 @@ else if (loc != null && state == RENTING && !showRenting) {
         AffinityTopologyVersion diffVer = diffFromAffinityVer;
 
         if (!diffVer.equals(topVer)) {
-            LT.warn(log, "Requested topology version does not match calculated diff, need to check if " +
-                "affinity has changed [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer +
-                ", diffVer=" + diffVer + "]");
+            if (log.isDebugEnabled()) {
+                log.debug("Requested topology version does not match calculated diff, need to check if " +
+                    "affinity has changed [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer +
+                    ", diffVer=" + diffVer + "]");
+            }
 
             boolean affChanged;
 
@@ -1183,9 +1184,11 @@ else if (loc != null && state == RENTING && !showRenting) {
             affChanged = ctx.exchange().affinityChanged(topVer, diffVer);
 
             if (affChanged) {
-                LT.warn(log, "Requested topology version does not match calculated diff, will require full iteration to" +
-                    "calculate mapping [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer +
-                    ", diffVer=" + diffVer + "]");
+                if (log.isDebugEnabled()) {
+                    log.debug("Requested topology version does not match calculated diff, will require full iteration to " +
+                        "calculate mapping [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer +
+                        ", diffVer=" + diffVer + "]");
+                }
 
                 nodes = new ArrayList<>();
 
