diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 5b606c40d5ede..f7f0fc28ba23e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -559,6 +559,16 @@ public GridIoMessageFactory(MessageFactory[] ext) { break; + case 103: + msg = new GridCacheRawVersionedEntry<>(); + + break; + + case 104: + msg = new GridCacheVersionEx(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index b231d89802eec..686f7b0b91817 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -32,18 +32,6 @@ public interface CacheObject extends Message { */ @Nullable public T value(CacheObjectContext ctx, boolean cpy); - /** - * @param name Field name. - * @return Field value. - */ - @Nullable public T getField(String name); - - /** - * @param ctx Context. - * @throws IgniteCheckedException If failed. - */ - public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException; - /** * @return {@code True} if value is byte array. */ @@ -56,6 +44,14 @@ public interface CacheObject extends Message { */ public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException; + /** + * Prepares cache object for cache (e.g. copies user-provided object if needed). + * + * @param ctx Cache context. + * @return Instance to store in cache. + */ + public CacheObject prepareForCache(CacheObjectContext ctx); + /** * @param ctx Context. * @param ldr Class loader. @@ -64,9 +60,8 @@ public interface CacheObject extends Message { public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException; /** - * @param ctx Cache context. - * - * @return Instance to store in cache. + * @param ctx Context. + * @throws IgniteCheckedException If failed. */ - public CacheObject prepareForCache(CacheObjectContext ctx); + public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 4d8572a52e301..0e63506a34f3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -36,12 +36,6 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable /** */ protected byte[] valBytes; - /** {@inheritDoc} */ - @Nullable @Override public T getField(String name) { - // TODO IGNITE-51. - return null; - } - /** * @param ctx Context. * @return {@code True} need to copy value returned to user. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 44890f7a6e315..8037961ae0bba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2570,7 +2570,7 @@ public IgniteInternalFuture> getAllAsync0(@Nullable final C } /** {@inheritDoc} */ - @Override public void putAllConflict(final Map> drMap) + @Override public void putAllConflict(final Map drMap) throws IgniteCheckedException { if (F.isEmpty(drMap)) return; @@ -2591,7 +2591,7 @@ public IgniteInternalFuture> getAllAsync0(@Nullable final C } /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllConflictAsync(final Map> drMap) + @Override public IgniteInternalFuture putAllConflictAsync(final Map drMap) throws IgniteCheckedException { if (F.isEmpty(drMap)) return new GridFinishedFuture(ctx.kernalContext()); @@ -3483,7 +3483,7 @@ public IgniteInternalFuture> getAllAsync0(@Nullable final C } /** {@inheritDoc} */ - @Override public void removeAllConflict(final Map drMap) + @Override public void removeAllConflict(final Map drMap) throws IgniteCheckedException { ctx.denyOnLocalRead(); @@ -3504,7 +3504,7 @@ public IgniteInternalFuture> getAllAsync0(@Nullable final C } /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllConflictAsync(final Map drMap) + @Override public IgniteInternalFuture removeAllConflictAsync(final Map drMap) throws IgniteCheckedException { ctx.denyOnLocalRead(); @@ -3977,10 +3977,10 @@ protected void checkJta() throws IgniteCheckedException { final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); if (ctx.store().isLocalStore()) { - IgniteDataLoaderImpl ldr = ctx.kernalContext().dataLoad().dataLoader(ctx.namex()); + IgniteDataLoaderImpl ldr = ctx.kernalContext().dataLoad().dataLoader(ctx.namex()); try { - ldr.updater(new GridDrDataLoadCacheUpdater()); + ldr.updater(new GridDrDataLoadCacheUpdater()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); @@ -4193,10 +4193,10 @@ public void localLoad(Collection keys, }); if (ctx.store().isLocalStore()) { - IgniteDataLoaderImpl ldr = ctx.kernalContext().dataLoad().dataLoader(ctx.namex()); + IgniteDataLoaderImpl ldr = ctx.kernalContext().dataLoad().dataLoader(ctx.namex()); try { - ldr.updater(new GridDrDataLoadCacheUpdater()); + ldr.updater(new GridDrDataLoadCacheUpdater()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); @@ -5760,7 +5760,7 @@ protected AsyncInOp(K key) { /** * @param keys Keys involved. */ - protected AsyncInOp(Collection keys) { + protected AsyncInOp(Collection keys) { super(keys); } @@ -6229,7 +6229,7 @@ private class LocalStoreLoadClosure extends CIX3 p; /** */ - final Collection> col; + final Collection col; /** */ final IgniteDataLoaderImpl ldr; @@ -6272,20 +6272,18 @@ else if (ttl == CU.TTL_NOT_CHANGED) ttl = 0; } - GridCacheRawVersionedEntry e = new GridCacheRawVersionedEntry<>(ctx.toCacheKeyObject(key), - null, + GridCacheRawVersionedEntry e = new GridCacheRawVersionedEntry(ctx.toCacheKeyObject(key), ctx.toCacheObject(val), - null, ttl, 0, ver); - e.marshal(ctx.marshaller()); + e.prepareDirectMarshal(ctx.cacheObjectContext()); col.add(e); if (col.size() == ldr.perNodeBufferSize()) { - ldr.addData(col); + ldr.addDataInternal(col); col.clear(); } @@ -6296,7 +6294,7 @@ else if (ttl == CU.TTL_NOT_CHANGED) */ void onDone() { if (!col.isEmpty()) - ldr.addData(col); + ldr.addDataInternal(col); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 0a386eb8b3223..78c08004ed8dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -124,19 +124,19 @@ public class GridCacheContext implements Externalizable { private CacheDataStructuresManager dataStructuresMgr; /** Eager TTL manager. */ - private GridCacheTtlManager ttlMgr; + private GridCacheTtlManager ttlMgr; /** Store manager. */ private GridCacheStoreManager storeMgr; /** Replication manager. */ - private GridCacheDrManager drMgr; + private GridCacheDrManager drMgr; /** Serialization manager. */ private IgniteCacheSerializationManager serMgr; /** JTA manager. */ - private CacheJtaManagerAdapter jtaMgr; + private CacheJtaManagerAdapter jtaMgr; /** Managers. */ private List> mgrs = new LinkedList<>(); @@ -238,9 +238,9 @@ public GridCacheContext( CacheContinuousQueryManager contQryMgr, GridCacheAffinityManager affMgr, CacheDataStructuresManager dataStructuresMgr, - GridCacheTtlManager ttlMgr, - GridCacheDrManager drMgr, - CacheJtaManagerAdapter jtaMgr) { + GridCacheTtlManager ttlMgr, + GridCacheDrManager drMgr, + CacheJtaManagerAdapter jtaMgr) { assert ctx != null; assert sharedCtx != null; assert cacheCfg != null; @@ -839,7 +839,7 @@ public IgniteTxManager tm() { /** * @return Lock order manager. */ - public GridCacheVersionManager versions() { + public GridCacheVersionManager versions() { return sharedCtx.versions(); } @@ -930,21 +930,21 @@ public CacheDataStructuresManager dataStructures() { /** * @return DR manager. */ - public GridCacheDrManager dr() { + public GridCacheDrManager dr() { return drMgr; } /** * @return TTL manager. */ - public GridCacheTtlManager ttl() { + public GridCacheTtlManager ttl() { return ttlMgr; } /** * @return JTA manager. */ - public CacheJtaManagerAdapter jta() { + public CacheJtaManagerAdapter jta() { return jtaMgr; } @@ -958,7 +958,7 @@ public boolean putIfAbsentFilter(@Nullable CacheEntryPredicate[] p) { for (CacheEntryPredicate p0 : p) { if ((p0 instanceof CacheEntrySerializablePredicate) && - ((CacheEntrySerializablePredicate) p0).predicate() instanceof CacheEntryPredicateNoValue) + ((CacheEntrySerializablePredicate)p0).predicate() instanceof CacheEntryPredicateNoValue) return true; } @@ -1589,15 +1589,15 @@ public boolean conflictNeedResolve() { * * @param oldEntry Old entry. * @param newEntry New entry. - * @param atomicVerComparator Whether to use atomic version comparator. + * @param atomicVerComp Whether to use atomic version comparator. * @return Conflict resolution result. * @throws IgniteCheckedException In case of exception. */ public GridCacheVersionConflictContext conflictResolve(GridCacheVersionedEntryEx oldEntry, - GridCacheVersionedEntryEx newEntry, boolean atomicVerComparator) throws IgniteCheckedException { + GridCacheVersionedEntryEx newEntry, boolean atomicVerComp) throws IgniteCheckedException { assert conflictRslvr != null : "Should not reach this place."; - GridCacheVersionConflictContext ctx = conflictRslvr.resolve(oldEntry, newEntry, atomicVerComparator); + GridCacheVersionConflictContext ctx = conflictRslvr.resolve(oldEntry, newEntry, atomicVerComp); if (ctx.isManualResolve()) drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 0caf1f9b23e36..a1c7ecf0a8771 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -47,6 +47,7 @@ import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.processors.cache.CacheFlag.*; +import static org.apache.ignite.internal.processors.dr.GridDrType.*; import static org.apache.ignite.transactions.TransactionState.*; /** @@ -142,8 +143,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @param ttl Time to live. * @param hdrId Header id. */ - protected GridCacheMapEntry(GridCacheContext cctx, KeyCacheObject key, int hash, CacheObject val, - GridCacheMapEntry next, long ttl, int hdrId) { + protected GridCacheMapEntry(GridCacheContext cctx, + KeyCacheObject key, + int hash, + CacheObject val, + GridCacheMapEntry next, + long ttl, + int hdrId) + { log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class); key = (KeyCacheObject)cctx.kernalContext().portable().prepareForCache(key, cctx); @@ -1654,79 +1661,83 @@ else if (ttl != CU.TTL_ZERO) // Cache is conflict-enabled. if (cctx.conflictNeedResolve()) { -// TODO IGNITE-51. -// // Get new value, optionally unmarshalling and/or transforming it. -// if (writeObj == null && valBytes != null) -// writeObj = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader()); -// -// if (op == GridCacheOperation.TRANSFORM) { -// transformClo = writeObj; -// -// writeObj = ((IgniteClosure)writeObj).apply(rawGetOrUnmarshalUnlocked(true)); -// valBytes = null; -// } -// -// GridTuple3 expiration = ttlAndExpireTime(expiryPlc, explicitTtl, -// explicitExpireTime); -// -// // Prepare old and new entries for conflict resolution. -// GridCacheVersionedEntryEx oldEntry = versionedEntry(); -// GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry<>(key, (V)writeObj, -// expiration.get1(), expiration.get2(), conflictVer != null ? conflictVer : newVer); -// -// // Resolve conflict. -// conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); -// -// assert conflictCtx != null; -// -// // Use old value? -// if (conflictCtx.isUseOld()) { -// GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer; -// -// // Handle special case with atomic comparator. -// if (!isNew() && // Not initial value, -// verCheck && // and atomic version check, -// oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal, -// ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer) == 0 && // and both versions are equal, -// cctx.writeThrough() && // and store is enabled, -// primary) // and we are primary. -// { -// V val = rawGetOrUnmarshalUnlocked(false); -// -// if (val == null) { -// assert deletedUnlocked(); -// -// cctx.store().removeFromStore(null, key()); -// } -// else -// cctx.store().putToStore(null, key(), val, ver); -// } -// -// return new GridCacheUpdateAtomicResult<>(false, -// retval ? rawGetOrUnmarshalUnlocked(false) : null, -// null, -// invokeRes, -// CU.TTL_ETERNAL, -// CU.EXPIRE_TIME_ETERNAL, -// null, -// null, -// false); -// } -// // Will update something. -// else { -// // Merge is a local update which override passed value bytes. -// if (conflictCtx.isMerge()) { -// writeObj = conflictCtx.mergeValue(); -// valBytes = null; -// -// conflictVer = null; -// } -// else -// assert conflictCtx.isUseNew(); -// -// // Update value is known at this point, so update operation type. -// op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; -// } + // Get new value, optionally unmarshalling and/or transforming it. + Object writeObj0; + + if (op == GridCacheOperation.TRANSFORM) { + transformClo = writeObj; + + // TODO IGNITE-51 + writeObj0 = ((IgniteClosure)writeObj).apply(rawGetOrUnmarshalUnlocked(true)); + } + else + writeObj0 = CU.value((CacheObject)writeObj, cctx, false); + + GridTuple3 expiration = ttlAndExpireTime(expiryPlc, + explicitTtl, + explicitExpireTime); + + // Prepare old and new entries for conflict resolution. + GridCacheVersionedEntryEx oldEntry = versionedEntry(); + GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry<>( + oldEntry.key(), + writeObj0, + expiration.get1(), + expiration.get2(), + conflictVer != null ? conflictVer : newVer); + + // Resolve conflict. + conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck); + + assert conflictCtx != null; + + // Use old value? + if (conflictCtx.isUseOld()) { + GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer; + + // Handle special case with atomic comparator. + if (!isNew() && // Not initial value, + verCheck && // and atomic version check, + oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal, + ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer) == 0 && // and both versions are equal, + cctx.writeThrough() && // and store is enabled, + primary) // and we are primary. + { + CacheObject val = rawGetOrUnmarshalUnlocked(false); + + if (val == null) { + assert deletedUnlocked(); + + cctx.store().removeFromStore(null, key()); + } + else + cctx.store().putToStore(null, key(), val, ver); + } + + return new GridCacheUpdateAtomicResult(false, + retval ? rawGetOrUnmarshalUnlocked(false) : null, + null, + invokeRes, + CU.TTL_ETERNAL, + CU.EXPIRE_TIME_ETERNAL, + null, + null, + false); + } + // Will update something. + else { + // Merge is a local update which override passed value bytes. + if (conflictCtx.isMerge()) { + writeObj = cctx.toCacheObject(conflictCtx.mergeValue()); + + conflictVer = null; + } + else + assert conflictCtx.isUseNew(); + + // Update value is known at this point, so update operation type. + op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; + } } else // Nullify conflict version on this update, so that we will use regular version during next updates. @@ -1915,7 +1926,6 @@ assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer) <= 0 : newVer.dataCenterId(), conflictVer); - if (op == GridCacheOperation.UPDATE) { // Conflict context is null if there were no explicit conflict resolution. if (conflictCtx == null) { @@ -2274,9 +2284,8 @@ private GridTuple3 ttlAndExpireTime(IgniteCacheExpiryPolicy */ private void drReplicate(GridDrType drType, @Nullable CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { -// TODO IGNITE-51. -// if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal()) -// cctx.dr().replicate(key, null, val, valBytes, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType); + if (cctx.isDrEnabled() && drType != DR_NONE && !isInternal()) + cctx.dr().replicate(key, val, rawTtl(), rawExpireTime(), ver.conflictVersion(), drType); } /** @@ -3292,8 +3301,14 @@ else if (deletedUnlocked()) @Override public synchronized GridCacheVersionedEntryEx versionedEntry() throws IgniteCheckedException { boolean isNew = isStartVersion(); - return new GridCachePlainVersionedEntry<>(key, isNew ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false), - ttlExtras(), expireTimeExtras(), ver.conflictVersion(), isNew); + CacheObject val = isNew ? unswap(true, true) : rawGetOrUnmarshalUnlocked(false); + + return new GridCachePlainVersionedEntry<>(key.value(cctx.cacheObjectContext(), true), + CU.value(val, cctx, true), + ttlExtras(), + expireTimeExtras(), + ver.conflictVersion(), + isNew); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java index 08a3fb1332b74..a4175255a2c4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionEx.java @@ -112,7 +112,7 @@ public IgniteInternalFuture putxAsync(K key, V val, @Nullable GridCache * @throws IgniteCheckedException If put operation failed. * @throws CacheFlagException If projection flags validation failed. */ - public void putAllConflict(Map> drMap) throws IgniteCheckedException; + public void putAllConflict(Map drMap) throws IgniteCheckedException; /** * Store DR data asynchronously. @@ -122,7 +122,8 @@ public IgniteInternalFuture putxAsync(K key, V val, @Nullable GridCache * @throws IgniteCheckedException If put operation failed. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture putAllConflictAsync(Map> drMap) throws IgniteCheckedException; + public IgniteInternalFuture putAllConflictAsync(Map drMap) + throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. @@ -154,7 +155,7 @@ public IgniteInternalFuture removeAsync(K key, @Nullable GridCacheEntryEx ent * @throws IgniteCheckedException If remove failed. * @throws CacheFlagException If projection flags validation failed. */ - public void removeAllConflict(Map drMap) throws IgniteCheckedException; + public void removeAllConflict(Map drMap) throws IgniteCheckedException; /** * Removes DR data asynchronously. @@ -164,7 +165,7 @@ public IgniteInternalFuture removeAsync(K key, @Nullable GridCacheEntryEx ent * @throws IgniteCheckedException If remove failed. * @throws CacheFlagException If projection flags validation failed. */ - public IgniteInternalFuture removeAllConflictAsync(Map drMap) throws IgniteCheckedException; + public IgniteInternalFuture removeAllConflictAsync(Map drMap) throws IgniteCheckedException; /** * Internal method that is called from {@link GridCacheEntryImpl}. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 7a27cff74f38f..19aa08e6174f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -620,12 +620,12 @@ boolean isAll(K k, V v) { } /** {@inheritDoc} */ - @Override public void putAllConflict(Map> drMap) throws IgniteCheckedException { + @Override public void putAllConflict(Map drMap) throws IgniteCheckedException { cache.putAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllConflictAsync(Map> drMap) + @Override public IgniteInternalFuture putAllConflictAsync(Map drMap) throws IgniteCheckedException { return cache.putAllConflictAsync(drMap); } @@ -962,12 +962,13 @@ boolean isAll(K k, V v) { } /** {@inheritDoc} */ - @Override public void removeAllConflict(Map drMap) throws IgniteCheckedException { + @Override public void removeAllConflict(Map drMap) throws IgniteCheckedException { cache.removeAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllConflictAsync(Map drMap) throws IgniteCheckedException { + @Override public IgniteInternalFuture removeAllConflictAsync(Map drMap) + throws IgniteCheckedException { return cache.removeAllConflictAsync(drMap); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 88200bb5a01ac..88af3ddedee0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -709,7 +709,7 @@ public GridCacheProjectionImpl gateProjection() { } /** {@inheritDoc} */ - @Override public void putAllConflict(Map> drMap) throws IgniteCheckedException { + @Override public void putAllConflict(Map drMap) throws IgniteCheckedException { GridCacheProjectionImpl prev = gate.enter(prj); try { @@ -721,7 +721,7 @@ public GridCacheProjectionImpl gateProjection() { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllConflictAsync(Map> drMap) + @Override public IgniteInternalFuture putAllConflictAsync(Map drMap) throws IgniteCheckedException { GridCacheProjectionImpl prev = gate.enter(prj); @@ -1425,7 +1425,8 @@ public GridCacheProjectionImpl gateProjection() { } /** {@inheritDoc} */ - @Override public void removeAllConflict(Map drMap) throws IgniteCheckedException { + @Override public void removeAllConflict(Map drMap) + throws IgniteCheckedException { GridCacheProjectionImpl prev = gate.enter(prj); try { @@ -1437,7 +1438,9 @@ public GridCacheProjectionImpl gateProjection() { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllConflictAsync(Map drMap) throws IgniteCheckedException { + @Override public IgniteInternalFuture removeAllConflictAsync(Map drMap) + throws IgniteCheckedException + { GridCacheProjectionImpl prev = gate.enter(prj); try { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 44775aea871bf..99e1285016968 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -56,7 +56,7 @@ public class GridCacheSharedContext { private GridCachePartitionExchangeManager exchMgr; /** Version manager. */ - private GridCacheVersionManager verMgr; + private GridCacheVersionManager verMgr; /** Lock manager. */ private GridCacheMvccManager mvccMgr; @@ -84,7 +84,7 @@ public class GridCacheSharedContext { public GridCacheSharedContext( GridKernalContext kernalCtx, IgniteTxManager txMgr, - GridCacheVersionManager verMgr, + GridCacheVersionManager verMgr, GridCacheMvccManager mvccMgr, GridCacheDeploymentManager depMgr, GridCachePartitionExchangeManager exchMgr, @@ -254,7 +254,7 @@ public GridCachePartitionExchangeManager exchange() { /** * @return Lock order manager. */ - public GridCacheVersionManager versions() { + public GridCacheVersionManager versions() { return verMgr; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 335ac632ca1f5..7bfdbfa09976f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -32,9 +32,9 @@ * Eagerly removes expired entries from cache when {@link org.apache.ignite.configuration.CacheConfiguration#isEagerTtl()} flag is set. */ @SuppressWarnings("NakedNotify") -public class GridCacheTtlManager extends GridCacheManagerAdapter { +public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ - private final GridConcurrentSkipListSet> pendingEntries = new GridConcurrentSkipListSet<>(); + private final GridConcurrentSkipListSet pendingEntries = new GridConcurrentSkipListSet<>(); /** Cleanup worker thread. */ private CleanupWorker cleanupWorker; @@ -68,7 +68,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { * @param entry Entry to add. */ public void addTrackedEntry(GridCacheMapEntry entry) { - EntryWrapper wrapper = new EntryWrapper<>(entry); + EntryWrapper wrapper = new EntryWrapper(entry); pendingEntries.add(wrapper); @@ -86,7 +86,7 @@ public void addTrackedEntry(GridCacheMapEntry entry) { public void removeTrackedEntry(GridCacheMapEntry entry) { // Remove must be called while holding lock on entry before updating expire time. // No need to wake up waiting thread in this case. - pendingEntries.remove(new EntryWrapper<>(entry)); + pendingEntries.remove(new EntryWrapper(entry)); } /** {@inheritDoc} */ @@ -114,8 +114,8 @@ protected CleanupWorker() { GridCacheVersion obsoleteVer = null; - for (Iterator> it = pendingEntries.iterator(); it.hasNext(); ) { - EntryWrapper wrapper = it.next(); + for (Iterator it = pendingEntries.iterator(); it.hasNext(); ) { + EntryWrapper wrapper = it.next(); if (wrapper.expireTime <= now) { if (log.isDebugEnabled()) @@ -142,7 +142,7 @@ protected CleanupWorker() { // synchronization block, so we don't miss out // on thread notification events sent from // 'addTrackedEntry(..)' method. - EntryWrapper first = pendingEntries.firstx(); + EntryWrapper first = pendingEntries.firstx(); if (first != null) { long waitTime = first.expireTime - U.currentTimeMillis(); @@ -163,7 +163,7 @@ protected CleanupWorker() { /** * Entry wrapper. */ - private static class EntryWrapper implements Comparable> { + private static class EntryWrapper implements Comparable { /** Entry expire time. */ private final long expireTime; @@ -182,7 +182,7 @@ private EntryWrapper(GridCacheMapEntry entry) { } /** {@inheritDoc} */ - @Override public int compareTo(EntryWrapper o) { + @Override public int compareTo(EntryWrapper o) { if (expireTime == o.expireTime) { if (entry.startVersion() == o.entry.startVersion()) return 0; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java index b243e6233e8d0..7122f6668cb14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java @@ -49,11 +49,6 @@ public KeyCacheObjectTransferImpl(byte[] valBytes) { throw new IllegalStateException(); } - /** {@inheritDoc} */ - @Nullable @Override public T getField(String name) { - throw new IllegalStateException(); - } - /** {@inheritDoc} */ @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { // No-op. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index a867c2a06e27f..942545784a6ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -519,29 +519,27 @@ private void commitIfLocked() throws IgniteCheckedException { GridCacheVersionConflictContext conflictCtx = null; if (conflictNeedResolve) { -// TODO IGNITE-51. -// IgniteBiTuple -// drRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, cached); -// -// assert drRes != null; -// -// conflictCtx = drRes.get2(); -// -// if (conflictCtx.isUseOld()) -// op = NOOP; -// else if (conflictCtx.isUseNew()) { -// txEntry.ttl(conflictCtx.ttl()); -// txEntry.conflictExpireTime(conflictCtx.expireTime()); -// } -// else if (conflictCtx.isMerge()) { -// op = drRes.get1(); -// val = conflictCtx.mergeValue(); -// valBytes = null; -// explicitVer = writeVersion(); -// -// txEntry.ttl(conflictCtx.ttl()); -// txEntry.conflictExpireTime(conflictCtx.expireTime()); -// } + IgniteBiTuple + drRes = conflictResolve(op, txEntry, val, explicitVer, cached); + + assert drRes != null; + + conflictCtx = drRes.get2(); + + if (conflictCtx.isUseOld()) + op = NOOP; + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } + else if (conflictCtx.isMerge()) { + op = drRes.get1(); + val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); + explicitVer = writeVersion(); + + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } } else // Nullify explicit version so that innerSet/innerRemove will work as usual. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index ef5e7cc9a44c7..9f42e91aa7fc3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -434,7 +434,7 @@ public void near(GridNearAtomicCache near) { @Override public IgniteInternalFuture> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); - return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsValArray(val)); + return removeAllAsync0(F.asList(key), null, true, true, ctx.equalsValArray(val)); } /** {@inheritDoc} */ @@ -472,13 +472,13 @@ public void near(GridNearAtomicCache near) { } /** {@inheritDoc} */ - @Override public void putAllConflict(Map> conflictMap) + @Override public void putAllConflict(Map conflictMap) throws IgniteCheckedException { putAllConflictAsync(conflictMap).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllConflictAsync(Map> conflictMap) { + @Override public IgniteInternalFuture putAllConflictAsync(Map conflictMap) { ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); return updateAllAsync0(null, @@ -504,7 +504,7 @@ public void near(GridNearAtomicCache near) { @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); - return removeAllAsync0(Collections.singletonList(key), null, entry, true, false, filter); + return removeAllAsync0(Collections.singletonList(key), null, true, false, filter); } /** {@inheritDoc} */ @@ -518,7 +518,7 @@ public void near(GridNearAtomicCache near) { CacheEntryPredicate[] filter) { A.notNull(keys, "keys"); - return removeAllAsync0(keys, null, null, false, false, filter); + return removeAllAsync0(keys, null, false, false, filter); } /** {@inheritDoc} */ @@ -533,7 +533,7 @@ public void near(GridNearAtomicCache near) { @Nullable CacheEntryPredicate... filter) { A.notNull(key, "key"); - return removeAllAsync0(Collections.singletonList(key), null, entry, false, false, filter); + return removeAllAsync0(Collections.singletonList(key), null, false, false, filter); } /** {@inheritDoc} */ @@ -559,16 +559,16 @@ public void near(GridNearAtomicCache near) { } /** {@inheritDoc} */ - @Override public void removeAllConflict(Map conflictMap) + @Override public void removeAllConflict(Map conflictMap) throws IgniteCheckedException { removeAllConflictAsync(conflictMap).get(); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllConflictAsync(Map conflictMap) { + @Override public IgniteInternalFuture removeAllConflictAsync(Map conflictMap) { ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); - return removeAllAsync0(null, conflictMap, null, false, false, null); + return removeAllAsync0(null, conflictMap, false, false, null); } /** @@ -769,8 +769,8 @@ private IgniteInternalFuture updateAllAsync0( @Nullable final Map map, @Nullable final Map invokeMap, @Nullable Object[] invokeArgs, - @Nullable final Map> conflictPutMap, - @Nullable final Map conflictRmvMap, + @Nullable final Map conflictPutMap, + @Nullable final Map conflictRmvMap, final boolean retval, final boolean rawRetval, @Nullable GridCacheEntryEx cached, @@ -819,7 +819,6 @@ private IgniteInternalFuture updateAllAsync0( * * @param keys Keys to remove. * @param conflictMap Conflict map. - * @param cached Cached cache entry for key. May be passed if and only if keys size is {@code 1}. * @param retval Return value required flag. * @param rawRetval Return {@code GridCacheReturn} instance. * @param filter Cache entry filter for atomic removes. @@ -827,8 +826,7 @@ private IgniteInternalFuture updateAllAsync0( */ private IgniteInternalFuture removeAllAsync0( @Nullable final Collection keys, - @Nullable final Map conflictMap, - @Nullable GridCacheEntryEx cached, + @Nullable final Map conflictMap, final boolean retval, boolean rawRetval, @Nullable final CacheEntryPredicate[] filter @@ -2249,7 +2247,7 @@ private void remapToNewPrimary(GridNearAtomicUpdateRequest req) { log.debug("Remapping near update request locally: " + req); Collection vals; - Collection> drPutVals; + Collection drPutVals; Collection drRmvVals; if (req.conflictVersions() == null) { @@ -2267,9 +2265,9 @@ else if (req.operation() == UPDATE) { long ttl = req.conflictTtl(i); if (ttl == CU.TTL_NOT_CHANGED) - drPutVals.add(new GridCacheDrInfo<>(req.value(i), req.conflictVersion(i))); + drPutVals.add(new GridCacheDrInfo(req.value(i), req.conflictVersion(i))); else - drPutVals.add(new GridCacheDrExpirationInfo<>(req.value(i), req.conflictVersion(i), ttl, + drPutVals.add(new GridCacheDrExpirationInfo(req.value(i), req.conflictVersion(i), ttl, req.conflictExpireTime(i))); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 45e94552c05e7..a53a7309ca717 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -86,7 +86,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter implem /** Conflict put values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection> conflictPutVals; + private Collection conflictPutVals; /** Conflict remove values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -187,7 +187,7 @@ public GridNearAtomicUpdateFuture( Collection keys, @Nullable Collection vals, @Nullable Object[] invokeArgs, - @Nullable Collection> conflictPutVals, + @Nullable Collection conflictPutVals, @Nullable Collection conflictRmvVals, final boolean retval, final boolean rawRetval, @@ -543,7 +543,7 @@ private void map0(GridDiscoveryTopologySnapshot topSnapshot, } else if (conflictPutVals != null) { // Conflict PUT. - GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); + GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); val = conflictPutVal.value(); conflictVer = conflictPutVal.version(); @@ -631,7 +631,7 @@ else if (conflictRmvVals != null) { if (vals != null) it = vals.iterator(); - Iterator> conflictPutValsIt = null; + Iterator conflictPutValsIt = null; if (conflictPutVals != null) conflictPutValsIt = conflictPutVals.iterator(); @@ -679,7 +679,7 @@ else if (conflictRmvVals != null) { } } else if (conflictPutVals != null) { - GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); + GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); val = conflictPutVal.value(); conflictVer = conflictPutVal.version(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java index 08b43e08c7c3c..74c83e2d27459 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java @@ -28,11 +28,11 @@ * Cache object and version. */ public class CacheVersionedValue implements Message { - /** Cache version. */ - private GridCacheVersion vers; + /** Value. */ + private CacheObject val; - /** Cache object. */ - private CacheObject obj; + /** Cache version. */ + private GridCacheVersion ver; /** */ public CacheVersionedValue() { @@ -40,26 +40,26 @@ public CacheVersionedValue() { } /** - * @param vers Cache version. - * @param obj Cache object. + * @param val Cache value. + * @param ver Cache version. */ - CacheVersionedValue(GridCacheVersion vers, CacheObject obj) { - this.vers = vers; - this.obj = obj; + CacheVersionedValue(CacheObject val, GridCacheVersion ver) { + this.val = val; + this.ver = ver; } /** * @return Cache version. */ public GridCacheVersion version() { - return vers; + return ver; } /** * @return Cache object. */ - public CacheObject cacheObject() { - return obj; + public CacheObject value() { + return val; } /** @@ -70,12 +70,12 @@ public CacheObject cacheObject() { * @throws IgniteCheckedException If failed. */ public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { - if (obj != null) - obj.prepareMarshal(ctx); + if (val != null) + val.prepareMarshal(ctx); } /** - * This method is called after the whole message is recived + * This method is called after the whole message is received * and is responsible for unmarshalling state. * * @param ctx Context. @@ -83,8 +83,8 @@ public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException * @throws IgniteCheckedException If failed. */ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - if (obj != null) - obj.finishUnmarshal(ctx.cacheObjectContext(), ldr); + if (val != null) + val.finishUnmarshal(ctx.cacheObjectContext(), ldr); } /** {@inheritDoc} */ @@ -100,13 +100,13 @@ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws Ignite switch (writer.state()) { case 0: - if (!writer.writeMessage("obj", obj)) + if (!writer.writeMessage("val", val)) return false; writer.incrementState(); case 1: - if (!writer.writeMessage("vers", vers)) + if (!writer.writeMessage("ver", ver)) return false; writer.incrementState(); @@ -125,7 +125,7 @@ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws Ignite switch (reader.state()) { case 0: - obj = reader.readMessage("obj"); + val = reader.readMessage("obj"); if (!reader.isLastRead()) return false; @@ -133,7 +133,7 @@ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws Ignite reader.incrementState(); case 1: - vers = reader.readMessage("vers"); + ver = reader.readMessage("ver"); if (!reader.isLastRead()) return false; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 9aa407f680121..a5050e65d3786 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -517,12 +517,13 @@ public void processDhtAtomicUpdateRequest( } /** {@inheritDoc} */ - @Override public void putAllConflict(Map> drMap) throws IgniteCheckedException { + @Override public void putAllConflict(Map drMap) throws IgniteCheckedException { dht.putAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllConflictAsync(Map> drMap) throws IgniteCheckedException { + @Override public IgniteInternalFuture putAllConflictAsync(Map drMap) + throws IgniteCheckedException { return dht.putAllConflictAsync(drMap); } @@ -641,12 +642,14 @@ public void processDhtAtomicUpdateRequest( } /** {@inheritDoc} */ - @Override public void removeAllConflict(Map drMap) throws IgniteCheckedException { + @Override public void removeAllConflict(Map drMap) + throws IgniteCheckedException { dht.removeAllConflict(drMap); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllConflictAsync(Map drMap) throws IgniteCheckedException { + @Override public IgniteInternalFuture removeAllConflictAsync(Map drMap) + throws IgniteCheckedException { return dht.removeAllConflictAsync(drMap); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 557331c85fd63..3a868886b37e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -948,7 +948,7 @@ void onResult(UUID nodeId, GridNearTxPrepareResponse res) { CacheVersionedValue tup = entry.getValue(); - nearEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion(), + nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(), tup.version(), m.node().id()); } else if (txEntry.cached().detached()) { @@ -956,7 +956,7 @@ else if (txEntry.cached().detached()) { CacheVersionedValue tup = entry.getValue(); - detachedEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion()); + detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion()); } break; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index f88d36316e5a4..711c01fc5781c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -174,7 +174,7 @@ public void addOwnedValue(IgniteTxKey key, GridCacheVersion ver, CacheObject val if (ownedVals == null) ownedVals = new HashMap<>(); - CacheVersionedValue oVal = new CacheVersionedValue(ver, val); + CacheVersionedValue oVal = new CacheVersionedValue(val, ver); ownedVals.put(key, oVal); } @@ -266,20 +266,20 @@ public Collection invalidPartitions() { Iterator keyIter = ownedValKeys.iterator(); - Iterator valueIter = ownedValVals.iterator(); + Iterator valIter = ownedValVals.iterator(); while (keyIter.hasNext()) { IgniteTxKey key = keyIter.next(); GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - CacheVersionedValue value = valueIter.next(); + CacheVersionedValue val = valIter.next(); key.finishUnmarshal(cctx, ldr); - value.finishUnmarshal(cctx, ldr); + val.finishUnmarshal(cctx, ldr); - ownedVals.put(key, value); + ownedVals.put(key, val); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java index e2feb1385041f..f003e84ad2cd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrExpirationInfo.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.dr; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -25,7 +26,7 @@ /** * Cache DR info used as argument in PUT cache internal interfaces with expiration info added. */ -public class GridCacheDrExpirationInfo extends GridCacheDrInfo { +public class GridCacheDrExpirationInfo extends GridCacheDrInfo { /** */ private static final long serialVersionUID = 0L; @@ -50,7 +51,7 @@ public GridCacheDrExpirationInfo() { * @param ttl TTL. * @param expireTime Expire time. */ - public GridCacheDrExpirationInfo(V val, GridCacheVersion ver, long ttl, long expireTime) { + public GridCacheDrExpirationInfo(CacheObject val, GridCacheVersion ver, long ttl, long expireTime) { super(val, ver); this.ttl = ttl; @@ -71,20 +72,4 @@ public GridCacheDrExpirationInfo(V val, GridCacheVersion ver, long ttl, long exp @Override public String toString() { return S.toString(GridCacheDrExpirationInfo.class, this); } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeLong(ttl); - out.writeLong(expireTime); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - ttl = in.readLong(); - expireTime = in.readLong(); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java index 6c875d85459b2..b3411e1e4f840 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.dr; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -25,12 +27,12 @@ /** * Cache DR info used as argument in PUT cache internal interfaces. */ -public class GridCacheDrInfo implements Externalizable { +public class GridCacheDrInfo implements Externalizable { /** */ private static final long serialVersionUID = 0L; /** Value. */ - private V val; + private CacheObject val; /** DR version. */ private GridCacheVersion ver; @@ -48,7 +50,7 @@ public GridCacheDrInfo() { * @param val Value. * @param ver Version. */ - public GridCacheDrInfo(V val, GridCacheVersion ver) { + public GridCacheDrInfo(CacheObject val, GridCacheVersion ver) { assert val != null; assert ver != null; @@ -59,7 +61,7 @@ public GridCacheDrInfo(V val, GridCacheVersion ver) { /** * @return Value. */ - public V value() { + public CacheObject value() { return val; } @@ -84,21 +86,18 @@ public long expireTime() { return CU.EXPIRE_TIME_ETERNAL; } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheDrInfo.class, this); + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + assert false; } - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(val); - CU.writeVersion(out, ver); + @Override + public void writeExternal(ObjectOutput out) throws IOException { + assert false; } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - val = (V)in.readObject(); - ver = CU.readVersion(in); + @Override public String toString() { + return S.toString(GridCacheDrInfo.class, this); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java index 85831a8bcda7e..dbf90c92fdc22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java @@ -26,7 +26,7 @@ /** * Replication manager class which processes all replication events. */ -public interface GridCacheDrManager extends GridCacheManager { +public interface GridCacheDrManager extends GridCacheManager { /** * @return Data center ID. */ @@ -41,19 +41,15 @@ public interface GridCacheDrManager extends GridCacheManager { * Performs replication. * * @param key Key. - * @param keyBytes Key bytes. * @param val Value. - * @param valBytes Value bytes. * @param ttl TTL. * @param expireTime Expire time. * @param ver Version. * @param drType Replication type. * @throws IgniteCheckedException If failed. */ - public void replicate(K key, - @Nullable byte[] keyBytes, - @Nullable V val, - @Nullable byte[] valBytes, + public void replicate(KeyCacheObject key, + @Nullable CacheObject val, long ttl, long expireTime, GridCacheVersion ver, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java index 49f617b2e7248..7cec4ebd73747 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java @@ -27,14 +27,14 @@ /** * No-op implementation for {@link GridCacheDrManager}. */ -public class GridOsCacheDrManager implements GridCacheDrManager { +public class GridOsCacheDrManager implements GridCacheDrManager { /** {@inheritDoc} */ @Override public boolean enabled() { return false; } /** {@inheritDoc} */ - @Override public void start(GridCacheContext cctx) throws IgniteCheckedException { + @Override public void start(GridCacheContext cctx) throws IgniteCheckedException { // No-op. } @@ -69,10 +69,8 @@ public class GridOsCacheDrManager implements GridCacheDrManager { } /** {@inheritDoc} */ - @Override public void replicate(K key, - @Nullable byte[] keyBytes, - @Nullable V val, - @Nullable byte[] valBytes, + @Override public void replicate(KeyCacheObject key, + @Nullable CacheObject val, long ttl, long expireTime, GridCacheVersion ver, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java index 1ff174fb2442d..ef9204dc0fc7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManagerAdapter.java @@ -25,7 +25,7 @@ /** * Provides possibility to integrate cache transactions with JTA. */ -public abstract class CacheJtaManagerAdapter extends GridCacheManagerAdapter { +public abstract class CacheJtaManagerAdapter extends GridCacheManagerAdapter { /** * Creates transaction manager finder. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java index 459003147db24..451357f451042 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheNoopJtaManager.java @@ -24,7 +24,7 @@ /** * No-op implementation of {@link CacheJtaManagerAdapter}. */ -public class CacheNoopJtaManager extends CacheJtaManagerAdapter { +public class CacheNoopJtaManager extends CacheJtaManagerAdapter { /** {@inheritDoc} */ @Override public void checkJta() throws IgniteCheckedException { // No-op. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0e37349352bd6..b46bc598bacac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1298,8 +1298,11 @@ public String resolveTaskName() { * @throws GridCacheEntryRemovedException If entry got removed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - protected IgniteBiTuple> conflictResolve( - GridCacheOperation op, IgniteTxEntry txEntry, V newVal, byte[] newValBytes, GridCacheVersion newVer, + protected IgniteBiTuple conflictResolve( + GridCacheOperation op, + IgniteTxEntry txEntry, + CacheObject newVal, + GridCacheVersion newVer, GridCacheEntryEx old) throws IgniteCheckedException, GridCacheEntryRemovedException { assert newVer != null; @@ -1351,19 +1354,19 @@ else if (op == UPDATE) GridCacheVersionedEntryEx oldEntry = old.versionedEntry(); // Construct new entry info. - if (newVal == null && newValBytes != null) - newVal = cctx.marshaller().unmarshal(newValBytes, cctx.deploy().globalLoader()); - - GridCacheVersionedEntryEx newEntry = - new GridCachePlainVersionedEntry<>((K)txEntry.key(), newVal, newTtl, newExpireTime, newVer); + Object newVal0 = CU.value(newVal, txEntry.context(), false); - GridCacheVersionConflictContext ctx = null; + GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry( + oldEntry.key(), + newVal0, + newTtl, + newExpireTime, + newVer); - // TODO IGNITE-51. - //GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false); + GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false); if (ctx.isMerge()) { - V resVal = ctx.mergeValue(); + Object resVal = ctx.mergeValue(); if ((op == CREATE || op == UPDATE) && resVal == null) op = DELETE; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 342ebd085c9f9..f4173115ad2cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -770,32 +770,29 @@ else if (cacheCtx.isNear() && txEntry.locallyMapped()) GridCacheVersionConflictContext conflictCtx = null; if (conflictNeedResolve) { -// TODO IGNITE-51. -// IgniteBiTuple> -// conflictRes = conflictResolve(op, txEntry, val, valBytes, explicitVer, -// cached); -// -// assert conflictRes != null; -// -// conflictCtx = conflictRes.get2(); -// -// if (conflictCtx.isUseOld()) -// op = NOOP; -// else if (conflictCtx.isUseNew()) { -// txEntry.ttl(conflictCtx.ttl()); -// txEntry.conflictExpireTime(conflictCtx.expireTime()); -// } -// else { -// assert conflictCtx.isMerge(); -// -// op = conflictRes.get1(); -// val = conflictCtx.mergeValue(); -// valBytes = null; -// explicitVer = writeVersion(); -// -// txEntry.ttl(conflictCtx.ttl()); -// txEntry.conflictExpireTime(conflictCtx.expireTime()); -// } + IgniteBiTuple conflictRes = + conflictResolve(op, txEntry, val, explicitVer, cached); + + assert conflictRes != null; + + conflictCtx = conflictRes.get2(); + + if (conflictCtx.isUseOld()) + op = NOOP; + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } + else { + assert conflictCtx.isMerge(); + + op = conflictRes.get1(); + val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); + explicitVer = writeVersion(); + + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } } else // Nullify explicit version so that innerSet/innerRemove will work as usual. @@ -1886,11 +1883,11 @@ private IgniteInternalFuture> checkMissed( } /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllDrAsync( + @Override public IgniteInternalFuture putAllDrAsync( GridCacheContext cacheCtx, - Map> drMap + Map drMap ) { - return putAllAsync0(cacheCtx, + return this.putAllAsync0(cacheCtx, null, null, null, @@ -1918,9 +1915,9 @@ private IgniteInternalFuture> checkMissed( } /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllDrAsync( + @Override public IgniteInternalFuture removeAllDrAsync( GridCacheContext cacheCtx, - Map drMap + Map drMap ) { return removeAllAsync0(cacheCtx, null, drMap, null, false, null); } @@ -1960,20 +1957,20 @@ private boolean filter(GridCacheEntryEx cached, */ protected IgniteInternalFuture> enlistWrite( final GridCacheContext cacheCtx, - Collection keys, + Collection keys, @Nullable GridCacheEntryEx cached, @Nullable ExpiryPolicy expiryPlc, boolean implicit, - @Nullable Map lookup, - @Nullable Map> invokeMap, + @Nullable Map lookup, + @Nullable Map> invokeMap, @Nullable Object[] invokeArgs, boolean retval, boolean lockOnly, CacheEntryPredicate[] filter, final GridCacheReturn ret, Collection enlisted, - @Nullable Map> drPutMap, - @Nullable Map drRmvMap + @Nullable Map drPutMap, + @Nullable Map drRmvMap ) { assert cached == null || keys.size() == 1; assert cached == null || F.first(keys).equals(cached.key()); @@ -1998,14 +1995,14 @@ protected IgniteInternalFuture> enlistWrite( groupLockSanityCheck(cacheCtx, keys); - for (K key : keys) { + for (Object key : keys) { if (key == null) { setRollbackOnly(); throw new NullPointerException("Null key."); } - V val = rmv || lookup == null ? null : lookup.get(key); + Object val = rmv || lookup == null ? null : lookup.get(key); EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key); GridCacheVersion drVer; @@ -2013,7 +2010,7 @@ protected IgniteInternalFuture> enlistWrite( long drExpireTime; if (drPutMap != null) { - GridCacheDrInfo info = drPutMap.get(key); + GridCacheDrInfo info = drPutMap.get(key); assert info != null; @@ -2510,7 +2507,7 @@ private IgniteInternalFuture putAllAsync0( @Nullable Map map, @Nullable Map> invokeMap, @Nullable final Object[] invokeArgs, - @Nullable final Map> drMap, + @Nullable final Map drMap, final boolean retval, @Nullable GridCacheEntryEx cached, @Nullable final CacheEntryPredicate[] filter @@ -2523,14 +2520,14 @@ private IgniteInternalFuture putAllAsync0( needReturnValue(true); // Cached entry may be passed only from entry wrapper. - final Map map0; - final Map> invokeMap0; + final Map map0; + final Map> invokeMap0; if (drMap != null) { assert map == null; - map0 = (Map)F.viewReadOnly(drMap, new IgniteClosure, V>() { - @Override public V apply(GridCacheDrInfo val) { + map0 = F.viewReadOnly(drMap, new IgniteClosure() { + @Override public Object apply(GridCacheDrInfo val) { return val.value(); } }); @@ -2538,7 +2535,7 @@ private IgniteInternalFuture putAllAsync0( invokeMap0 = null; } else { - map0 = (Map)map; + map0 = map; invokeMap0 = (Map>)invokeMap; } @@ -2573,7 +2570,7 @@ private IgniteInternalFuture putAllAsync0( } try { - Set keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); + Set keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); Collection enlisted = new ArrayList<>(); @@ -2721,7 +2718,7 @@ private IgniteInternalFuture putAllAsync0( private IgniteInternalFuture> removeAllAsync0( final GridCacheContext cacheCtx, @Nullable final Collection keys, - @Nullable Map drMap, + @Nullable Map drMap, @Nullable GridCacheEntryEx cached, final boolean retval, @Nullable final CacheEntryPredicate[] filter) { @@ -2730,7 +2727,7 @@ private IgniteInternalFuture> removeAllAsync if (retval) needReturnValue(true); - final Collection keys0; + final Collection keys0; if (drMap != null) { assert keys == null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 1af63787ff552..936e4e8923bcb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -129,18 +129,18 @@ public IgniteInternalFuture> removeAllAsync( * @param drMap DR map to put. * @return Future for DR put operation. */ - public IgniteInternalFuture putAllDrAsync( + public IgniteInternalFuture putAllDrAsync( GridCacheContext cacheCtx, - Map> drMap); + Map drMap); /** * @param cacheCtx Cache context. * @param drMap DR map. * @return Future for asynchronous remove. */ - public IgniteInternalFuture removeAllDrAsync( + public IgniteInternalFuture removeAllDrAsync( GridCacheContext cacheCtx, - Map drMap); + Map drMap); /** * Performs keys locking for affinity-based group lock transactions. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java index 23585fbc59bb1..8235ec0241aa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java @@ -18,31 +18,26 @@ package org.apache.ignite.internal.processors.cache.version; import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.marshaller.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; import java.io.*; -import java.util.*; +import java.nio.*; /** * Raw versioned entry. */ -public class GridCacheRawVersionedEntry implements GridCacheVersionedEntry, GridCacheVersionable, - Map.Entry, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Key. */ - private K key; - +public class GridCacheRawVersionedEntry extends IgniteDataLoaderEntry implements + GridCacheVersionedEntry, GridCacheVersionable, Externalizable { /** Key bytes. */ + @GridDirectTransient private byte[] keyBytes; - /** Value. */ - private V val; - /** Value bytes. */ private byte[] valBytes; @@ -63,21 +58,44 @@ public GridCacheRawVersionedEntry() { } /** - * Constructor. + * Constructor used for local store load when key and value are available. * * @param key Key. - * @param keyBytes Key bytes. * @param val Value. - * @param valBytes Value bytes. * @param expireTime Expire time. * @param ttl TTL. * @param ver Version. */ - public GridCacheRawVersionedEntry(K key, @Nullable byte[] keyBytes, @Nullable V val, @Nullable byte[] valBytes, - long ttl, long expireTime, GridCacheVersion ver) { + public GridCacheRawVersionedEntry(KeyCacheObject key, + @Nullable CacheObject val, + long ttl, + long expireTime, + GridCacheVersion ver) { + assert key != null; + this.key = key; - this.keyBytes = keyBytes; this.val = val; + this.ttl = ttl; + this.expireTime = expireTime; + this.ver = ver; + } + + /** + * Constructor used in receiver hub where marshalled key and value are available and we do not want to + * unmarshal value. + * + * @param keyBytes Key. + * @param valBytes Value bytes. + * @param expireTime Expire time. + * @param ttl TTL. + * @param ver Version. + */ + public GridCacheRawVersionedEntry(byte[] keyBytes, + byte[] valBytes, + long ttl, + long expireTime, + GridCacheVersion ver) { + this.keyBytes = keyBytes; this.valBytes = valBytes; this.ttl = ttl; this.expireTime = expireTime; @@ -88,7 +106,14 @@ public GridCacheRawVersionedEntry(K key, @Nullable byte[] keyBytes, @Nullable V @Override public K key() { assert key != null : "Entry is being improperly processed."; - return key; + return key.value(null, false); + } + + /** + * @param key Key. + */ + public void key(KeyCacheObject key) { + this.key = key; } /** @@ -100,7 +125,7 @@ public byte[] keyBytes() { /** {@inheritDoc} */ @Override public V value() { - return val; + return val != null ? val.value(null, false) : null; } /** @@ -149,90 +174,205 @@ public byte[] valueBytes() { * Perform internal unmarshal of this entry. It must be performed after entry is deserialized and before * its restored key/value are needed. * + * @param ctx Context. * @param marsh Marshaller. * @throws IgniteCheckedException If failed. */ - public void unmarshal(Marshaller marsh) throws IgniteCheckedException { - unmarshalKey(marsh); + public void unmarshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { + unmarshalKey(ctx, marsh); - if (valBytes != null && val == null) + if (val == null && valBytes != null) { val = marsh.unmarshal(valBytes, null); + + val.finishUnmarshal(ctx, null); + } + } + + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void unmarshal(CacheObjectContext ctx) throws IgniteCheckedException { + assert key != null; + + key.finishUnmarshal(ctx, null); + + if (val != null) + val.finishUnmarshal(ctx, null); } /** * Perform internal key unmarshal of this entry. It must be performed after entry is deserialized and before * its restored key/value are needed. * + * @param ctx Context. * @param marsh Marshaller. * @throws IgniteCheckedException If failed. */ - public void unmarshalKey(Marshaller marsh) throws IgniteCheckedException { - if (key == null) + public void unmarshalKey(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { + if (key == null) { + assert keyBytes != null; + key = marsh.unmarshal(keyBytes, null); + + key.finishUnmarshal(ctx, null); + } } /** * Perform internal marshal of this entry before it will be serialized. * + * @param ctx Context. * @param marsh Marshaller. * @throws IgniteCheckedException If failed. */ - public void marshal(Marshaller marsh) throws IgniteCheckedException { - if (keyBytes == null) + public void marshal(CacheObjectContext ctx, Marshaller marsh) throws IgniteCheckedException { + if (keyBytes == null) { + key.prepareMarshal(ctx); + keyBytes = marsh.marshal(key); + } + + if (valBytes == null && val != null) { + val.prepareMarshal(ctx); - if (valBytes == null && val != null) valBytes = marsh.marshal(val); + } + } + + /** + * @param ctx Context. + * @throws IgniteCheckedException If failed. + */ + public void prepareDirectMarshal(CacheObjectContext ctx) throws IgniteCheckedException { + key.prepareMarshal(ctx); + + if (val != null) + val.prepareMarshal(ctx); } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - assert keyBytes != null; + @Override public byte directType() { + return 103; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 2: + expireTime = reader.readLong("expireTime"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + valBytes = reader.readByteArray("valBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); - U.writeByteArray(out, keyBytes); - U.writeByteArray(out, valBytes); + case 5: + ver = reader.readMessage("ver"); - out.writeLong(ttl); + if (!reader.isLastRead()) + return false; - if (ttl != 0) - out.writeLong(expireTime); + reader.incrementState(); - out.writeObject(ver); + } + + assert key != null; + assert !(val != null && valBytes != null); + + return true; } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - keyBytes = U.readByteArray(in); - valBytes = U.readByteArray(in); + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + assert key != null; + assert !(val != null && valBytes != null); + + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 2: + if (!writer.writeLong("expireTime", expireTime)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("ttl", ttl)) + return false; - ttl = in.readLong(); + writer.incrementState(); - if (ttl != 0) - expireTime = in.readLong(); + case 4: + if (!writer.writeByteArray("valBytes", valBytes)) + return false; - ver = (GridCacheVersion)in.readObject(); + writer.incrementState(); - assert keyBytes != null; + case 5: + if (!writer.writeMessage("ver", ver)) + return false; + + writer.incrementState(); + + } + + return true; } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheRawVersionedEntry.class, this, "keyBytesLen", - keyBytes != null ? keyBytes.length : "n/a", "valBytesLen", valBytes != null ? valBytes.length : "n/a"); + @Override public byte fieldsCount() { + return 6; } /** {@inheritDoc} */ - @Override public K getKey() { - return key(); + @Override public void writeExternal(ObjectOutput out) throws IOException { + assert false; } /** {@inheritDoc} */ - @Override public V getValue() { - return value(); + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + assert false; } /** {@inheritDoc} */ - @Override public V setValue(V val) { - throw new UnsupportedOperationException(); + @Override public String toString() { + return S.toString(GridCacheRawVersionedEntry.class, this, "keyBytesLen", + keyBytes != null ? keyBytes.length : "n/a", "valBytesLen", + valBytes != null ? valBytes.length : "n/a"); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java index 9a6cbd29762bb..1fe8a25d1d1db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionEx.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.processors.cache.version; +import org.apache.ignite.plugin.extensions.communication.*; + import java.io.*; +import java.nio.*; /** * Extended cache version which also has additional DR version. @@ -86,6 +89,66 @@ public GridCacheVersion conflictVersion() { return GG_CLASS_ID; } + /** {@inheritDoc} */ + @Override public byte directType() { + return 104; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 4: + if (!writer.writeMessage("drVer", drVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 4: + drVer = reader.readMessage("drVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException { super.readExternal(in); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 74b38f936ca97..20dd6bf8f06c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -37,7 +37,7 @@ * like, for example GridCacheContext, as it may be reused between different * caches. */ -public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { +public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { /** Timestamp used as base time for cache topology version (January 1, 2014). */ public static final long TOP_VER_BASE_TIME = 1388520000000L; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java index 7d46e233ae810..e54a28150727a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java @@ -57,6 +57,7 @@ class GridDataLoadUpdateJob implements GridPlainCallable { * @param cacheName Cache name. * @param col Entries to put. * @param ignoreDepOwnership {@code True} to ignore deployment ownership. + * @param skipStore Skip store flag. * @param updater Updater. */ GridDataLoadUpdateJob( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java index 89bebe4124389..f719cfc6af169 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderEntry.java @@ -31,11 +31,11 @@ public class IgniteDataLoaderEntry implements Map.Entry, Message { /** */ @GridToStringInclude - private KeyCacheObject key; + protected KeyCacheObject key; /** */ @GridToStringInclude - private CacheObject val; + protected CacheObject val; /** * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 30fd8bb227a6a..154e6854a0a11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -242,6 +242,13 @@ else if (log.isDebugEnabled()) publicFut = new IgniteFutureImpl<>(fut); } + /** + * @return Cache object context. + */ + public CacheObjectContext cacheObjectContext() { + return cacheObjCtx; + } + /** * Enters busy lock. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java index 115141d7fbff4..ae8c77b882d83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java @@ -32,19 +32,20 @@ /** * Data center replication cache updater for data loader. */ -public class GridDrDataLoadCacheUpdater implements IgniteDataLoader.Updater, +public class GridDrDataLoadCacheUpdater implements IgniteDataLoader.Updater, GridDataLoadCacheUpdaters.InternalUpdater { /** */ private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void update(IgniteCache cache0, Collection> col) { + @Override public void update(IgniteCache cache0, + Collection> col) { try { String cacheName = cache0.getConfiguration(CacheConfiguration.class).getName(); GridKernalContext ctx = ((IgniteKernal)cache0.unwrap(Ignite.class)).context(); IgniteLogger log = ctx.log(GridDrDataLoadCacheUpdater.class); - GridCacheAdapter cache = ctx.cache().internalCache(cacheName); + GridCacheAdapter cache = ctx.cache().internalCache(cacheName); assert !F.isEmpty(col); @@ -56,20 +57,24 @@ public class GridDrDataLoadCacheUpdater implements IgniteDataLoader.Update if (!f.isDone()) f.get(); - for (Map.Entry entry0 : col) { - GridCacheRawVersionedEntry entry = (GridCacheRawVersionedEntry)entry0; + CacheObjectContext cacheObjCtx = cache.context().cacheObjectContext(); - entry.unmarshal(ctx.config().getMarshaller()); + for (Map.Entry entry0 : col) { + GridCacheRawVersionedEntry entry = (GridCacheRawVersionedEntry)entry0; - K key = entry.key(); + entry.unmarshal(cacheObjCtx, ctx.config().getMarshaller()); + + KeyCacheObject key = entry.getKey(); // Ensure that updater to not receive special-purpose values for TTL and expire time. assert entry.ttl() != CU.TTL_NOT_CHANGED && entry.ttl() != CU.TTL_ZERO && entry.ttl() >= 0; assert entry.expireTime() != CU.EXPIRE_TIME_CALCULATE && entry.expireTime() >= 0; - GridCacheDrInfo val = entry.value() != null ? entry.ttl() != CU.TTL_ETERNAL ? - new GridCacheDrExpirationInfo<>(entry.value(), entry.version(), entry.ttl(), entry.expireTime()) : - new GridCacheDrInfo<>(entry.value(), entry.version()) : null; + CacheObject cacheVal = entry.getValue(); + + GridCacheDrInfo val = cacheVal != null ? entry.ttl() != CU.TTL_ETERNAL ? + new GridCacheDrExpirationInfo(cacheVal, entry.version(), entry.ttl(), entry.expireTime()) : + new GridCacheDrInfo(cacheVal, entry.version()) : null; if (val == null) cache.removeAllConflict(Collections.singletonMap(key, entry.version())); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java index 6b97f9acf3f66..1235a1d64b24d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerLoadTest.java @@ -60,7 +60,7 @@ public void testLoad() throws Exception { } }, 1); - GridCacheTtlManager ttlMgr = g.internalCache().context().ttl(); + GridCacheTtlManager ttlMgr = g.internalCache().context().ttl(); for (int i = 0; i < 300; i++) { U.sleep(1000); diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index f1f58f91143c9..644644e2f01ae 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -48,7 +48,7 @@ public GridCacheTestContext(GridTestKernalContext ctx) throws Exception { new GridCacheSharedContext<>( ctx, new IgniteTxManager(), - new GridCacheVersionManager(), + new GridCacheVersionManager(), new GridCacheMvccManager(), new GridCacheDeploymentManager(), new GridCachePartitionExchangeManager(), @@ -67,8 +67,8 @@ public GridCacheTestContext(GridTestKernalContext ctx) throws Exception { new CacheContinuousQueryManager(), new GridCacheAffinityManager(), new CacheDataStructuresManager(), - new GridCacheTtlManager(), - new GridOsCacheDrManager(), - new CacheNoopJtaManager()); + new GridCacheTtlManager(), + new GridOsCacheDrManager(), + new CacheNoopJtaManager()); } } diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java index 6077e4a751ddf..7153c7d138ae6 100644 --- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java +++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java @@ -28,7 +28,7 @@ /** * Implementation of {@link CacheJtaManagerAdapter}. */ -public class CacheJtaManager extends CacheJtaManagerAdapter { +public class CacheJtaManager extends CacheJtaManagerAdapter { /** */ private final ThreadLocal xaRsrc = new ThreadLocal<>();