diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 18fce53d4e3a6..db7272b6f63da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -802,18 +802,8 @@ private V innerGet0(IgniteInternalTx tx, evt = false; } - if (ret != null && expiryPlc != null) { - long ttl = expiryPlc.forAccess(); - - if (ttl != CU.TTL_NOT_CHANGED) { - updateTtl(ttl); - - expiryPlc.ttlUpdated(key(), - getOrMarshalKeyBytes(), - version(), - hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); - } - } + if (ret != null && expiryPlc != null) + updateTtl(expiryPlc); } if (ret != null) @@ -1367,12 +1357,8 @@ else if (ttl == CU.TTL_NOT_CHANGED) boolean pass = cctx.isAll(wrapFilterLocked(), filter); if (!pass) { - if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked()) { - long ttl = CU.toTtl(expiryPlc.getExpiryForAccess()); - - if (ttl != CU.TTL_NOT_CHANGED) - updateTtl(ttl); - } + if (expiryPlc != null && !readThrough && filter != cctx.noPeekArray() && hasValueUnlocked()) + updateTtl(expiryPlc); return new T3<>(false, retval ? old : null, null); } @@ -1409,12 +1395,8 @@ else if (ttl == CU.TTL_NOT_CHANGED) } if (!entry.modified()) { - if (expiryPlc != null && !readThrough && hasValueUnlocked()) { - long newTtl = CU.toTtl(expiryPlc.getExpiryForAccess()); - - if (newTtl != CU.TTL_NOT_CHANGED) - updateTtl(newTtl); - } + if (expiryPlc != null && !readThrough && hasValueUnlocked()) + updateTtl(expiryPlc); return new GridTuple3<>(false, null, invokeRes); } @@ -1512,7 +1494,7 @@ else if (ttl != CU.TTL_ZERO) // in load methods without actually holding entry lock. clearIndex(old); - update(null, null, 0, 0, ver); + update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); if (evt) { V evtOld = null; @@ -1555,6 +1537,7 @@ else if (ttl != CU.TTL_ZERO) } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public GridCacheUpdateAtomicResult innerUpdate( GridCacheVersion newVer, UUID evtNodeId, @@ -1572,8 +1555,8 @@ else if (ttl != CU.TTL_ZERO) boolean verCheck, @Nullable IgnitePredicate>[] filter, GridDrType drType, - long conflictTtl, - long conflictExpireTime, + long explicitTtl, + long explicitExpireTime, @Nullable GridCacheVersion conflictVer, boolean conflictResolve, boolean intercept, @@ -1582,21 +1565,24 @@ else if (ttl != CU.TTL_ZERO) ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); - V old; - boolean res = true; + V oldVal; V updated; GridCacheVersion enqueueVer = null; - GridCacheVersionConflictContext drRes = null; + GridCacheVersionConflictContext conflictCtx = null; EntryProcessorResult invokeRes = null; - long newTtl = -1L; - long newExpireTime = 0L; - long newDrExpireTime = -1L; // Explicit DR expire time which possibly will be sent to DHT node. + // System TTL/ET which may have special values. + long newSysTtl; + long newSysExpireTime; + + // TTL/ET which will be passed to entry on update. + long newTtl; + long newExpireTime; synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1609,56 +1595,47 @@ else if (ttl != CU.TTL_ZERO) Object transformClo = null; + // Request-level conflict resolution is needed, i.e. we do not know who will win in advance. if (conflictResolve) { - GridCacheVersion oldDrVer = version().conflictVersion(); - - boolean drNeedResolve = cctx.conflictNeedResolve(); - - if (drNeedResolve) { - // Get old value. - V oldVal = rawGetOrUnmarshalUnlocked(true); + GridCacheVersion oldConflictVer = version().conflictVersion(); + // Cache is conflict-enabled. + if (cctx.conflictNeedResolve()) { + // Get new value, optionally unmarshalling and/or transforming it. if (writeObj == null && valBytes != null) writeObj = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); if (op == GridCacheOperation.TRANSFORM) { transformClo = writeObj; - writeObj = ((IgniteClosure)writeObj).apply(oldVal); - } - - K k = key(); - - if (conflictTtl >= 0L) { - // DR TTL is set explicitly - assert conflictExpireTime >= 0L; - - newTtl = conflictTtl; - newExpireTime = conflictExpireTime; + writeObj = ((IgniteClosure)writeObj).apply(rawGetOrUnmarshalUnlocked(true)); + valBytes = null; } - else { - long ttl = expiryPlc != null ? (isNew() ? expiryPlc.forCreate() : expiryPlc.forUpdate()) : -1L; - newTtl = ttl < 0 ? ttlExtras() : ttl; - newExpireTime = CU.toExpireTime(newTtl); - } + GridTuple3 expiration = ttlAndExpireTime(expiryPlc, explicitTtl, + explicitExpireTime); + // Prepare old and new entries for conflict resolution. GridCacheVersionedEntryEx oldEntry = versionedEntry(); - GridCacheVersionedEntryEx newEntry = - new GridCachePlainVersionedEntry<>(k, (V)writeObj, newTtl, newExpireTime, conflictVer); + GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry<>(key, (V)writeObj, + expiration.get1(), expiration.get2(), conflictVer != null ? conflictVer : newVer); + + // Resolve conflict. + conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); - drRes = cctx.conflictResolve(oldEntry, newEntry, verCheck); + assert conflictCtx != null; - assert drRes != null; + // Use old value? + if (conflictCtx.isUseOld()) { + GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer; - if (drRes.isUseOld()) { // Handle special case with atomic comparator. - if (!isNew() && // Not initial value, - verCheck && // and atomic version check, - oldDrVer.dataCenterId() == conflictVer.dataCenterId() && // and data centers are equal, - ATOMIC_VER_COMPARATOR.compare(oldDrVer, conflictVer) == 0 && // and both versions are equal, - cctx.writeThrough() && // and store is enabled, - primary) // and we are primary. + if (!isNew() && // Not initial value, + verCheck && // and atomic version check, + oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal, + ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer) == 0 && // and both versions are equal, + cctx.writeThrough() && // and store is enabled, + primary) // and we are primary. { V val = rawGetOrUnmarshalUnlocked(false); @@ -1671,47 +1648,39 @@ else if (ttl != CU.TTL_ZERO) cctx.store().putToStore(null, key(), val, ver); } - old = retval ? rawGetOrUnmarshalUnlocked(false) : val; - return new GridCacheUpdateAtomicResult<>(false, - old, + retval ? rawGetOrUnmarshalUnlocked(false) : null, null, invokeRes, - 0L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); } - else if (drRes.isUseNew()) - op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + // Will update something. else { - assert drRes.isMerge(); - - writeObj = drRes.mergeValue(); - valBytes = null; + // Merge is a local update which override passed value bytes. + if (conflictCtx.isMerge()) { + writeObj = conflictCtx.mergeValue(); + valBytes = null; - conflictVer = null; // Update will be considered as local. + conflictVer = null; + } + else + assert conflictCtx.isUseNew(); + // Update value is known at this point, so update operation type. op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; } - - newTtl = drRes.ttl(); - newExpireTime = drRes.expireTime(); - - // Explicit DR expire time will be passed to remote node only in that case. - if (!drRes.explicitTtl() && !drRes.isMerge()) { - if (drRes.isUseNew() && newEntry.dataCenterId() != cctx.dataCenterId() || - drRes.isUseOld() && oldEntry.dataCenterId() != cctx.dataCenterId()) - newDrExpireTime = drRes.expireTime(); - } } else - // Nullify DR version on this update, so that we will use regular version during next updates. + // Nullify conflict version on this update, so that we will use regular version during next updates. conflictVer = null; } - if (drRes == null) { // Perform version check only in case there will be no explicit conflict resolution. + // Perform version check only in case there was no explicit conflict resolution. + if (conflictCtx == null) { if (verCheck) { if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer) >= 0) { if (ATOMIC_VER_COMPARATOR.compare(ver, newVer) == 0 && cctx.writeThrough() && primary) { @@ -1735,14 +1704,12 @@ else if (drRes.isUseNew()) "[entry=" + this + ", newVer=" + newVer + ']'); } - old = retval ? rawGetOrUnmarshalUnlocked(false) : val; - return new GridCacheUpdateAtomicResult<>(false, - old, + retval ? rawGetOrUnmarshalUnlocked(false) : null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -1753,46 +1720,46 @@ assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer) <= 0 : "Invalid version for inner update [entry=" + this + ", newVer=" + newVer + ']'; } - // Possibly get old value form store. - old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; - - GridCacheValueBytes oldBytes = valueBytesUnlocked(); + // Prepare old value and value bytes. + oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + GridCacheValueBytes oldValBytes = valueBytesUnlocked(); + // Possibly read value from store. boolean readThrough = false; - if (needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { - old = readThrough(null, key, false, subjId, taskName); + if (needVal && oldVal == null && (cctx.readThrough() && + (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { + oldVal = readThrough(null, key, false, subjId, taskName); readThrough = true; // Detach value before index update. if (cctx.portableEnabled()) - old = (V)cctx.kernalContext().portable().detachPortable(old); + oldVal = (V)cctx.kernalContext().portable().detachPortable(oldVal); - long ttl = 0; - long expireTime = 0; + // Calculate initial TTL and expire time. + long initTtl; + long initExpireTime; - if (expiryPlc != null && old != null) { - ttl = expiryPlc.forCreate(); + if (expiryPlc != null && oldVal != null) { + IgniteBiTuple initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc); - if (ttl == CU.TTL_ZERO) { - ttl = 1; - expireTime = U.currentTimeMillis() - 1; - } - else if (ttl == CU.TTL_NOT_CHANGED) - ttl = 0; - else - expireTime = CU.toExpireTime(ttl); + initTtl = initTtlAndExpireTime.get1(); + initExpireTime = initTtlAndExpireTime.get2(); + } + else { + initTtl = CU.TTL_ETERNAL; + initExpireTime = CU.EXPIRE_TIME_ETERNAL; } - if (old != null) - updateIndex(old, null, expireTime, ver, null); + if (oldVal != null) + updateIndex(oldVal, null, initExpireTime, ver, null); else clearIndex(null); - update(old, null, expireTime, ttl, ver); + update(oldVal, null, initExpireTime, initTtl, ver); - if (deletedUnlocked() && old != null && !isInternal()) + if (deletedUnlocked() && oldVal != null && !isInternal()) deletedUnlocked(false); } @@ -1800,7 +1767,7 @@ else if (ttl == CU.TTL_NOT_CHANGED) if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { // PutIfAbsent methods mustn't update hit/miss statistics if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || filter != cctx.noPeekArray()) - cctx.cache().metrics0().onRead(old != null); + cctx.cache().metrics0().onRead(oldVal != null); } // Check filter inside of synchronization. @@ -1808,38 +1775,30 @@ else if (ttl == CU.TTL_NOT_CHANGED) boolean pass = cctx.isAll(wrapFilterLocked(), filter); if (!pass) { - if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noPeekArray()) { - newTtl = expiryPlc.forAccess(); - - if (newTtl != CU.TTL_NOT_CHANGED) { - updateTtl(newTtl); - - expiryPlc.ttlUpdated(key, - getOrMarshalKeyBytes(), - version(), - hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); - } - } + if (expiryPlc != null && !readThrough && hasValueUnlocked() && filter != cctx.noPeekArray()) + updateTtl(expiryPlc); return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, + retval ? oldVal : null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); } } - // Calculate new value. + // Calculate new value in case we met transform. if (op == GridCacheOperation.TRANSFORM) { + assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier."; + transformClo = writeObj; EntryProcessor entryProcessor = (EntryProcessor)writeObj; - CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, key, old); + CacheInvokeEntry entry = new CacheInvokeEntry<>(cctx, key, oldVal); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1857,31 +1816,21 @@ else if (ttl == CU.TTL_NOT_CHANGED) catch (Exception e) { invokeRes = new CacheInvokeResult<>(e); - updated = old; + updated = oldVal; - valBytes = oldBytes.getIfMarshaled(); + valBytes = oldValBytes.getIfMarshaled(); } if (!entry.modified()) { - if (expiryPlc != null && !readThrough && hasValueUnlocked()) { - newTtl = expiryPlc.forAccess(); - - if (newTtl != CU.TTL_NOT_CHANGED) { - updateTtl(newTtl); - - expiryPlc.ttlUpdated(key, - getOrMarshalKeyBytes(), - version(), - hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); - } - } + if (expiryPlc != null && !readThrough && hasValueUnlocked()) + updateTtl(expiryPlc); return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, + retval ? oldVal : null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -1896,7 +1845,7 @@ else if (ttl == CU.TTL_NOT_CHANGED) boolean hadVal = hasValueUnlocked(); - // Incorporate DR version into new version if needed. + // Incorporate conflict version into new version if needed. if (conflictVer != null && conflictVer != newVer) newVer = new GridCacheVersionEx(newVer.topologyVersion(), newVer.globalTime(), @@ -1905,58 +1854,82 @@ else if (ttl == CU.TTL_NOT_CHANGED) newVer.dataCenterId(), conflictVer); - IgniteBiTuple interceptRes = null; - - long ttl0 = newTtl; if (op == GridCacheOperation.UPDATE) { - if (drRes == null) { + // Conflict context is null if there were no explicit conflict resolution. + if (conflictCtx == null) { // Calculate TTL and expire time for local update. - if (conflictTtl >= 0L) { - assert conflictExpireTime >= 0L : conflictExpireTime; + if (explicitTtl != CU.TTL_NOT_CHANGED) { + // If conflict existed, expire time must be explicit. + assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE; + + newSysTtl = newTtl = explicitTtl; + newSysExpireTime = explicitExpireTime; - ttl0 = conflictTtl; - newExpireTime = conflictExpireTime; + newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ? + explicitExpireTime : CU.toExpireTime(explicitTtl); } else { - assert conflictExpireTime == CU.TTL_NOT_CHANGED : conflictExpireTime; + newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED : + hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); - if (expiryPlc != null) - newTtl = hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate(); - else - newTtl = CU.TTL_NOT_CHANGED; - - if (newTtl == CU.TTL_NOT_CHANGED) { - ttl0 = ttlExtras(); + if (newSysTtl == CU.TTL_NOT_CHANGED) { + newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; + newTtl = ttlExtras(); newExpireTime = expireTimeExtras(); } + else if (newSysTtl == CU.TTL_ZERO) { + op = GridCacheOperation.DELETE; + + newSysTtl = CU.TTL_NOT_CHANGED; + newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; + + newTtl = CU.TTL_ETERNAL; + newExpireTime = CU.EXPIRE_TIME_ETERNAL; + + updated = null; + valBytes = null; + } else { - ttl0 = newTtl; - newExpireTime = CU.toExpireTime(ttl0); + newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; + newTtl = newSysTtl; + newExpireTime = CU.toExpireTime(newTtl); } } } - else if (newTtl == CU.TTL_NOT_CHANGED) - ttl0 = ttlExtras(); + else { + newSysTtl = newTtl = conflictCtx.ttl(); + newSysExpireTime = newExpireTime = conflictCtx.expireTime(); + } } + else { + assert op == GridCacheOperation.DELETE; - if (ttl0 == CU.TTL_ZERO) { - op = GridCacheOperation.DELETE; + newSysTtl = CU.TTL_NOT_CHANGED; + newSysExpireTime = CU.EXPIRE_TIME_CALCULATE; - updated = null; + newTtl = CU.TTL_ETERNAL; + newExpireTime = CU.EXPIRE_TIME_ETERNAL; } + // TTL and expire time must be resolved at this point. + assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0; + assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0; + + IgniteBiTuple interceptRes = null; + + // Actual update. if (op == GridCacheOperation.UPDATE) { if (intercept) { - V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, old, updated); + V interceptorVal = (V)cctx.config().getInterceptor().onBeforePut(key, oldVal, updated); if (interceptorVal == null) return new GridCacheUpdateAtomicResult<>(false, - retval ? old : null, + retval ? oldVal : null, null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -1992,9 +1965,9 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - updateIndex(updated, valBytes, newExpireTime, newVer, old); + updateIndex(updated, valBytes, newExpireTime, newVer, oldVal); - update(updated, valBytes, newExpireTime, ttl0, newVer); + update(updated, valBytes, newExpireTime, newTtl, newVer); drReplicate(drType, updated, valBytes, newVer); @@ -2004,7 +1977,7 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this V evtOld = null; if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, @@ -2013,7 +1986,7 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { if (evtOld == null) - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld, @@ -2023,15 +1996,15 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this } else { if (intercept) { - interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, old); + interceptRes = cctx.config().getInterceptor().onBeforeRemove(key, oldVal); if (cctx.cancelRemove(interceptRes)) return new GridCacheUpdateAtomicResult<>(false, cctx.unwrapTemporary(interceptRes.get2()), null, invokeRes, - -1L, - -1L, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, null, null, false); @@ -2043,7 +2016,7 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - clearIndex(old); + clearIndex(oldVal); if (hadVal) { assert !deletedUnlocked(); @@ -2068,7 +2041,10 @@ assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + thi boolean hasValPtr = valPtr != 0; // Clear value on backup. Entry will be removed from cache when it got evicted from queue. - update(null, null, 0, 0, newVer); + update(null, null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer); + + assert newSysTtl == CU.TTL_NOT_CHANGED; + assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; if (cctx.offheapTiered() && hasValPtr) { boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); @@ -2086,7 +2062,7 @@ assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + thi V evtOld = null; if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, @@ -2095,7 +2071,7 @@ assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + thi if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { if (evtOld == null) - evtOld = cctx.unwrapTemporary(old); + evtOld = cctx.unwrapTemporary(oldVal); cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, @@ -2104,17 +2080,14 @@ assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + thi } res = hadVal; - - // Do not propagate zeroed TTL and expire time. - newTtl = -1L; - newDrExpireTime = -1L; } if (res) updateMetrics(op, metrics); if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false); + cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), + oldVal, oldValBytes, false); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2122,27 +2095,93 @@ assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + thi if (op == GridCacheOperation.UPDATE) cctx.config().getInterceptor().onAfterPut(key, val); else - cctx.config().getInterceptor().onAfterRemove(key, old); + cctx.config().getInterceptor().onAfterRemove(key, oldVal); if (interceptRes != null) - old = cctx.unwrapTemporary(interceptRes.get2()); + oldVal = cctx.unwrapTemporary(interceptRes.get2()); } } if (log.isDebugEnabled()) - log.debug("Updated cache entry [val=" + val + ", old=" + old + ", entry=" + this + ']'); + log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']'); return new GridCacheUpdateAtomicResult<>(res, - old, + oldVal, updated, invokeRes, - newTtl, - newDrExpireTime, + newSysTtl, + newSysExpireTime, enqueueVer, - drRes, + conflictCtx, true); } + /** + * @param expiry Expiration policy. + * @return Tuple holding initial TTL and expire time with the given expiry. + */ + private static IgniteBiTuple initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) { + assert expiry != null; + + long initTtl = expiry.forCreate(); + long initExpireTime; + + if (initTtl == CU.TTL_ZERO) { + initTtl = CU.TTL_MINIMUM; + initExpireTime = CU.expireTimeInPast(); + } + else if (initTtl == CU.TTL_NOT_CHANGED) { + initTtl = CU.TTL_ETERNAL; + initExpireTime = CU.EXPIRE_TIME_ETERNAL; + } + else + initExpireTime = CU.toExpireTime(initTtl); + + return F.t(initTtl, initExpireTime); + } + + /** + * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time. + * + * @param expiry Expiration policy. + * @param ttl Explicit TTL. + * @param expireTime Explicit expire time. + * @return Result. + */ + private GridTuple3 ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) + throws GridCacheEntryRemovedException { + boolean rmv = false; + + // 1. If TTL is not changed, then calculate it based on expiry. + if (ttl == CU.TTL_NOT_CHANGED) { + if (expiry != null) + ttl = hasValueUnlocked() ? expiry.forUpdate() : expiry.forCreate(); + } + + // 2. If TTL is zero, then set delete marker. + if (ttl == CU.TTL_ZERO) { + rmv = true; + + ttl = CU.TTL_ETERNAL; + } + + // 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL". + if (ttl == CU.TTL_NOT_CHANGED) { + if (isNew()) + ttl = CU.TTL_ETERNAL; + else { + ttl = ttlExtras(); + expireTime = expireTimeExtras(); + } + } + + // 4 If expire time was not set explicitly, then calculate it. + if (expireTime == CU.EXPIRE_TIME_CALCULATE) + expireTime = CU.toExpireTime(ttl); + + return F.t(ttl, expireTime, rmv); + } + /** * Perform DR if needed. * @@ -2517,7 +2556,7 @@ protected final void update(@Nullable V val, @Nullable byte[] valBytes, long exp GridCacheVersion ver) { assert ver != null; assert Thread.holdsLock(this); - assert ttl >= 0 : ttl; + assert ttl != CU.TTL_ZERO && ttl != CU.TTL_NOT_CHANGED && ttl >= 0 : ttl; long oldExpireTime = expireTimeExtras(); @@ -2534,6 +2573,39 @@ protected final void update(@Nullable V val, @Nullable byte[] valBytes, long exp this.ver = ver; } + /** + * Update TTL if it is changed. + * + * @param expiryPlc Expiry policy. + */ + private void updateTtl(ExpiryPolicy expiryPlc) { + long ttl = CU.toTtl(expiryPlc.getExpiryForAccess()); + + if (ttl != CU.TTL_NOT_CHANGED) + updateTtl(ttl); + } + + /** + * Update TTL is it is changed. + * + * @param expiryPlc Expiry policy. + * @throws IgniteCheckedException If failed. + * @throws GridCacheEntryRemovedException If failed. + */ + private void updateTtl(IgniteCacheExpiryPolicy expiryPlc) + throws IgniteCheckedException, GridCacheEntryRemovedException { + long ttl = expiryPlc.forAccess(); + + if (ttl != CU.TTL_NOT_CHANGED) { + updateTtl(ttl); + + expiryPlc.ttlUpdated(key(), + getOrMarshalKeyBytes(), + version(), + hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); + } + } + /** * @param ttl Time to live. */ @@ -2913,18 +2985,8 @@ private void groupLockSanityCheck(IgniteInternalTx tx) throws GridCacheEnt ver = this.ver; val = rawGetOrUnmarshalUnlocked(false); - if (val != null && expiryPlc != null) { - long ttl = expiryPlc.forAccess(); - - if (ttl != CU.TTL_NOT_CHANGED) { - updateTtl(ttl); - - expiryPlc.ttlUpdated(key(), - getOrMarshalKeyBytes(), - version(), - hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); - } - } + if (val != null && expiryPlc != null) + updateTtl(expiryPlc); } if (!cctx.isAll(wrap(), filter)) @@ -3392,30 +3454,6 @@ private GridCacheVersion nextVersion() { return expireTimeExtras(); } - /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional"}) - @Override public long expireTime() throws GridCacheEntryRemovedException { - IgniteTxLocalAdapter tx; - - if (cctx.isDht()) - tx = cctx.dht().near().context().tm().localTx(); - else - tx = cctx.tm().localTx(); - - if (tx != null) { - long time = tx.entryExpireTime(txKey()); - - if (time > 0) - return time; - } - - synchronized (this) { - checkObsolete(); - - return expireTimeExtras(); - } - } - /** {@inheritDoc} */ @Override public long expireTimeUnlocked() { assert Thread.holdsLock(this); @@ -3491,13 +3529,27 @@ private GridCacheVersion nextVersion() { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional"}) - @Override public long ttl() throws GridCacheEntryRemovedException { - IgniteTxLocalAdapter tx; + @Override public long expireTime() throws GridCacheEntryRemovedException { + IgniteTxLocalAdapter tx = currentTx(); - if (cctx.isDht()) - tx = cctx.dht().near().context().tm().localTx(); - else - tx = cctx.tm().localTx(); + if (tx != null) { + long time = tx.entryExpireTime(txKey()); + + if (time > 0) + return time; + } + + synchronized (this) { + checkObsolete(); + + return expireTimeExtras(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional"}) + @Override public long ttl() throws GridCacheEntryRemovedException { + IgniteTxLocalAdapter tx = currentTx(); if (tx != null) { long entryTtl = tx.entryTtl(txKey()); @@ -3513,6 +3565,16 @@ private GridCacheVersion nextVersion() { } } + /** + * @return Current transaction. + */ + private IgniteTxLocalAdapter currentTx() { + if (cctx.isDht()) + return cctx.dht().near().context().tm().localTx(); + else + return cctx.tm().localTx(); + } + /** {@inheritDoc} */ @Override public void updateTtl(@Nullable GridCacheVersion ver, long ttl) { synchronized (this) { @@ -4224,7 +4286,9 @@ public long expireTimeExtras() { * @param expireTime Expire time. */ protected void ttlAndExpireTimeExtras(long ttl, long expireTime) { - extras = (extras != null) ? extras.ttlAndExpireTime(ttl, expireTime) : ttl != 0 ? + assert ttl != CU.TTL_NOT_CHANGED && ttl != CU.TTL_ZERO; + + extras = (extras != null) ? extras.ttlAndExpireTime(ttl, expireTime) : ttl != CU.TTL_ETERNAL ? new GridCacheTtlEntryExtras(ttl, expireTime) : null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index e36947e17dfdd..dbf82dd46c7f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -506,38 +506,38 @@ private void commitIfLocked() throws IgniteCheckedException { GridCacheVersion explicitVer = txEntry.conflictVersion(); + if (explicitVer == null) + explicitVer = writeVersion(); + if (txEntry.ttl() == CU.TTL_ZERO) op = DELETE; - boolean drNeedResolve = cacheCtx.conflictNeedResolve(); + boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); + + GridCacheVersionConflictContext conflictCtx = null; - if (drNeedResolve) { + if (conflictNeedResolve) { IgniteBiTuple> - drRes = conflictResolve(op, txEntry.key(), val, valBytes, - txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached); + drRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, cached); assert drRes != null; - GridCacheVersionConflictContext drCtx = drRes.get2(); + conflictCtx = drRes.get2(); - if (drCtx.isUseOld()) + if (conflictCtx.isUseOld()) op = NOOP; - else if (drCtx.isUseNew()) { - txEntry.ttl(drCtx.ttl()); - - if (drCtx.newEntry().dataCenterId() != cacheCtx.dataCenterId()) - txEntry.conflictExpireTime(drCtx.expireTime()); - else - txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE); + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); } - else if (drCtx.isMerge()) { + else if (conflictCtx.isMerge()) { op = drRes.get1(); - val = drCtx.mergeValue(); + val = conflictCtx.mergeValue(); valBytes = null; explicitVer = writeVersion(); - txEntry.ttl(drCtx.ttl()); - txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE); + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); } } else @@ -610,26 +610,28 @@ else if (op == READ) { "Transaction does not own lock for group lock entry during commit [tx=" + this + ", txEntry=" + txEntry + ']'; - if (txEntry.ttl() != -1L) - cached.updateTtl(null, txEntry.ttl()); + if (conflictCtx == null || !conflictCtx.isUseOld()) { + if (txEntry.ttl() != CU.TTL_NOT_CHANGED) + cached.updateTtl(null, txEntry.ttl()); - if (nearCached != null) { - V val0 = null; - byte[] valBytes0 = null; + if (nearCached != null) { + V val0 = null; + byte[] valBytes0 = null; - GridCacheValueBytes valBytesTuple = cached.valueBytes(); + GridCacheValueBytes valBytesTuple = cached.valueBytes(); - if (!valBytesTuple.isNull()) { - if (valBytesTuple.isPlain()) - val0 = (V)valBytesTuple.get(); + if (!valBytesTuple.isNull()) { + if (valBytesTuple.isPlain()) + val0 = (V) valBytesTuple.get(); + else + valBytes0 = valBytesTuple.get(); + } else - valBytes0 = valBytesTuple.get(); - } - else - val0 = cached.rawGet(); + val0 = cached.rawGet(); - nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(), - cached.ttl(), nodeId); + nearCached.updateOrEvict(xidVer, val0, valBytes0, cached.expireTime(), + cached.ttl(), nodeId); + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c40839409065a..9f9af315fafd0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1118,8 +1118,6 @@ public void updateAllAsyncInternal0( dhtFut = createDhtFuture(ver, req, res, completionCb, false); - boolean replicate = ctx.isDrEnabled(); - expiry = expiryPolicy(req.expiry()); GridCacheReturn retVal = null; @@ -1127,7 +1125,7 @@ public void updateAllAsyncInternal0( if (keys.size() > 1 && // Several keys ... writeThrough() && // and store is enabled ... !ctx.store().isLocalStore() && // and this is not local store ... - !ctx.dr().receiveEnabled() // and no DR. + !ctx.dr().receiveEnabled() // and no DR. ) { // This method can only be used when there are no replicated entries in the batch. UpdateBatchResult updRes = updateWithBatch(node, @@ -1138,7 +1136,7 @@ public void updateAllAsyncInternal0( ver, dhtFut, completionCb, - replicate, + ctx.isDrEnabled(), taskName, expiry); @@ -1157,7 +1155,7 @@ public void updateAllAsyncInternal0( ver, dhtFut, completionCb, - replicate, + ctx.isDrEnabled(), taskName, expiry); @@ -1670,10 +1668,7 @@ private UpdateSingleResult updateSingle( long newConflictTtl = req.conflictTtl(i); long newConflictExpireTime = req.conflictExpireTime(i); - assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; // Plain version is expected here. - - if (newConflictVer == null) - newConflictVer = ver; + assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(), req.topologyVersion()); @@ -1723,15 +1718,12 @@ primary && writeThrough(), if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. - GridCacheVersionConflictContext ctx = updRes.conflictResolveResult(); + GridCacheVersionConflictContext conflictCtx = updRes.conflictResolveResult(); - long ttl = updRes.newTtl(); - long expireTime = updRes.conflictExpireTime(); - - if (ctx == null) + if (conflictCtx == null) newConflictVer = null; - else if (ctx.isMerge()) { - newConflictVer = null; // DR version is discarded in case of merge. + else if (conflictCtx.isMerge()) { + newConflictVer = null; // Conflict version is discarded in case of merge. newValBytes = null; // Value has been changed. } @@ -1746,7 +1738,7 @@ else if (ctx.isMerge()) { newValBytes, entryProcessor, updRes.newTtl(), - expireTime, + updRes.conflictExpireTime(), newConflictVer); } @@ -1756,8 +1748,8 @@ else if (ctx.isMerge()) { updRes.newValue(), newValBytes, entryProcessor, - ttl, - expireTime); + updRes.newTtl(), + updRes.conflictExpireTime()); } else { if (log.isDebugEnabled()) @@ -1771,9 +1763,6 @@ else if (ctx.isMerge()) { if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { GridCacheVersionConflictContext ctx = updRes.conflictResolveResult(); - long ttl = updRes.newTtl(); - long expireTime = updRes.conflictExpireTime(); - if (ctx != null && ctx.isMerge()) newValBytes = null; @@ -1782,11 +1771,11 @@ else if (ctx.isMerge()) { res.addNearValue(i, updRes.newValue(), newValBytes, - ttl, - expireTime); + updRes.newTtl(), + updRes.conflictExpireTime()); } else - res.addNearTtl(i, ttl, expireTime); + res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime()); if (updRes.newValue() != null || newValBytes != null) { IgniteInternalFuture f = entry.addReader(node.id(), req.messageId(), topVer); @@ -1880,7 +1869,7 @@ else if (F.contains(readers, node.id())) // Reader became primary or backup. ) { assert putMap == null ^ rmvKeys == null; - assert req.conflictVersions() == null : "updatePartialBatch cannot be called when there are DR entries in the batch."; + assert req.conflictVersions() == null : "Cannot be called when there are conflict entries in the batch."; long topVer = req.topologyVersion(); @@ -2480,9 +2469,6 @@ private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicUpdateReque long ttl = req.ttl(i); long expireTime = req.conflictExpireTime(i); - if (ttl != -1L && expireTime == -1L) - expireTime = CU.toExpireTime(ttl); - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ef3de55bf8617..c3cc50a150820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -520,35 +520,39 @@ private void map0(GridDiscoveryTopologySnapshot topSnapshot, K key = F.first(keys); Object val; - long drTtl; - long drExpireTime; - GridCacheVersion drVer; + GridCacheVersion conflictVer; + long conflictTtl; + long conflictExpireTime; if (vals != null) { + // Regular PUT. val = F.first(vals); - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } else if (conflictPutVals != null) { - GridCacheDrInfo drPutVal = F.first(conflictPutVals); + // Conflict PUT. + GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); - val = drPutVal.value(); - drTtl = drPutVal.ttl(); - drExpireTime = drPutVal.expireTime(); - drVer = drPutVal.version(); + val = conflictPutVal.value(); + conflictVer = conflictPutVal.version(); + conflictTtl = conflictPutVal.ttl(); + conflictExpireTime = conflictPutVal.expireTime(); } else if (conflictRmvVals != null) { + // Conflict REMOVE. val = null; - drTtl = -1; - drExpireTime = -1; - drVer = F.first(conflictRmvVals); + conflictVer = F.first(conflictRmvVals); + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } else { + // Regular REMOVE. val = null; - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } // We still can get here if user pass map with single element. @@ -599,7 +603,7 @@ else if (conflictRmvVals != null) { subjId, taskNameHash); - req.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, true); + req.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, true); single = true; @@ -614,15 +618,15 @@ else if (conflictRmvVals != null) { if (vals != null) it = vals.iterator(); - Iterator> drPutValsIt = null; + Iterator> conflictPutValsIt = null; if (conflictPutVals != null) - drPutValsIt = conflictPutVals.iterator(); + conflictPutValsIt = conflictPutVals.iterator(); - Iterator drRmvValsIt = null; + Iterator conflictRmvValsIt = null; if (conflictRmvVals != null) - drRmvValsIt = conflictRmvVals.iterator(); + conflictRmvValsIt = conflictRmvVals.iterator(); Map> pendingMappings = new HashMap<>(topNodes.size(), 1.0f); @@ -643,15 +647,15 @@ else if (conflictRmvVals != null) { } Object val; - long drTtl; - long drExpireTime; - GridCacheVersion drVer; + GridCacheVersion conflictVer; + long conflictTtl; + long conflictExpireTime; if (vals != null) { val = it.next(); - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; if (val == null) { NullPointerException err = new NullPointerException("Null value."); @@ -662,24 +666,24 @@ else if (conflictRmvVals != null) { } } else if (conflictPutVals != null) { - GridCacheDrInfo drPutVal = drPutValsIt.next(); + GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); - val = drPutVal.value(); - drTtl = drPutVal.ttl(); - drExpireTime = drPutVal.expireTime(); - drVer = drPutVal.version(); + val = conflictPutVal.value(); + conflictVer = conflictPutVal.version(); + conflictTtl = conflictPutVal.ttl(); + conflictExpireTime = conflictPutVal.expireTime(); } else if (conflictRmvVals != null) { val = null; - drTtl = -1; - drExpireTime = -1; - drVer = drRmvValsIt.next(); + conflictVer = conflictRmvValsIt.next(); + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } else { val = null; - drTtl = -1; - drExpireTime = -1; - drVer = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } if (val == null && op != GridCacheOperation.DELETE) @@ -727,7 +731,7 @@ else if (conflictRmvVals != null) { "Invalid mapping state [old=" + old + ", remap=" + remap + ']'; } - mapped.addUpdateEntry(key, val, drTtl, drExpireTime, drVer, i == 0); + mapped.addUpdateEntry(key, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); i++; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 7fb962c1b1da4..22403ef1aac55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -317,9 +317,6 @@ public void processDhtAtomicUpdateRequest( long ttl = req.nearTtl(i); long expireTime = req.nearExpireTime(i); - if (ttl != -1L && expireTime == -1L) - expireTime = CU.toExpireTime(ttl); - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 93f4cf9915ead..abdb99c27bbf1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1287,21 +1287,65 @@ public String resolveTaskName() { * Resolve DR conflict. * * @param op Initially proposed operation. - * @param key Key. + * @param txEntry TX entry being updated. * @param newVal New value. * @param newValBytes New value bytes. - * @param newTtl New TTL. - * @param newDrExpireTime New explicit DR expire time. * @param newVer New version. * @param old Old entry. * @return Tuple with adjusted operation type and conflict context. * @throws IgniteCheckedException In case of eny exception. * @throws GridCacheEntryRemovedException If entry got removed. */ + @SuppressWarnings({"unchecked", "ConstantConditions"}) protected IgniteBiTuple> conflictResolve( - GridCacheOperation op, K key, V newVal, byte[] newValBytes, long newTtl, long newDrExpireTime, - GridCacheVersion newVer, GridCacheEntryEx old) + GridCacheOperation op, IgniteTxEntry txEntry, V newVal, byte[] newValBytes, GridCacheVersion newVer, + GridCacheEntryEx old) throws IgniteCheckedException, GridCacheEntryRemovedException { + assert newVer != null; + + // 1. Calculate TTL and expire time. + long newTtl = txEntry.ttl(); + long newExpireTime = txEntry.conflictExpireTime(); + + // 1.1. If TTL is not changed, then calculate it based on expiry. + if (newTtl == CU.TTL_NOT_CHANGED) { + ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry); + + if (expiry != null) { + if (op == CREATE) + newTtl = CU.toTtl(expiry.getExpiryForCreation()); + else if (op == UPDATE) + newTtl = CU.toTtl(expiry.getExpiryForUpdate()); + } + } + + // 1.2. If TTL is set to zero, then mark operation as "DELETE". + if (newTtl == CU.TTL_ZERO) { + op = DELETE; + + newTtl = CU.TTL_ETERNAL; + } + + // 1.3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL". + if (newTtl == CU.TTL_NOT_CHANGED) { + if (old.isNewLocked()) + newTtl = CU.TTL_ETERNAL; + else { + newTtl = old.rawTtl(); + newExpireTime = old.rawExpireTime(); + } + } + + // TTL must be resolved at this point. + assert newTtl != CU.TTL_ZERO && newTtl != CU.TTL_NOT_CHANGED; + + // 1.4 If expire time was not set explicitly, then calculate it. + if (newExpireTime == CU.EXPIRE_TIME_CALCULATE) + newExpireTime = CU.toExpireTime(newTtl); + + // Expire time must be resolved at this point. + assert newExpireTime != CU.EXPIRE_TIME_CALCULATE; + // Construct old entry info. GridCacheVersionedEntryEx oldEntry = old.versionedEntry(); @@ -1309,10 +1353,8 @@ protected IgniteBiTuple= 0L ? newDrExpireTime : CU.toExpireTime(newTtl); - GridCacheVersionedEntryEx newEntry = - new GridCachePlainVersionedEntry<>(key, newVal, newTtl, newExpireTime, newVer); + new GridCachePlainVersionedEntry<>((K)txEntry.key(), newVal, newTtl, newExpireTime, newVer); GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 1416d8c7de8df..0f1366b5fc1e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -852,10 +852,17 @@ public void expiry(@Nullable ExpiryPolicy expiryPlc) { val.writeTo(out); out.writeLong(ttl); - out.writeLong(conflictExpireTime); CU.writeVersion(out, explicitVer); out.writeBoolean(grpLock); + + if (conflictExpireTime != CU.EXPIRE_TIME_CALCULATE) { + out.writeBoolean(true); + out.writeLong(conflictExpireTime); + } + else + out.writeBoolean(false); + CU.writeVersion(out, conflictVer); out.writeObject(transferExpiryPlc ? new IgniteExternalizableExpiryPolicy(expiryPlc) : null); @@ -882,10 +889,11 @@ public void expiry(@Nullable ExpiryPolicy expiryPlc) { val.readFrom(in); ttl = in.readLong(); - conflictExpireTime = in.readLong(); explicitVer = CU.readVersion(in); grpLock = in.readBoolean(); + + conflictExpireTime = in.readBoolean() ? in.readLong() : CU.EXPIRE_TIME_CALCULATE; conflictVer = CU.readVersion(in); expiryPlc = (ExpiryPolicy)in.readObject(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 2da1bb7c6bc9c..e3f777dffe7e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -741,7 +741,7 @@ else if (cacheCtx.isNear() && txEntry.locallyMapped()) V val = res.get2(); byte[] valBytes = res.get3(); - // Deal with DR conflicts. + // Deal with conflicts. GridCacheVersion explicitVer = txEntry.conflictVersion() != null ? txEntry.conflictVersion() : writeVersion(); @@ -762,45 +762,43 @@ else if (cacheCtx.isNear() && txEntry.locallyMapped()) } } - boolean drNeedResolve = cacheCtx.conflictNeedResolve(); + boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); - if (drNeedResolve) { + GridCacheVersionConflictContext conflictCtx = null; + + if (conflictNeedResolve) { IgniteBiTuple> - drRes = conflictResolve(op, txEntry.key(), val, valBytes, txEntry.ttl(), - txEntry.conflictExpireTime(), explicitVer, cached); + conflictRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, + cached); - assert drRes != null; + assert conflictRes != null; - GridCacheVersionConflictContext conflictCtx = drRes.get2(); + conflictCtx = conflictRes.get2(); if (conflictCtx.isUseOld()) op = NOOP; else if (conflictCtx.isUseNew()) { txEntry.ttl(conflictCtx.ttl()); - - if (conflictCtx.newEntry().dataCenterId() != cctx.dataCenterId()) - txEntry.conflictExpireTime(conflictCtx.expireTime()); - else - txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE); + txEntry.conflictExpireTime(conflictCtx.expireTime()); } else { assert conflictCtx.isMerge(); - op = drRes.get1(); + op = conflictRes.get1(); val = conflictCtx.mergeValue(); valBytes = null; explicitVer = writeVersion(); txEntry.ttl(conflictCtx.ttl()); - txEntry.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE); + txEntry.conflictExpireTime(conflictCtx.expireTime()); } } else // Nullify explicit version so that innerSet/innerRemove will work as usual. explicitVer = null; - if (sndTransformedVals || drNeedResolve) { - assert sndTransformedVals && cacheCtx.isReplicated() || drNeedResolve; + if (sndTransformedVals || conflictNeedResolve) { + assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve; txEntry.value(val, true, false); txEntry.valueBytes(valBytes); @@ -905,8 +903,10 @@ else if (op == READ) { "Transaction does not own lock for group lock entry during commit [tx=" + this + ", txEntry=" + txEntry + ']'; - if (txEntry.ttl() != CU.TTL_NOT_CHANGED) - cached.updateTtl(null, txEntry.ttl()); + if (conflictCtx == null || !conflictCtx.isUseOld()) { + if (txEntry.ttl() != CU.TTL_NOT_CHANGED) + cached.updateTtl(null, txEntry.ttl()); + } if (log.isDebugEnabled()) log.debug("Ignoring NOOP entry when committing: " + txEntry); @@ -3257,8 +3257,10 @@ void entryExpiry(IgniteTxKey key, @Nullable ExpiryPolicy expiryPlc) { IgniteTxEntry e = entry(key); - if (e != null) + if (e != null) { e.expiry(expiryPlc); + e.conflictExpireTime(CU.EXPIRE_TIME_CALCULATE); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java index 5e20ba7804b5a..8c37f5a5c85d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersion.java @@ -21,7 +21,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; import java.io.*; import java.nio.*; @@ -150,7 +149,7 @@ public byte dataCenterId() { /** * @return Conflict version. */ - @Nullable public GridCacheVersion conflictVersion() { + public GridCacheVersion conflictVersion() { return this; // Use current version. } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java index b81380349205c..94a611eb8f203 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionConflictContext.java @@ -43,9 +43,6 @@ public class GridCacheVersionConflictContext { /** TTL. */ private long ttl; - /** Explicit TTL flag. */ - private boolean explicitTtl; - /** Manual resolve flag. */ private boolean manualResolve; @@ -94,24 +91,11 @@ public void useOld() { /** * Force cache to apply new entry overwriting old (existing) entry. - *

