From ebf354c568d0802b7eed1cc6b9d251941dbce014 Mon Sep 17 00:00:00 2001 From: nikolay_tikhonov Date: Fri, 16 Sep 2016 14:32:13 +0300 Subject: [PATCH 1/3] IGNITE-3907 Fixed "Incorrect initialization CQ when node filter configured for cache" --- .../internal/GridEventConsumeHandler.java | 5 - .../internal/GridMessageListenHandler.java | 5 - .../CacheContinuousQueryHandler.java | 5 - .../continuous/GridContinuousHandler.java | 8 - .../continuous/GridContinuousProcessor.java | 33 +--- ...ontinuousQueryMultiNodesFilteringTest.java | 161 ++++++++++++++++++ 6 files changed, 170 insertions(+), 47 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index b4b1e58b93727..ed6998d5b8ab1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -261,11 +261,6 @@ public GridEventConsumeHandler() { return RegisterStatus.REGISTERED; } - /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - // No-op. - } - /** {@inheritDoc} */ @Override public void unregister(UUID routineId, GridKernalContext ctx) { assert routineId != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 2b8041d5a7ab1..1bca85cacfd7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -138,11 +138,6 @@ public GridMessageListenHandler(GridMessageListenHandler orig) { return RegisterStatus.REGISTERED; } - /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - // No-op. - } - /** {@inheritDoc} */ @Override public void unregister(UUID routineId, GridKernalContext ctx) { ctx.io().removeUserMessageListener(topic, pred); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 7b3b47b5a9dfb..a5752ed47eb92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -563,11 +563,6 @@ public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedExcept } } - /** {@inheritDoc} */ - @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { - // No-op. - } - /** {@inheritDoc} */ @Override public void unregister(UUID routineId, GridKernalContext ctx) { assert routineId != null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index c90746d759e6a..f14b450303f2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -56,14 +56,6 @@ public enum RegisterStatus { */ public RegisterStatus register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException; - /** - * Callback called after listener is registered and acknowledgement is sent. - * - * @param routineId Routine ID. - * @param ctx Kernal context. - */ - public void onListenerRegistered(UUID routineId, GridKernalContext ctx); - /** * Unregisters listener. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 5f61051b984be..ad7ad4fd70db8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -478,11 +478,9 @@ public void unlockStopping() { // Register handler only if local node passes projection predicate. if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) && - !locInfos.containsKey(item.routineId)) { - if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, - item.autoUnsubscribe, false)) - item.hnd.onListenerRegistered(item.routineId, ctx); - } + !locInfos.containsKey(item.routineId)) + registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval, + item.autoUnsubscribe, false); if (!item.autoUnsubscribe) // Register routine locally. @@ -509,14 +507,13 @@ public void unlockStopping() { ctx.resource().injectGeneric(info.prjPred); if (info.prjPred == null || info.prjPred.apply(ctx.discovery().localNode())) { - if (registerHandler(clientNodeId, + registerHandler(clientNodeId, routineId, info.hnd, info.bufSize, info.interval, info.autoUnsubscribe, - false)) - info.hnd.onListenerRegistered(routineId, ctx); + false); } } catch (IgniteCheckedException err) { @@ -555,9 +552,6 @@ public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException { GridContinuousHandler.RegisterStatus status = hnd.register(rmtInfo.nodeId, routineId, this.ctx); assert status != GridContinuousHandler.RegisterStatus.DELAYED; - - if (status == GridContinuousHandler.RegisterStatus.REGISTERED) - hnd.onListenerRegistered(routineId, this.ctx); } } } @@ -649,8 +643,6 @@ public IgniteInternalFuture startRoutine(GridContinuousHandler hnd, try { registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); - hnd.onListenerRegistered(routineId, ctx); - return new GridFinishedFuture<>(routineId); } catch (IgniteCheckedException e) { @@ -700,9 +692,8 @@ public IgniteInternalFuture startRoutine(GridContinuousHandler hnd, startFuts.put(routineId, fut); try { - if (locIncluded - && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true)) - hnd.onListenerRegistered(routineId, ctx); + if (locIncluded || hnd.isQuery()) + registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, reqData.handler().keepBinary())); @@ -1020,8 +1011,6 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage data.autoUnsubscribe())); } - boolean registered = false; - if (err == null) { try { IgnitePredicate prjPred = data.projectionPredicate(); @@ -1030,10 +1019,9 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage ctx.resource().injectGeneric(prjPred); if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && - !locInfos.containsKey(routineId)) { - registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(), + !locInfos.containsKey(routineId)) + registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe(), false); - } if (!data.autoUnsubscribe()) // Register routine locally. @@ -1061,9 +1049,6 @@ private void processStartRequest(ClusterNode node, StartRoutineDiscoveryMessage if (err != null) req.addError(ctx.localNodeId(), err); - - if (registered) - hnd0.onListenerRegistered(routineId, ctx); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java index 700044659880a..cf0c0d9575441 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java @@ -17,9 +17,17 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import javax.cache.configuration.Factory; import javax.cache.configuration.MutableCacheEntryListenerConfiguration; import javax.cache.event.CacheEntryCreatedListener; @@ -33,9 +41,12 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -45,8 +56,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** */ @SuppressWarnings("unchecked") @@ -57,13 +70,21 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA /** */ private static final int SERVER_GRIDS_COUNT = 6; + /** */ + public static final int KEYS = 2_000; + /** Cache entry operations' counts. */ private static final ConcurrentMap opCounts = new ConcurrentHashMap8<>(); + /** Client. */ + private static boolean client = false; + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); + client = false; + super.afterTest(); } @@ -122,6 +143,108 @@ public void testFiltersAndListeners() throws Exception { } } + /** + * @throws Exception If failed. + */ + public void testWithNodeFilter() throws Exception { + List qryCursors = new ArrayList<>(); + + final int nodesCnt = 3; + + startGridsMultiThreaded(nodesCnt); + + awaitPartitionMapExchange(); + + CacheConfiguration ccfg = cacheConfiguration(new NodeFilterByRegexp(".*(0|1)$")); + + grid(0).createCache(ccfg); + + final AtomicInteger cntr = new AtomicInteger(); + + final ConcurrentMap> maps = new ConcurrentHashMap<>(); + + final AtomicBoolean doubleNtfFail = new AtomicBoolean(false); + + CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent e : evts) { + cntr.incrementAndGet(); + + ClusterNode node = ((Ignite)e.getSource().unwrap(Ignite.class)).cluster().localNode(); + + Set set = maps.get(node); + + if (set == null) { + set = new ConcurrentSkipListSet<>(); + + Set oldVal = maps.putIfAbsent(node, set); + + set = oldVal != null ? oldVal : set; + } + + if (!set.add(e.getValue())) + doubleNtfFail.set(false); + } + } + }; + + for (int i = 0; i < nodesCnt; i++) { + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + Ignite ignite = grid(i); + + log.info("Try to start CQ on node: " + ignite.cluster().localNode().id()); + + qryCursors.add(ignite.cache(ccfg.getName()).query(qry)); + + log.info("CQ started on node: " + ignite.cluster().localNode().id()); + } + + client = true; + + startGrid(nodesCnt); + + awaitPartitionMapExchange(); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + qryCursors.add(grid(nodesCnt).cache(ccfg.getName()).query(qry)); + + for (int i = 0; i <= nodesCnt; i++) { + for (int key = 0; key < KEYS; key++) { + int val = (i * KEYS) + key; + + grid(i).cache(ccfg.getName()).put(val, val); + } + } + + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cntr.get() >= 2 * (nodesCnt + 1) * KEYS; + } + }, 5000L)); + + assertFalse("Got duplicate", doubleNtfFail.get()); + + for (int i = 0; i < (nodesCnt + 1) * KEYS; i++) { + for (Map.Entry> e : maps.entrySet()) + assertTrue("Lost event on node: " + e.getKey().id() + ", event: " + i, e.getValue().remove(i)); + } + + for (Map.Entry> e : maps.entrySet()) + assertTrue("Unexpected event on node: " + e.getKey(), e.getValue().isEmpty()); + + assertEquals("Not expected count of CQ", nodesCnt + 1, qryCursors.size()); + + for (QueryCursor cur : qryCursors) + cur.close(); + } + /** */ private Ignite startGrid(final int idx, boolean isClientMode) throws Exception { String gridName = getTestGridName(idx); @@ -179,6 +302,28 @@ private Ignite startGrid(final int idx, boolean isClientMode) throws Exception { return node; } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param filter Node filter. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(NodeFilterByRegexp filter) { + return new CacheConfiguration("test-cache-cq") + .setBackups(1) + .setNodeFilter(filter) + .setAtomicityMode(ATOMIC) + .setWriteSynchronizationMode(FULL_SYNC) + .setCacheMode(PARTITIONED); + } + /** */ private final static class ListenerConfiguration extends MutableCacheEntryListenerConfiguration { /** Operation. */ @@ -275,4 +420,20 @@ private NodeFilter(int idx) { return ((Integer)clusterNode.attributes().get("idx") % 2) == idx % 2; } } + + /** */ + private final static class NodeFilterByRegexp implements IgnitePredicate { + /** */ + private final Pattern pattern; + + /** */ + private NodeFilterByRegexp(String regExp) { + this.pattern = Pattern.compile(regExp); + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode clusterNode) { + return pattern.matcher(clusterNode.id().toString()).matches(); + } + } } From 13b4c74ba15e1aa6a8845ea168358321a41cd630 Mon Sep 17 00:00:00 2001 From: iveselovskiy Date: Fri, 16 Sep 2016 20:56:30 +0300 Subject: [PATCH 2/3] IGNITE-3922 : POC-like fix. --- .../igfs/IgfsBlockLocationImpl.java | 61 ++++++++++++++----- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java index 2d4a0af05460d..1b53ae169b11b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java @@ -25,7 +25,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryRawReader; @@ -39,6 +41,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jsr166.ConcurrentHashMap8; /** * File block location in the grid. @@ -283,22 +286,12 @@ public IgfsBlockLocationImpl(long start, long len, Collection nodes hosts.add(rawReader.readString()); } - /** - * Converts collection of rich nodes to block location data. - * - * @param nodes Collection of affinity nodes. - */ - private void convertFromNodes(Collection nodes) { - Collection names = new LinkedHashSet<>(); - Collection hosts = new LinkedHashSet<>(); - Collection nodeIds = new ArrayList<>(nodes.size()); - - for (final ClusterNode node : nodes) { - // Normalize host names into Hadoop-expected format. + static class A { + A(ClusterNode node) { try { - Collection addrs = U.toInetAddresses(node); + Collection inetAddresses = U.toInetAddresses(node); - for (InetAddress addr : addrs) { + for (InetAddress addr : inetAddresses) { if (addr.getHostName() == null) names.add(addr.getHostAddress() + ":" + 9001); else { @@ -310,8 +303,48 @@ private void convertFromNodes(Collection nodes) { catch (IgniteCheckedException ignored) { names.addAll(node.addresses()); } + } + final Collection names = new LinkedHashSet<>(); + final Collection hosts = new LinkedHashSet<>(); + } + + private static final ConcurrentMap addrMap = new ConcurrentHashMap8<>(); + + private static A getAddr(ClusterNode n) { + A a = addrMap.get(n.id()); + + if (a != null) + return a; + + A aNew = new A(n); + A a2 = addrMap.putIfAbsent(n.id(), aNew); + + if (a2 == null) + return aNew; + else + return a2; + } + + /** + * Converts collection of rich nodes to block location data. + * + * @param nodes Collection of affinity nodes. + */ + private void convertFromNodes(Collection nodes) { + Collection names = new LinkedHashSet<>(); + Collection hosts = new LinkedHashSet<>(); + Collection nodeIds = new ArrayList<>(nodes.size()); + + for (final ClusterNode node : nodes) { nodeIds.add(node.id()); + + A a = getAddr(node); + + assert a != null; + + names.addAll(a.names); + hosts.addAll(a.hosts); } this.nodeIds = nodeIds; From d16f44adc10f1b6c2fc81ca9f86f17f229c7ee6d Mon Sep 17 00:00:00 2001 From: iveselovskiy Date: Mon, 19 Sep 2016 15:37:35 +0300 Subject: [PATCH 3/3] 3922: butifications (wip) --- .../igfs/IgfsBlockLocationImpl.java | 47 +++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java index 1b53ae169b11b..0009fc1bcf65b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; @@ -286,17 +285,27 @@ public IgfsBlockLocationImpl(long start, long len, Collection nodes hosts.add(rawReader.readString()); } - static class A { - A(ClusterNode node) { + /** + * Encapsulates + */ + static class CachedAddresses { + /** + * Constructor. + * + * @param node The node to calculate and cache addresses for. + */ + CachedAddresses(ClusterNode node) { try { - Collection inetAddresses = U.toInetAddresses(node); + Collection inetAddrs = U.toInetAddresses(node); + + for (InetAddress addr : inetAddrs) { + String hostName = addr.getHostName(); - for (InetAddress addr : inetAddresses) { - if (addr.getHostName() == null) + if (hostName == null) names.add(addr.getHostAddress() + ":" + 9001); else { - names.add(addr.getHostName() + ":" + 9001); // hostname:portNumber - hosts.add(addr.getHostName()); + names.add(hostName + ":" + 9001); // hostname:portNumber + hosts.add(hostName); } } } @@ -304,21 +313,31 @@ static class A { names.addAll(node.addresses()); } } + + /** + * Collection of pairs {@code hostname:portNumber}, + * see {@link IgfsBlockLocation#names()}. + */ final Collection names = new LinkedHashSet<>(); + + /** + * Collection of pairs {@code hostname:portNumber}, + * see {@link IgfsBlockLocation#hosts()}. + */ final Collection hosts = new LinkedHashSet<>(); } - private static final ConcurrentMap addrMap = new ConcurrentHashMap8<>(); + private static final ConcurrentMap addrMap = new ConcurrentHashMap8<>(); - private static A getAddr(ClusterNode n) { - A a = addrMap.get(n.id()); + private static CachedAddresses getAddr(ClusterNode n) { + CachedAddresses a = addrMap.get(n.id()); if (a != null) return a; - A aNew = new A(n); + CachedAddresses aNew = new CachedAddresses(n); - A a2 = addrMap.putIfAbsent(n.id(), aNew); + CachedAddresses a2 = addrMap.putIfAbsent(n.id(), aNew); if (a2 == null) return aNew; @@ -339,7 +358,7 @@ private void convertFromNodes(Collection nodes) { for (final ClusterNode node : nodes) { nodeIds.add(node.id()); - A a = getAddr(node); + CachedAddresses a = getAddr(node); assert a != null;