From b54a481315a45c7a6c8f70534f655e14b25cc439 Mon Sep 17 00:00:00 2001 From: dkarachentsev Date: Wed, 18 Jan 2017 12:05:22 +0300 Subject: [PATCH] ignite-4525 - Near reader is created when value is loaded from store. --- .../processors/cache/GridCacheAdapter.java | 33 +- .../processors/cache/GridCacheContext.java | 4 +- .../processors/cache/GridCacheEntryEx.java | 20 +- .../processors/cache/GridCacheMapEntry.java | 56 ++- .../processors/cache/ReaderArguments.java | 74 +++ .../distributed/dht/GridDhtCacheAdapter.java | 9 +- .../distributed/dht/GridDhtGetFuture.java | 85 ++-- .../dht/GridDhtGetSingleFuture.java | 75 ++- .../dht/GridPartitionedGetFuture.java | 3 +- .../dht/GridPartitionedSingleGetFuture.java | 3 +- .../dht/atomic/GridDhtAtomicCache.java | 5 +- .../dht/colocated/GridDhtColocatedCache.java | 3 +- .../distributed/near/GridNearGetFuture.java | 6 +- .../local/atomic/GridLocalAtomicCache.java | 5 +- .../transactions/IgniteTxLocalAdapter.java | 35 +- .../cache/GridCacheTestEntryEx.java | 11 +- .../near/GridNearCacheStoreUpdateTest.java | 466 ++++++++++++++++++ .../GridNearOffheapCacheStoreUpdateTest.java | 35 ++ .../testsuites/IgniteCacheTestSuite2.java | 5 + 19 files changed, 770 insertions(+), 163 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index fd9f396926f0e..59665bb58708f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1789,6 +1789,7 @@ protected IgniteInternalFuture> getAllAsync( subjId = ctx.subjectIdPerCall(subjId, opCtx); return getAllAsync(keys, + null, opCtx == null || !opCtx.skipStore(), !skipTx, subjId, @@ -1803,6 +1804,7 @@ protected IgniteInternalFuture> getAllAsync( /** * @param keys Keys. + * @param readerArgs Near cache reader will be added if not null. * @param readThrough Read through. * @param checkTx Check tx. * @param subjId Subj Id. @@ -1817,6 +1819,7 @@ protected IgniteInternalFuture> getAllAsync( * @see GridCacheAdapter#getAllAsync(Collection) */ public final IgniteInternalFuture> getAllAsync(@Nullable final Collection keys, + @Nullable final ReaderArguments readerArgs, boolean readThrough, boolean checkTx, @Nullable final UUID subjId, @@ -1834,6 +1837,7 @@ public final IgniteInternalFuture> getAllAsync(@Nullable final Collect validateCacheKeys(keys); return getAllAsync0(ctx.cacheKeysView(keys), + readerArgs, readThrough, checkTx, subjId, @@ -1848,6 +1852,7 @@ public final IgniteInternalFuture> getAllAsync(@Nullable final Collect /** * @param keys Keys. + * @param readerArgs Near cache reader will be added if not null. * @param readThrough Read-through flag. * @param checkTx Check local transaction flag. * @param subjId Subject ID. @@ -1862,6 +1867,7 @@ public final IgniteInternalFuture> getAllAsync(@Nullable final Collect */ protected final IgniteInternalFuture> getAllAsync0( @Nullable final Collection keys, + @Nullable final ReaderArguments readerArgs, final boolean readThrough, boolean checkTx, @Nullable final UUID subjId, @@ -1932,7 +1938,8 @@ protected final IgniteInternalFuture> getAllAsync0( subjId, taskName, expiry, - !deserializeBinary); + !deserializeBinary, + readerArgs); assert res != null; @@ -1957,7 +1964,8 @@ protected final IgniteInternalFuture> getAllAsync0( null, taskName, expiry, - !deserializeBinary); + !deserializeBinary, + readerArgs); if (res == null) ctx.evicts().touch(entry, topVer); @@ -2015,29 +2023,28 @@ protected final IgniteInternalFuture> getAllAsync0( GridCacheEntryEx entry = entryEx(key); try { - GridCacheVersion verSet = entry.versionedValue(cacheVal, + T2 verVal = entry.versionedValue( + cacheVal, res.version(), - null); - - boolean set = verSet != null; + null, + readerArgs); if (log.isDebugEnabled()) log.debug("Set value loaded from store into entry [" + - "set=" + set + - ", curVer=" + res.version() + - ", newVer=" + verSet + ", " + + "oldVer=" + res.version() + + ", newVer=" + verVal.get2() + ", " + "entry=" + entry + ']'); // Don't put key-value pair into result map if value is null. - if (val != null) { + if (verVal.get1() != null) { ctx.addResult(map, key, - cacheVal, + verVal.get1(), skipVals, keepCacheObjects, deserializeBinary, - false, - needVer ? set ? verSet : res.version() : null); + true, + needVer ? verVal.get2() : null); } if (tx0 == null || (!tx0.implicit() && diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 66b71b4666e7b..424e325dd07f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1900,9 +1900,9 @@ public void addResult(Map map, assert val != null || skipVals; if (!keepCacheObjects) { - Object key0 = unwrapBinaryIfNeeded(key, !deserializeBinary); + Object key0 = unwrapBinaryIfNeeded(key, !deserializeBinary, cpy); - Object val0 = skipVals ? true : unwrapBinaryIfNeeded(val, !deserializeBinary); + Object val0 = skipVals ? true : unwrapBinaryIfNeeded(val, !deserializeBinary, cpy); assert key0 != null : key; assert val0 != null : val; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index b1d632fd5943d..51f423a7302a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.T2; import org.jetbrains.annotations.Nullable; /** @@ -318,11 +319,12 @@ public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, * @param taskName Task name. * @param expiryPlc Expiry policy. * @param keepBinary Keep binary flag. + * @param readerArgs Reader will be added if not null. * @return Cached value and entry version. * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. */ - @Nullable public EntryGetResult innerGetVersioned( + public EntryGetResult innerGetVersioned( @Nullable GridCacheVersion ver, IgniteInternalTx tx, boolean readSwap, @@ -333,7 +335,8 @@ public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) + boolean keepBinary, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -344,7 +347,7 @@ public boolean evictInternal(boolean swap, GridCacheVersion obsoleteVer, * @param taskName Task name. * @param expiryPlc Expiry policy. * @param keepBinary Keep binary flag. - * @return Cached value and entry version. + * @param readerArgs Reader will be added if not null. * @throws IgniteCheckedException If loading value failed. * @throws GridCacheEntryRemovedException If entry was removed. * @return Cached value, entry version and flag indicating if entry was reserved. @@ -355,7 +358,8 @@ public EntryGetResult innerGetAndReserveForLoad(boolean readSwap, UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException; + boolean keepBinary, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException; /** * @param ver Expected entry version. @@ -751,13 +755,15 @@ public GridCacheVersionedEntryEx versionedEntry(final boolean keepB * @param val New value. * @param curVer Version to match or {@code null} if match is not required. * @param newVer Version to set. - * @return Non null version if value was set. + * @param readerArgs Reader will be added if not null. + * @return Current version and value. * @throws IgniteCheckedException If index could not be updated. * @throws GridCacheEntryRemovedException If entry was removed. */ - public GridCacheVersion versionedValue(CacheObject val, + public T2 versionedValue(CacheObject val, @Nullable GridCacheVersion curVer, - @Nullable GridCacheVersion newVer) + @Nullable GridCacheVersion newVer, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException; /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9f0c2b0a47c56..59e4181df0cee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -778,7 +778,8 @@ protected final void releaseSwap() throws IgniteCheckedException { expirePlc, false, keepBinary, - false); + false, + null); } /** {@inheritDoc} */ @@ -788,7 +789,8 @@ protected final void releaseSwap() throws IgniteCheckedException { UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException { + boolean keepBinary, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException { return (EntryGetResult)innerGet0( /*ver*/null, /*tx*/null, @@ -803,11 +805,12 @@ protected final void releaseSwap() throws IgniteCheckedException { expiryPlc, true, keepBinary, - /*reserve*/true); + /*reserve*/true, + readerArgs); } /** {@inheritDoc} */ - @Nullable @Override public EntryGetResult innerGetVersioned( + @Override public EntryGetResult innerGetVersioned( @Nullable GridCacheVersion ver, IgniteInternalTx tx, boolean readSwap, @@ -818,7 +821,8 @@ protected final void releaseSwap() throws IgniteCheckedException { Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) + boolean keepBinary, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException { return (EntryGetResult)innerGet0(ver, tx, @@ -833,7 +837,8 @@ protected final void releaseSwap() throws IgniteCheckedException { expiryPlc, true, keepBinary, - false); + false, + readerArgs); } /** {@inheritDoc} */ @@ -852,7 +857,8 @@ private Object innerGet0( @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean retVer, boolean keepBinary, - boolean reserveForLoad + boolean reserveForLoad, + @Nullable ReaderArguments readerArgs ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert !(retVer && readThrough); assert !(reserveForLoad && readThrough); @@ -961,6 +967,8 @@ private Object innerGet0( // Cache version for optimistic check. startVer = ver; + addReaderIfNeed(readerArgs); + if (ret != null) { assert tmp || !(ret instanceof BinaryObjectOffheapImpl); assert !obsolete; @@ -1051,6 +1059,8 @@ else if (tx.dht()) { if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); + + assert readerArgs == null; } if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { @@ -3611,19 +3621,22 @@ private long nextPartCounter(AffinityTopologyVersion topVer) { } /** {@inheritDoc} */ - @Override public synchronized GridCacheVersion versionedValue(CacheObject val, + @Override public synchronized T2 versionedValue(CacheObject val, GridCacheVersion curVer, - GridCacheVersion newVer) + GridCacheVersion newVer, + @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException { checkObsolete(); + addReaderIfNeed(readerArgs); + if (curVer == null || curVer.equals(ver)) { if (val != this.val) { GridCacheMvcc mvcc = mvccExtras(); if (mvcc != null && !mvcc.isEmpty()) - return null; + return new T2<>(this.val, ver); if (newVer == null) newVer = cctx.versions().next(); @@ -3647,13 +3660,32 @@ private long nextPartCounter(AffinityTopologyVersion topVer) { // Version does not change for load ops. update(val, expTime, ttl, newVer, true); - return newVer; + return new T2<>(val, newVer); } assert !evictionDisabled() : this; } - return null; + return new T2<>(this.val, ver); + } + + /** + * @param readerArgs Reader arguments + */ + private void addReaderIfNeed(@Nullable ReaderArguments readerArgs) { + if (readerArgs != null) { + assert this instanceof GridDhtCacheEntry : this; + assert Thread.holdsLock(this); + + try { + ((GridDhtCacheEntry)this).addReader(readerArgs.reader(), + readerArgs.messageId(), + readerArgs.topologyVersion()); + } + catch (GridCacheEntryRemovedException e) { + assert false : this; + } + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java new file mode 100644 index 0000000000000..b8b5e64894fda --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ReaderArguments.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.UUID; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Arguments required for adding near cache reader to entry. + */ +public class ReaderArguments { + /** */ + private final UUID reader; + + /** */ + private final long msgId; + + /** */ + private final AffinityTopologyVersion topVer; + + /** + * @param reader Near cache node ID. + * @param msgId Message ID. + * @param topVer Topology version. + */ + public ReaderArguments(final UUID reader, final long msgId, + final AffinityTopologyVersion topVer) { + this.reader = reader; + this.msgId = msgId; + this.topVer = topVer; + } + + /** + * @return Reader node ID. + */ + public UUID reader() { + return reader; + } + + /** + * @return Message ID. + */ + public long messageId() { + return msgId; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ReaderArguments.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index b2fb7b4b392a4..543cee1809070 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -35,8 +35,8 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloader; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.ReaderArguments; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; @@ -623,6 +624,7 @@ else if (log.isDebugEnabled()) CacheOperationContext opCtx = ctx.operationContextPerCall(); return getAllAsync(keys, + null, opCtx == null || !opCtx.skipStore(), /*don't check local tx. */false, subjId, @@ -637,6 +639,7 @@ else if (log.isDebugEnabled()) /** * @param keys Keys to get + * @param readerArgs Reader will be added if not null. * @param readThrough Read through flag. * @param subjId Subject ID. * @param taskName Task name. @@ -647,6 +650,7 @@ else if (log.isDebugEnabled()) */ IgniteInternalFuture>> getDhtAllAsync( Collection keys, + @Nullable final ReaderArguments readerArgs, boolean readThrough, @Nullable UUID subjId, String taskName, @@ -655,6 +659,7 @@ IgniteInternalFuture>> get boolean canRemap ) { return getAllAsync0(keys, + readerArgs, readThrough, /*don't check local tx. */false, subjId, @@ -694,7 +699,6 @@ public GridDhtFuture> getDhtAsync(UUID reader, reader, keys, readThrough, - /*tx*/null, topVer, subjId, taskNameHash, @@ -738,7 +742,6 @@ private IgniteInternalFuture getDhtSingleAsync( key, addRdr, readThrough, - /*tx*/null, topVer, subjId, taskNameHash, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 913580f8c02ad..3bf44895a62a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -29,7 +29,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -37,7 +36,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.ReaderArguments; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; @@ -50,7 +49,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -96,9 +94,6 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture retries; @@ -120,7 +115,6 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture keys, boolean readThrough, - @Nullable IgniteTxLocalEx tx, @NotNull AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, @@ -150,7 +143,6 @@ public GridDhtGetFuture( this.msgId = msgId; this.keys = keys; this.readThrough = readThrough; - this.tx = tx; this.topVer = topVer; this.subjId = subjId; this.taskNameHash = taskNameHash; @@ -159,7 +151,7 @@ public GridDhtGetFuture( futId = IgniteUuid.randomUuid(); - ver = tx == null ? cctx.versions().next() : tx.xidVersion(); + ver = cctx.versions().next(); if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridDhtGetFuture.class); @@ -340,6 +332,8 @@ private IgniteInternalFuture> getAsync( ClusterNode readerNode = cctx.discovery().node(reader); + ReaderArguments readerArgs = null; + if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) { for (Map.Entry k : keys.entrySet()) { while (true) { @@ -351,12 +345,19 @@ private IgniteInternalFuture> getAsync( boolean addReader = (!e.deleted() && k.getValue() && !skipVals); - if (addReader) + if (addReader) { e.unswap(false); + // Entry will be removed on touch() if no data in cache, + // but they could be loaded from store, + // we have to add reader again later. + if (readerArgs == null) + readerArgs = new ReaderArguments(reader, msgId, topVer); + } + // Register reader. If there are active transactions for this entry, // then will wait for their completion before proceeding. - // TODO: GG-4003: + // TODO: IGNITE-3498: // TODO: What if any transaction we wait for actually removes this entry? // TODO: In this case seems like we will be stuck with untracked near entry. // TODO: To fix, check that reader is contained in the list of readers once @@ -392,28 +393,19 @@ private IgniteInternalFuture> getAsync( IgniteInternalFuture>> fut; if (txFut == null || txFut.isDone()) { - if (tx == null) { - fut = cache().getDhtAllAsync( - keys.keySet(), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true); - } - else { - fut = tx.getAllAsync(cctx, - null, - keys.keySet(), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false); - } + fut = cache().getDhtAllAsync( + keys.keySet(), + readerArgs, + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, + /*can remap*/true); } else { + final ReaderArguments args = readerArgs; + // If we are here, then there were active transactions for some entries // when we were adding the reader. In that case we must wait for those // transactions to complete. @@ -424,26 +416,15 @@ private IgniteInternalFuture> getAsync( if (e != null) throw new GridClosureException(e); - if (tx == null) { - return cache().getDhtAllAsync( - keys.keySet(), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true); - } - else { - return tx.getAllAsync(cctx, - null, - keys.keySet(), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false); - } + return cache().getDhtAllAsync( + keys.keySet(), + args, + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, + /*can remap*/true); } } ); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java index 93949377e12db..49bebd6a74b45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java @@ -35,7 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.ReaderArguments; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; @@ -90,9 +90,6 @@ public final class GridDhtGetSingleFuture extends GridFutureAdapter retries; @@ -115,7 +112,6 @@ public final class GridDhtGetSingleFuture extends GridFutureAdapter>> fut; if (rdrFut == null || rdrFut.isDone()) { - if (tx == null) { - fut = cache().getDhtAllAsync( - Collections.singleton(key), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true); - } - else { - fut = tx.getAllAsync(cctx, - null, - Collections.singleton(key), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false); - } + fut = cache().getDhtAllAsync( + Collections.singleton(key), + readerArgs, + readThrough, + subjId, + taskName, + expiryPlc, + skipVals, + /*can remap*/true); } else { + final ReaderArguments args = readerArgs; + rdrFut.listen( new IgniteInClosure>() { @Override public void apply(IgniteInternalFuture fut) { @@ -381,29 +375,16 @@ private void getAsync() { return; } - IgniteInternalFuture>> fut0; - - if (tx == null) { - fut0 = cache().getDhtAllAsync( + IgniteInternalFuture>> fut0 = + cache().getDhtAllAsync( Collections.singleton(key), + args, readThrough, subjId, taskName, expiryPlc, skipVals, /*can remap*/true); - } - else { - fut0 = tx.getAllAsync(cctx, - null, - Collections.singleton(key), - /*deserialize binary*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough, - false - ); - } fut0.listen(createGetFutureListener()); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index d0f209df94ca0..c8e2cf328bcd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -461,7 +461,8 @@ private boolean localGet(KeyCacheObject key, int part, Map locVals) { null, taskName, expiryPlc, - !deserializeBinary); + !deserializeBinary, + null); if (res != null) { v = res.value(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index e188a3523a3a5..e369bfa83a8be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -385,7 +385,8 @@ private boolean localGet(AffinityTopologyVersion topVer, int part) { null, taskName, expiryPlc, - true); + true, + null); if (res != null) { v = res.value(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 94a049ea86c6e..f601e0a9ddb7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1505,7 +1505,8 @@ private IgniteInternalFuture> getAllAsync0(@Nullable Collection entries) throws Ignite try { GridCacheVersion ver = entry.version(); - entry.versionedValue(ctx.toCacheObject(v), null, ver); + entry.versionedValue(ctx.toCacheObject(v), null, ver, null); } catch (GridCacheEntryRemovedException e) { assert false : "Entry should not get obsolete while holding lock [entry=" + entry + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 56af95e6b65c1..29f0607e5c33c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -496,7 +496,8 @@ public final IgniteInternalFuture> loadAsync( null, taskName, expiryPlc, - !deserializeBinary); + !deserializeBinary, + null); if (res != null) { v = res.value(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index ab0e88cbf375a..8bc513e4dfda2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -449,7 +449,8 @@ private Map map( null, taskName, expiryPlc, - !deserializeBinary); + !deserializeBinary, + null); if (res != null) { v = res.value(); @@ -589,7 +590,8 @@ private boolean localDhtGet(KeyCacheObject key, null, taskName, expiryPlc, - !deserializeBinary); + !deserializeBinary, + null); if (res != null) { v = res.value(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index d1acada062cb7..ad818a63d8c21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -59,7 +59,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.resource.GridResourceIoc; -import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; @@ -528,7 +527,8 @@ private Map getAllInternal(@Nullable Collection keys, null, taskName, expiry, - !deserializeBinary); + !deserializeBinary, + null); if (res != null) { v = res.value(); @@ -602,6 +602,7 @@ private Map getAllInternal(@Nullable Collection keys, return getAllAsync( keys, + null, opCtx == null || !opCtx.skipStore(), false, subjId, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 91c9c92075eab..f05d90df6cf10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -438,7 +438,8 @@ protected boolean commitAfterLock() { null, resolveTaskName(), expiryPlc, - txEntry == null ? keepBinary : txEntry.keepBinary()); + txEntry == null ? keepBinary : txEntry.keepBinary(), + null); if (res == null) { if (misses == null) @@ -477,17 +478,19 @@ protected boolean commitAfterLock() { GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); try { - GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null); - - boolean set = setVer != null; + T2 verVal = entry.versionedValue(cacheVal, + ver, + null, + null); - if (set) - ver = setVer; + if (log.isDebugEnabled()) { + log.debug("Set value loaded from store into entry [" + + "oldVer=" + ver + + ", newVer=" + verVal.get2() + + ", entry=" + entry + ']'); + } - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry [set=" + set + - ", curVer=" + ver + ", newVer=" + setVer + ", " + - "entry=" + entry + ']'); + ver = verVal.get2(); break; } @@ -1232,7 +1235,8 @@ private Collection enlistRead( transformClo, resolveTaskName(), null, - txEntry.keepBinary()); + txEntry.keepBinary(), + null); if (res != null) { val = res.value(); @@ -1316,7 +1320,8 @@ private Collection enlistRead( null, resolveTaskName(), accessPlc, - !deserializeBinary) : null; + !deserializeBinary, + null) : null; if (res != null) { val = res.value(); @@ -1666,7 +1671,8 @@ private IgniteInternalFuture> checkMissed( transformClo, resolveTaskName(), null, - txEntry.keepBinary()); + txEntry.keepBinary(), + null); if (res != null) { val = res.value(); @@ -2390,7 +2396,8 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, entryProcessor, resolveTaskName(), null, - keepBinary) : null; + keepBinary, + null) : null; if (res != null) { old = res.value(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index b03e9c89c5b3b..8db68b4ef50fb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -430,7 +430,8 @@ void recheckLock() { UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) throws IgniteCheckedException, GridCacheEntryRemovedException { + boolean keepBinary, + @Nullable ReaderArguments args) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; return null; @@ -448,7 +449,8 @@ void recheckLock() { Object transformClo, String taskName, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean keepBinary) { + boolean keepBinary, + @Nullable ReaderArguments readerArgs) { assert false; return null; @@ -684,9 +686,10 @@ void recheckLock() { } /** @inheritDoc */ - @Override public GridCacheVersion versionedValue(CacheObject val, + @Override public T2 versionedValue(CacheObject val, GridCacheVersion curVer, - GridCacheVersion newVer) { + GridCacheVersion newVer, + @Nullable ReaderArguments readerArgs) { assert false; return null; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java new file mode 100644 index 0000000000000..183b9caab4d74 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheStoreUpdateTest.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +/** + * Check that near cache is updated when entry loaded from store. + */ +public class GridNearCacheStoreUpdateTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private Ignite srv; + + /** */ + private Ignite client; + + /** */ + private IgniteCache cache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.contains("client")) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + srv = startGrid("server"); + client = startGrid("client"); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If fail. + */ + public void testAtomicUpdateNear() throws Exception { + cache = client.createCache(cacheConfiguration(), new NearCacheConfiguration()); + + checkNear(null, null); + } + + /** + * @throws Exception If fail. + */ + public void testTransactionAtomicUpdateNear() throws Exception { + cache = client.createCache(cacheConfiguration(), new NearCacheConfiguration()); + + checkNear(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ); + } + + /** + * @throws Exception If fail. + */ + public void testPessimisticRepeatableReadUpdateNear() throws Exception { + cache = client.createCache(cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL), + new NearCacheConfiguration()); + + checkNear(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ); + } + + /** + * @throws Exception If fail. + */ + public void testPessimisticReadCommittedUpdateNear() throws Exception { + cache = client.createCache(cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL), + new NearCacheConfiguration()); + + checkNear(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED); + } + + /** + * @throws Exception If fail. + */ + public void testOptimisticSerializableUpdateNear() throws Exception { + cache = client.createCache(cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL), + new NearCacheConfiguration()); + + checkNear(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE); + } + + /** + * @param txConc Transaction concurrency. + * @param txIsolation Transaction isolation. + * @throws Exception If fail. + */ + private void checkNear(TransactionConcurrency txConc, TransactionIsolation txIsolation) throws Exception { + checkNearSingle(txConc, txIsolation); + checkNearSingleConcurrent(txConc, txIsolation); + checkNearBatch(txConc, txIsolation); + checkNearBatchConcurrent(txConc, txIsolation); + } + + /** + * @param txConc Transaction concurrency. + * @param txIsolation Transaction isolation. + * @throws Exception If fail. + */ + private void checkNearSingle(TransactionConcurrency txConc, TransactionIsolation txIsolation) throws Exception { + final String key = "key"; + + boolean tx = txConc != null && txIsolation != null; + + final IgniteCache clientCache = this.cache; + final IgniteCache srvCache = srv.cache(CACHE_NAME); + + if (tx) { + doInTransaction(client, txConc, txIsolation, new Callable() { + @Override public Object call() throws Exception { + // Read from store. + assertEquals(key, clientCache.get(key)); + + return null; + } + }); + } + else + assertEquals(key, clientCache.get(key)); + + final String updatedVal = "key_updated"; + + if (tx) { + doInTransaction(srv, txConc, txIsolation, new Callable() { + @Override public Object call() throws Exception { + // Update value. + srvCache.put(key, updatedVal); + + return null; + } + }); + } + else + srvCache.put(key, updatedVal); + + if (tx) { + doInTransaction(client, txConc, txIsolation, new Callable() { + @Override public Object call() throws Exception { + assertEquals(updatedVal, clientCache.get(key)); + + return null; + } + }); + } + else + assertEquals(updatedVal, clientCache.get(key)); + } + + /** + * @param txConc Transaction concurrency. + * @param txIsolation Transaction isolation. + * @throws Exception If fail. + */ + private void checkNearSingleConcurrent(final TransactionConcurrency txConc, final TransactionIsolation txIsolation) throws Exception { + for (int i = 0; i < 10; i++) { + final String key = String.valueOf(-((new Random().nextInt(99) + 1))); + + boolean tx = txConc != null && txIsolation != null; + + final IgniteCache clientCache = this.cache; + final IgniteCache srvCache = srv.cache(CACHE_NAME); + + final CountDownLatch storeLatch = new CountDownLatch(1); + + final IgniteInternalFuture fut1 = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + storeLatch.await(); + + clientCache.get(key); + + return null; + } + }); + + +// IgniteInternalFuture fut2 = null; + + // TODO Sometimes Near cache becomes inconsistent +// if (!tx) { +// // TODO: IGNITE-3498 +// // TODO: Doesn't work on transactional cache. +// fut2 = GridTestUtils.runAsync(new Callable() { +// @Override public Object call() throws Exception { +// storeLatch.await(); +// +// srvCache.remove(key); +// +// return null; +// } +// }); +// } + + final IgniteInternalFuture fut3 = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + storeLatch.await(); + + srvCache.put(key, "other"); + + return null; + } + }); + + storeLatch.countDown(); + + fut1.get(); + +// if (!tx) +// fut2.get(); + + fut3.get(); + + final String srvVal = srvCache.get(key); + final String clientVal = clientCache.get(key); + + assertEquals(srvVal, clientVal); + } + } + + /** + * @param txConc Transaction concurrency. + * @param txIsolation Transaction isolation. + * @throws Exception If fail. + */ + private void checkNearBatch(TransactionConcurrency txConc, TransactionIsolation txIsolation) throws Exception { + final Map data1 = new HashMap<>(); + final Map data2 = new HashMap<>(); + + for (int i = 0; i < 10; i++) { + data1.put(String.valueOf(i), String.valueOf(i)); + data2.put(String.valueOf(i), "other"); + } + + final IgniteCache clientCache = this.cache; + final IgniteCache srvCache = srv.cache(CACHE_NAME); + + boolean tx = txConc != null && txIsolation != null; + + // Read from store. + if (tx) { + doInTransaction(client, txConc, txIsolation, new Callable() { + @Override public Object call() throws Exception { + assertEquals(data1, clientCache.getAll(data1.keySet())); + + return null; + } + }); + } + else + assertEquals(data1, clientCache.getAll(data1.keySet())); + + // Update value. + if (tx) { + doInTransaction(srv, txConc, txIsolation, new Callable() { + @Override public Object call() throws Exception { + srvCache.putAll(data2); + + return null; + } + }); + } + else + srvCache.putAll(data2); + + if (tx) { + doInTransaction(client, txConc, txIsolation, new Callable() { + @Override public Object call() throws Exception { + assertEquals(data2, clientCache.getAll(data2.keySet())); + + return null; + } + }); + } + else + assertEquals(data2, clientCache.getAll(data2.keySet())); + } + + /** + * @param txConc Transaction concurrency. + * @param txIsolation Transaction isolation. + * @throws Exception If fail. + */ + private void checkNearBatchConcurrent(TransactionConcurrency txConc, TransactionIsolation txIsolation) + throws Exception { + final Map data1 = new HashMap<>(); + final Map data2 = new HashMap<>(); + + for (int j = 0; j < 10; j++) { + data1.clear(); + data2.clear(); + + for (int i = j * 10; i < j * 10 + 10; i++) { + data1.put(String.valueOf(i), String.valueOf(i)); + data2.put(String.valueOf(i), "other"); + } + + final IgniteCache clientCache = this.cache; + final IgniteCache srvCache = srv.cache(CACHE_NAME); + + boolean tx = txConc != null && txIsolation != null; + + final CountDownLatch latch = new CountDownLatch(1); + + final IgniteInternalFuture fut1 = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + latch.await(); + + clientCache.getAll(data1.keySet()); + + return null; + } + }); + + IgniteInternalFuture fut2 = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + latch.await(); + + srvCache.putAll(data2); + + return null; + } + }); + +// IgniteInternalFuture fut3 = null; +// +// // TODO Sometimes Near cache becomes inconsistent +// if (!tx) { +// // TODO: IGNITE-3498 +// // TODO: Doesn't work on transactional cache. +// fut3 = GridTestUtils.runAsync(new Callable() { +// @Override public Object call() throws Exception { +// latch.await(); +// +// srvCache.removeAll(data1.keySet()); +// +// return null; +// } +// }); +// } + + latch.countDown(); + +// if (!tx) +// fut3.get(); + + fut1.get(); + fut2.get(); + + final Map srvVals = srvCache.getAll(data1.keySet()); + final Map clientVals = clientCache.getAll(data1.keySet()); + + assertEquals(srvVals, clientVals); + } + } + + /** + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration<>(CACHE_NAME); + + cfg.setCacheStoreFactory(new StoreFactory()); + + cfg.setReadThrough(true); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + return cfg; + } + + /** + * + */ + private static class StoreFactory implements Factory> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestStore(); + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter implements Serializable { + /** */ + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); + + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public TestStore() { + for (int i = -100; i < 1000; i++) + map.put(String.valueOf(i), String.valueOf(i)); + + map.put("key", "key"); + } + + /** {@inheritDoc} */ + @Override public String load(String key) throws CacheLoaderException { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + map.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @SuppressWarnings("SuspiciousMethodCalls") + @Override public void delete(Object key) throws CacheWriterException { + map.remove(key); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java new file mode 100644 index 0000000000000..ae3f695d0a443 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffheapCacheStoreUpdateTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.configuration.CacheConfiguration; + +/** + * Check that near cache is updated when entry loaded from store. + */ +public class GridNearOffheapCacheStoreUpdateTest extends GridNearCacheStoreUpdateTest { + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + final CacheConfiguration ccfg = super.cacheConfiguration(); + + ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED); + + return ccfg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 8792ea16ac9af..af46c5783cfc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -114,8 +114,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePartitionedTxTimeoutSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheRendezvousAffinityClientSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridPartitionedBackupLoadSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheStoreUpdateTest; import org.apache.ignite.internal.processors.cache.distributed.near.NearCacheSyncUpdateTest; import org.apache.ignite.internal.processors.cache.distributed.near.NoneRebalanceModeSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOffheapCacheStoreUpdateTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedEvictionSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedJobExecutionTest; import org.apache.ignite.internal.processors.cache.local.GridCacheLocalAtomicBasicStoreSelfTest; @@ -271,6 +273,9 @@ public static TestSuite suite() throws Exception { suite.addTest(new TestSuite(OffheapCacheOnClientsTest.class)); suite.addTest(new TestSuite(CacheConcurrentReadThroughTest.class)); + suite.addTest(new TestSuite(GridNearCacheStoreUpdateTest.class)); + suite.addTest(new TestSuite(GridNearOffheapCacheStoreUpdateTest.class)); + return suite; } }