# IGNITE-283: Fixed conflict resolution.
vozerov committed Feb 24, 2015
1 parent fdd150b commit 45ce814
Showing 12 changed files with 500 additions and 415 deletions.

Large diffs are not rendered by default.

```diff
@@ -506,38 +506,38 @@ private void commitIfLocked() throws IgniteCheckedException {
 GridCacheVersion explicitVer = txEntry.conflictVersion();

 if (explicitVer == null)
     explicitVer = writeVersion();

 if (txEntry.ttl() == CU.TTL_ZERO)
     op = DELETE;

-boolean drNeedResolve = cacheCtx.conflictNeedResolve();
+boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();

 GridCacheVersionConflictContext<K, V> conflictCtx = null;

-if (drNeedResolve) {
+if (conflictNeedResolve) {
     IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
-        drRes = conflictResolve(op, txEntry.key(), val, valBytes,
-            txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached);
+        drRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, cached);

     assert drRes != null;

-    GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();
+    conflictCtx = drRes.get2();

-    if (drCtx.isUseOld())
+    if (conflictCtx.isUseOld())
         op = NOOP;
-    else if (drCtx.isUseNew()) {
-        txEntry.ttl(drCtx.ttl());
-
-        if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId())
-            txEntry.conflictExpireTime(drCtx.expireTime());
-        else
-            txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+    else if (conflictCtx.isUseNew()) {
+        txEntry.ttl(conflictCtx.ttl());
+        txEntry.conflictExpireTime(conflictCtx.expireTime());
     }
-    else if (drCtx.isMerge()) {
+    else if (conflictCtx.isMerge()) {
         op = drRes.get1();
-        val = drCtx.mergeValue();
+        val = conflictCtx.mergeValue();
         valBytes = null;
         explicitVer = writeVersion();

-        txEntry.ttl(drCtx.ttl());
-        txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE);
+        txEntry.ttl(conflictCtx.ttl());
+        txEntry.conflictExpireTime(conflictCtx.expireTime());
     }
 }
 else
```
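
Aside from the `dr*` to `conflict*` renames, the behavioral change in this hunk is in the `isUseNew()` branch: TTL and expire time now come straight from the conflict context instead of a branch on data center IDs. The resolver still produces exactly three outcomes: keep the old entry, take the new one, or merge. Below is a minimal sketch of that three-way dispatch; `Outcome` and `ConflictContext` are hypothetical stand-ins for `GridCacheVersionConflictContext`, not Ignite code.

```java
import java.util.Optional;

// Hypothetical stand-ins for the Ignite conflict-resolution types; only the
// three-way branch structure mirrors the hunk above.
enum Outcome { USE_OLD, USE_NEW, MERGE }

final class ConflictContext<V> {
    final Outcome outcome;
    final V mergeValue; // only meaningful for MERGE
    final long ttl;
    final long expireTime;

    ConflictContext(Outcome outcome, V mergeValue, long ttl, long expireTime) {
        this.outcome = outcome;
        this.mergeValue = mergeValue;
        this.ttl = ttl;
        this.expireTime = expireTime;
    }
}

public class ConflictDispatchSketch {
    /** Returns the value to write, or empty when the existing entry wins (op = NOOP). */
    static <V> Optional<V> resolve(ConflictContext<V> ctx, V newVal) {
        switch (ctx.outcome) {
            case USE_OLD:
                return Optional.empty();            // skip the write entirely
            case USE_NEW:
                return Optional.of(newVal);         // apply new value with ctx.ttl / ctx.expireTime
            default:
                return Optional.of(ctx.mergeValue); // merged value gets a fresh write version
        }
    }

    public static void main(String[] args) {
        ConflictContext<String> merge =
            new ConflictContext<>(Outcome.MERGE, "merged", 60_000L, 0L);
        System.out.println(resolve(merge, "new")); // Optional[merged]
    }
}
```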
```diff
@@ -610,26 +610,28 @@ else if (op == READ) {
     "Transaction does not own lock for group lock entry during commit [tx=" +
     this + ", txEntry=" + txEntry + ']';

-if (txEntry.ttl() != -1L)
-    cached.updateTtl(null, txEntry.ttl());
+if (conflictCtx == null || !conflictCtx.isUseOld()) {
+    if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
+        cached.updateTtl(null, txEntry.ttl());

-if (nearCached != null) {
-    V val0 = null;
-    byte[] valBytes0 = null;
-
-    GridCacheValueBytes valBytesTuple = cached.valueBytes();
-
-    if (!valBytesTuple.isNull()) {
-        if (valBytesTuple.isPlain())
-            val0 = (V)valBytesTuple.get();
-        else
-            valBytes0 = valBytesTuple.get();
-    }
-    else
-        val0 = cached.rawGet();
-
-    nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
-        cached.ttl(), nodeId);
-}
+    if (nearCached != null) {
+        V val0 = null;
+        byte[] valBytes0 = null;
+
+        GridCacheValueBytes valBytesTuple = cached.valueBytes();
+
+        if (!valBytesTuple.isNull()) {
+            if (valBytesTuple.isPlain())
+                val0 = (V) valBytesTuple.get();
+            else
+                valBytes0 = valBytesTuple.get();
+        }
+        else
+            val0 = cached.rawGet();
+
+        nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(),
+            cached.ttl(), nodeId);
+    }
+}
```
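
The other recurring change is the switch from raw `-1L` comparisons to named sentinels, plus the new guard that skips the TTL update entirely when resolution keeps the old entry. A short sketch of that convention follows; the constant values are assumptions, only their roles are taken from the diff.

```java
// Sketch of the TTL sentinel convention this commit moves to; the values
// assigned below are assumptions, not copies of CU's constants.
public class TtlSentinelsSketch {
    static final long TTL_ZERO = 0L;               // write becomes a DELETE
    static final long TTL_NOT_CHANGED = -1L;       // keep the entry's current TTL
    static final long EXPIRE_TIME_CALCULATE = -1L; // derive expire time from TTL downstream

    /** Mirrors the guard above: only touch the TTL when the entry survives resolution. */
    static boolean shouldUpdateTtl(boolean useOldWins, long ttl) {
        return !useOldWins && ttl != TTL_NOT_CHANGED;
    }

    public static void main(String[] args) {
        System.out.println(shouldUpdateTtl(false, 60_000L));         // true
        System.out.println(shouldUpdateTtl(false, TTL_NOT_CHANGED)); // false
    }
}
```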

```diff
@@ -1118,16 +1118,14 @@ public void updateAllAsyncInternal0(
 dhtFut = createDhtFuture(ver, req, res, completionCb, false);

-boolean replicate = ctx.isDrEnabled();
-
 expiry = expiryPolicy(req.expiry());

 GridCacheReturn<Object> retVal = null;

 if (keys.size() > 1 &&                  // Several keys ...
     writeThrough() &&                   // and store is enabled ...
     !ctx.store().isLocalStore() &&      // and this is not local store ...
-    !ctx.dr().receiveEnabled()          // and no DR.
+    !ctx.dr().receiveEnabled()  // and no DR.
 ) {
     // This method can only be used when there are no replicated entries in the batch.
     UpdateBatchResult<K, V> updRes = updateWithBatch(node,
@@ -1138,7 +1136,7 @@ public void updateAllAsyncInternal0(
     ver,
     dhtFut,
     completionCb,
-    replicate,
+    ctx.isDrEnabled(),
     taskName,
     expiry);
@@ -1157,7 +1155,7 @@ public void updateAllAsyncInternal0(
     ver,
     dhtFut,
     completionCb,
-    replicate,
+    ctx.isDrEnabled(),
     taskName,
     expiry);
@@ -1670,10 +1668,7 @@ private UpdateSingleResult<K, V> updateSingle(
 long newConflictTtl = req.conflictTtl(i);
 long newConflictExpireTime = req.conflictExpireTime(i);

-assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; // Plain version is expected here.
-
-if (newConflictVer == null)
-    newConflictVer = ver;
+assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;

 boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
     req.topologyVersion());
@@ -1723,15 +1718,12 @@ primary && writeThrough(),
 if (dhtFut != null) {
     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
-        GridCacheVersionConflictContext<K, V> ctx = updRes.conflictResolveResult();
-
-        long ttl = updRes.newTtl();
-        long expireTime = updRes.conflictExpireTime();
+        GridCacheVersionConflictContext<K, V> conflictCtx = updRes.conflictResolveResult();

-        if (ctx == null)
+        if (conflictCtx == null)
             newConflictVer = null;
-        else if (ctx.isMerge()) {
-            newConflictVer = null; // DR version is discarded in case of merge.
+        else if (conflictCtx.isMerge()) {
+            newConflictVer = null; // Conflict version is discarded in case of merge.
             newValBytes = null; // Value has been changed.
         }
@@ -1746,7 +1738,7 @@ else if (ctx.isMerge()) {
     newValBytes,
     entryProcessor,
     updRes.newTtl(),
-    expireTime,
+    updRes.conflictExpireTime(),
     newConflictVer);
 }
@@ -1756,8 +1748,8 @@ else if (ctx.isMerge()) {
     updRes.newValue(),
     newValBytes,
     entryProcessor,
-    ttl,
-    expireTime);
+    updRes.newTtl(),
+    updRes.conflictExpireTime());
 }
 else {
     if (log.isDebugEnabled())
@@ -1771,9 +1763,6 @@ else if (ctx.isMerge()) {
 if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
     GridCacheVersionConflictContext<K, V> ctx = updRes.conflictResolveResult();

-    long ttl = updRes.newTtl();
-    long expireTime = updRes.conflictExpireTime();
-
     if (ctx != null && ctx.isMerge())
         newValBytes = null;
@@ -1782,11 +1771,11 @@ else if (ctx.isMerge()) {
     res.addNearValue(i,
         updRes.newValue(),
         newValBytes,
-        ttl,
-        expireTime);
+        updRes.newTtl(),
+        updRes.conflictExpireTime());
 }
 else
-    res.addNearTtl(i, ttl, expireTime);
+    res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());

 if (updRes.newValue() != null || newValBytes != null) {
     IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
@@ -1880,7 +1869,7 @@ else if (F.contains(readers, node.id())) // Reader became primary or backup.
 ) {
     assert putMap == null ^ rmvKeys == null;

-    assert req.conflictVersions() == null : "updatePartialBatch cannot be called when there are DR entries in the batch.";
+    assert req.conflictVersions() == null : "Cannot be called when there are conflict entries in the batch.";

     long topVer = req.topologyVersion();
@@ -2480,9 +2469,6 @@ private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateRequest
 long ttl = req.ttl(i);
 long expireTime = req.conflictExpireTime(i);

-if (ttl != -1L && expireTime == -1L)
-    expireTime = CU.toExpireTime(ttl);
-
 GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
     ver,
     nodeId,
```
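
This last hunk (and the near-cache hunk at the end of the commit) drops the inline TTL-to-expire-time conversion at the call site; with `CU.EXPIRE_TIME_CALCULATE` acting as an explicit signal, the derivation can presumably happen once downstream of `innerUpdate`. A sketch of what the removed lines did; `toExpireTime` below is modeled on the name `CU.toExpireTime` and is not a copy of the Ignite implementation.

```java
public class ExpireTimeSketch {
    // Modeled on the name CU.toExpireTime; an assumption, not the Ignite code.
    static long toExpireTime(long ttl) {
        return System.currentTimeMillis() + ttl;
    }

    /** The removed call-site logic: fill in expireTime only when a TTL was given without one. */
    static long deriveExpireTime(long ttl, long expireTime) {
        return (ttl != -1L && expireTime == -1L) ? toExpireTime(ttl) : expireTime;
    }

    public static void main(String[] args) {
        System.out.println(deriveExpireTime(60_000L, -1L)); // now + 60s
        System.out.println(deriveExpireTime(-1L, -1L));     // -1: nothing to derive
    }
}
```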
```diff
@@ -520,35 +520,39 @@ private void map0(GridDiscoveryTopologySnapshot topSnapshot,
 K key = F.first(keys);

 Object val;
-long drTtl;
-long drExpireTime;
-GridCacheVersion drVer;
+GridCacheVersion conflictVer;
+long conflictTtl;
+long conflictExpireTime;

 if (vals != null) {
+    // Regular PUT.
     val = F.first(vals);
-    drTtl = -1;
-    drExpireTime = -1;
-    drVer = null;
+    conflictVer = null;
+    conflictTtl = CU.TTL_NOT_CHANGED;
+    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 }
 else if (conflictPutVals != null) {
-    GridCacheDrInfo<V> drPutVal = F.first(conflictPutVals);
+    // Conflict PUT.
+    GridCacheDrInfo<V> conflictPutVal = F.first(conflictPutVals);

-    val = drPutVal.value();
-    drTtl = drPutVal.ttl();
-    drExpireTime = drPutVal.expireTime();
-    drVer = drPutVal.version();
+    val = conflictPutVal.value();
+    conflictVer = conflictPutVal.version();
+    conflictTtl = conflictPutVal.ttl();
+    conflictExpireTime = conflictPutVal.expireTime();
 }
 else if (conflictRmvVals != null) {
+    // Conflict REMOVE.
     val = null;
-    drTtl = -1;
-    drExpireTime = -1;
-    drVer = F.first(conflictRmvVals);
+    conflictVer = F.first(conflictRmvVals);
+    conflictTtl = CU.TTL_NOT_CHANGED;
+    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 }
 else {
+    // Regular REMOVE.
     val = null;
-    drTtl = -1;
-    drExpireTime = -1;
-    drVer = null;
+    conflictVer = null;
+    conflictTtl = CU.TTL_NOT_CHANGED;
+    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 }

 // We still can get here if user pass map with single element.
```
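
These four branches (and their per-entry twins further down) normalize every operation into the same triple of conflict version, conflict TTL, and conflict expire time: a regular PUT/REMOVE carries no version and sentinel TTL data, while a conflict PUT/REMOVE carries the remote entry's metadata. A compact sketch with hypothetical holder types in place of `GridCacheDrInfo` and `GridCacheVersion`:

```java
// Hypothetical holder types; only the four-way case analysis mirrors the diff.
record Version(long order) {}
record ConflictPut(Object value, Version version, long ttl, long expireTime) {}
record EntryData(Object val, Version conflictVer, long conflictTtl, long conflictExpireTime) {}

public class ConflictEntrySketch {
    static final long TTL_NOT_CHANGED = -1L;       // assumed sentinel values
    static final long EXPIRE_TIME_CALCULATE = -1L;

    /** Cases in this order: regular PUT, conflict PUT, conflict REMOVE, regular REMOVE. */
    static EntryData normalize(Object val, ConflictPut conflictPut, Version conflictRmv) {
        if (val != null)
            return new EntryData(val, null, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE);
        if (conflictPut != null)
            return new EntryData(conflictPut.value(), conflictPut.version(),
                conflictPut.ttl(), conflictPut.expireTime());
        if (conflictRmv != null)
            return new EntryData(null, conflictRmv, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE);
        return new EntryData(null, null, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE);
    }

    public static void main(String[] args) {
        System.out.println(normalize("v1", null, null)); // regular PUT: no conflict metadata
    }
}
```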
```diff
@@ -599,7 +603,7 @@ else if (conflictRmvVals != null) {
     subjId,
     taskNameHash);

-req.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, true);
+req.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, true);

 single = true;
@@ -614,15 +618,15 @@ else if (conflictRmvVals != null) {
 if (vals != null)
     it = vals.iterator();

-Iterator<GridCacheDrInfo<V>> drPutValsIt = null;
+Iterator<GridCacheDrInfo<V>> conflictPutValsIt = null;

 if (conflictPutVals != null)
-    drPutValsIt = conflictPutVals.iterator();
+    conflictPutValsIt = conflictPutVals.iterator();

-Iterator<GridCacheVersion> drRmvValsIt = null;
+Iterator<GridCacheVersion> conflictRmvValsIt = null;

 if (conflictRmvVals != null)
-    drRmvValsIt = conflictRmvVals.iterator();
+    conflictRmvValsIt = conflictRmvVals.iterator();

 Map<UUID, GridNearAtomicUpdateRequest<K, V>> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
@@ -643,15 +647,15 @@ else if (conflictRmvVals != null) {
 }

 Object val;
-long drTtl;
-long drExpireTime;
-GridCacheVersion drVer;
+GridCacheVersion conflictVer;
+long conflictTtl;
+long conflictExpireTime;

 if (vals != null) {
     val = it.next();
-    drTtl = -1;
-    drExpireTime = -1;
-    drVer = null;
+    conflictVer = null;
+    conflictTtl = CU.TTL_NOT_CHANGED;
+    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;

     if (val == null) {
         NullPointerException err = new NullPointerException("Null value.");
@@ -662,24 +666,24 @@ else if (conflictRmvVals != null) {
     }
 }
 else if (conflictPutVals != null) {
-    GridCacheDrInfo<V> drPutVal = drPutValsIt.next();
+    GridCacheDrInfo<V> conflictPutVal = conflictPutValsIt.next();

-    val = drPutVal.value();
-    drTtl = drPutVal.ttl();
-    drExpireTime = drPutVal.expireTime();
-    drVer = drPutVal.version();
+    val = conflictPutVal.value();
+    conflictVer = conflictPutVal.version();
+    conflictTtl = conflictPutVal.ttl();
+    conflictExpireTime = conflictPutVal.expireTime();
 }
 else if (conflictRmvVals != null) {
     val = null;
-    drTtl = -1;
-    drExpireTime = -1;
-    drVer = drRmvValsIt.next();
+    conflictVer = conflictRmvValsIt.next();
+    conflictTtl = CU.TTL_NOT_CHANGED;
+    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 }
 else {
     val = null;
-    drTtl = -1;
-    drExpireTime = -1;
-    drVer = null;
+    conflictVer = null;
+    conflictTtl = CU.TTL_NOT_CHANGED;
+    conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 }

 if (val == null && op != GridCacheOperation.DELETE)
@@ -727,7 +731,7 @@ else if (conflictRmvVals != null) {
     "Invalid mapping state [old=" + old + ", remap=" + remap + ']';
 }

-mapped.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, i == 0);
+mapped.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);

 i++;
```
```diff
@@ -317,9 +317,6 @@ public void processDhtAtomicUpdateRequest(
 long ttl = req.nearTtl(i);
 long expireTime = req.nearExpireTime(i);

-if (ttl != -1L && expireTime == -1L)
-    expireTime = CU.toExpireTime(ttl);
-
 GridCacheUpdateAtomicResult<K, V> updRes = entry.innerUpdate(
     ver,
     nodeId,
```