From 99f0f777ae1507994b15283937629f12aabca4ac Mon Sep 17 00:00:00 2001 From: voipp Date: Thu, 14 Sep 2017 10:38:42 +0300 Subject: [PATCH 1/2] ignite-5960 additions1 --- .../processors/cache/GridCacheMapEntry.java | 37 ++++++++ ...ousQueryConcurrentPartitionUpdateTest.java | 95 +++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 61f6fb47518d0..4ef78edcd1250 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; @@ -96,6 +98,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** */ private static final byte IS_DELETED_MASK = 0x01; + private static final Object mux = new Object(); + /** */ private static final byte IS_UNSWAPPED_MASK = 0x02; @@ -1652,6 +1656,24 @@ else if (ttl != CU.TTL_ZERO) Map lsnrs = cctx.continuousQueries().updateListeners(internal, false); + if(key.value(cctx.cacheObjectContext(), false).equals(1)) { + assert lsnrs == null; + try { + synchronized (mux) { + + mux.wait(); + + } + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + if(key.value(cctx.cacheObjectContext(), false).equals(2)){ + assert lsnrs != null; + } + boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1691,6 +1713,21 @@ else if (ttl != CU.TTL_ZERO) GridCacheUpdateAtomicResult updateRes = c.updateRes; + if(key.value(cctx.cacheObjectContext(), false).equals(2)){ + assert lsnrs != null; + assert c.updateRes.updateCounter() == 1; + synchronized (mux) { + + mux.notify(); + } + } + + if(key.value(cctx.cacheObjectContext(), false).equals(1)){ + assert lsnrs == null; + assert c.updateRes.updateCounter() == 2; + //mux.notify(); + } + assert updateRes != null : c; CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java index 6c74f7901cd8e..9d095564634f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java @@ -18,23 +18,50 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheInterceptorAdapter; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; +import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -42,6 +69,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -54,6 +82,8 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo /** */ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + private AtomicInteger cntr = new AtomicInteger(0); + /** */ private boolean client; @@ -65,6 +95,9 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo cfg.setClientMode(client); + cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, + cntr.getAndIncrement())); + return cfg; } @@ -257,6 +290,68 @@ public void _testConcurrentUpdatesAndQueryStartTxCacheGroup() throws Exception { concurrentUpdatesAndQueryStart(TRANSACTIONAL, true); } + public void test1() throws Exception { + Ignite srv = startGrid(0); + + client = true; + + Ignite client = startGrid(1); + + List caches = new ArrayList<>(); + + CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); + + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(ATOMIC); + + ccfg.setAffinity(new GridCacheModuloAffinityFunction(1, ccfg.getBackups())); + + IgniteCache clientCache = client.createCache(ccfg); + IgniteCache serverCache = srv.cache(DEFAULT_CACHE_NAME); + + Affinity aff = srv.affinity(DEFAULT_CACHE_NAME); + + final List keys = new ArrayList<>(); + + assert aff.partition(1) == 0; + assert aff.partition(2) == 0; + + multithreadedAsync(new Runnable() { + @Override public void run() { + + serverCache.put(1, 1); + } + }, 1); + + Thread.sleep(1000); + + ContinuousQuery qry = new ContinuousQuery<>(); + Set keysReceived = new HashSet<>(); + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) { + for (CacheEntryEvent evt : evts) + keysReceived.add((Integer)evt.getKey()); + } + }); + + clientCache.query(qry); + + serverCache.put(2, 1); + + serverCache.put(3, 1); + + Thread.sleep(1000); + + for (Integer integer : keysReceived) + System.out.println("Lister received key: " + integer + " "); + } + + private static class TestInterceptor extends CacheInterceptorAdapter { + @Nullable @Override public Object onBeforePut(Cache.Entry entry, Object newVal) { + return super.onBeforePut(entry, newVal); + } + } + /** * @param atomicityMode Cache atomicity mode. * @param cacheGrp {@code True} if test cache multiple caches in the same group. From 35deef8a735ecf0bd04e0f42fa54bb998ffbeddb Mon Sep 17 00:00:00 2001 From: voipp Date: Thu, 14 Sep 2017 16:37:53 +0300 Subject: [PATCH 2/2] ignite-5960 probable fix --- .../processors/cache/GridCacheMapEntry.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 4ef78edcd1250..d94dd69b460c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1657,7 +1657,7 @@ else if (ttl != CU.TTL_ZERO) Map lsnrs = cctx.continuousQueries().updateListeners(internal, false); if(key.value(cctx.cacheObjectContext(), false).equals(1)) { - assert lsnrs == null; + //assert lsnrs == null; try { synchronized (mux) { @@ -1671,7 +1671,7 @@ else if (ttl != CU.TTL_ZERO) } if(key.value(cctx.cacheObjectContext(), false).equals(2)){ - assert lsnrs != null; + //assert lsnrs != null; } boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM @@ -1711,11 +1711,20 @@ else if (ttl != CU.TTL_ZERO) else cctx.offheap().invoke(cctx, key, localPartition(), c); + if(lsnrs == null && !readThrough) { + lsnrs = cctx.continuousQueries().updateListeners(internal, false); + + if (lsnrs != null) { + needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM + || !F.isEmptyOrNulls(filter); + } + } + GridCacheUpdateAtomicResult updateRes = c.updateRes; if(key.value(cctx.cacheObjectContext(), false).equals(2)){ - assert lsnrs != null; - assert c.updateRes.updateCounter() == 1; + //assert lsnrs != null; + //assert c.updateRes.updateCounter() == 1; synchronized (mux) { mux.notify(); @@ -1723,8 +1732,8 @@ else if (ttl != CU.TTL_ZERO) } if(key.value(cctx.cacheObjectContext(), false).equals(1)){ - assert lsnrs == null; - assert c.updateRes.updateCounter() == 2; + //assert lsnrs == null; + //assert c.updateRes.updateCounter() == 2; //mux.notify(); }