- * Note that updates from remote data centers always have explicit TTL , while local data center - * updates will only have explicit TTL in case {@link CacheEntry#timeToLive(long)} was called - * before update. In the latter case new entry will pick TTL of the old (existing) entry, even - * if it was set through update from remote data center. it means that depending on concurrent - * update timings new update might pick unexpected TTL. For example, consider that three updates - * of the same key are performed: local update with explicit TTL (1) followed by another local - * update without explicit TTL (2) and one remote update (3). In this case you might expect that - * update (2) will pick TTL set during update (1). However, in case update (3) occurrs between (1) - * and (2) and it overwrites (1) during conflict resolution, then update (2) will pick TTL of - * update (3). To have predictable TTL in such cases you should either always set it explicitly - * through {@code GridCacheEntry.timeToLive(long)} or use {@link #merge(Object, long)}. */ public void useNew() { state = State.USE_NEW; - if (!explicitTtl) - ttl = newEntry.ttl(); + ttl = newEntry.ttl(); } /** @@ -121,15 +105,16 @@ public void useNew() { * Also in case of merge you have to specify new TTL explicitly. For unlimited TTL use {@code 0}. * * @param mergeVal Merge value or {@code null} to force remove. - * @param ttl Time to live in milliseconds. + * @param ttl Time to live in milliseconds (must be non-negative). */ public void merge(@Nullable V mergeVal, long ttl) { + if (ttl < 0) + throw new IllegalArgumentException("TTL must be non-negative: " + ttl); + state = State.MERGE; this.mergeVal = mergeVal; this.ttl = ttl; - - explicitTtl = true; } /** @@ -185,15 +170,7 @@ public long ttl() { * @return Expire time. */ public long expireTime() { - return explicitTtl ? CU.toExpireTime(ttl) : isUseNew() ? newEntry.expireTime() : - isUseOld() ? oldEntry.expireTime() : 0L; - } - - /** - * @return Explicit TTL flag. - */ - public boolean explicitTtl() { - return explicitTtl; + return isUseNew() ? newEntry.expireTime() : isUseOld() ? oldEntry.expireTime() : CU.toExpireTime(ttl); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java index 4f135caac386c..9a6cbd29762bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java @@ -77,7 +77,7 @@ public GridCacheVersionEx(int topVer, int nodeOrderDrId, long globalTime, long o } /** {@inheritDoc} */ - @Override public GridCacheVersion conflictVersion() { + public GridCacheVersion conflictVersion() { return drVer; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java index ed43149e3f87e..b8cfe771c527b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -61,7 +61,11 @@ public class GridDrDataLoadCacheUpdater implements IgniteDataLoader.Update K key = entry.key(); - GridCacheDrInfo val = entry.value() != null ? entry.expireTime() != 0 ? + // Ensure that updater to not receive special-purpose values for TTL and expire time. + assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0; + assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0; + + GridCacheDrInfo val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ? new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : new GridCacheDrInfo<>(entry.value(), entry.version()) : null;