From 151a2e52285c1370c4202095eaf7f70b97ff1cad Mon Sep 17 00:00:00 2001 From: Valentin Kulichenko Date: Fri, 20 Mar 2015 19:26:52 -0700 Subject: [PATCH] IGNITE-543 - Query API changes --- .../ignite/cache/query/ContinuousQuery.java | 47 ++++--------------- .../org/apache/ignite/cache/query/Query.java | 3 ++ .../processors/cache/IgniteCacheProxy.java | 2 +- .../CacheContinuousQueryManager.java | 6 +-- ...dCacheContinuousQueryAbstractSelfTest.java | 13 +++-- .../GridContinuousOperationsLoadTest.java | 2 +- 6 files changed, 24 insertions(+), 49 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index 98149959320ba..cadcb9cc513f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -109,10 +109,10 @@ public final class ContinuousQuery extends Query> { private static final long serialVersionUID = 0L; /** - * Default buffer size. Size of {@code 1} means that all entries + * Default page size. Size of {@code 1} means that all entries * will be sent to master node immediately (buffering is disabled). */ - public static final int DFLT_BUF_SIZE = 1; + public static final int DFLT_PAGE_SIZE = 1; /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */ public static final long DFLT_TIME_INTERVAL = 0; @@ -132,15 +132,19 @@ public final class ContinuousQuery extends Query> { /** Remote filter. */ private CacheEntryEventFilter rmtFilter; - /** Buffer size. */ - private int bufSize = DFLT_BUF_SIZE; - /** Time interval. */ private long timeInterval = DFLT_TIME_INTERVAL; /** Automatic unsubscription flag. */ private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; + /** + * Creates new continuous query. + */ + public ContinuousQuery() { + setPageSize(DFLT_PAGE_SIZE); + } + /** * Sets initial query. *

@@ -221,42 +225,11 @@ public CacheEntryEventFilter getRemoteFilter() { return rmtFilter; } - /** - * Sets buffer size. - *

- * When a cache update happens, entry is first put into a buffer. Entries from buffer will be - * sent to the master node only if the buffer is full or time provided via {@link #setTimeInterval(long)} method is - * exceeded. - *

- * Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is - * disabled). - * - * @param bufSize Buffer size. - * @return {@code this} for chaining. - */ - public ContinuousQuery setBufferSize(int bufSize) { - if (bufSize <= 0) - throw new IllegalArgumentException("Buffer size must be above zero."); - - this.bufSize = bufSize; - - return this; - } - - /** - * Gets buffer size. - * - * @return Buffer size. - */ - public int getBufferSize() { - return bufSize; - } - /** * Sets time interval. *

* When a cache update happens, entry is first put into a buffer. Entries from buffer will - * be sent to the master node only if the buffer is full (its size can be provided via {@link #setBufferSize(int)} + * be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)} * method) or time provided via this method is exceeded. *

* Default time interval is {@code 0} which means that diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java index c120fc5d9b028..bcace6b10d053 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java @@ -65,6 +65,9 @@ public int getPageSize() { * @return {@code this} for chaining. */ public Query setPageSize(int pageSize) { + if (pageSize <= 0) + throw new IllegalArgumentException("Page size must be above zero."); + this.pageSize = pageSize; return this; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f55de0e6d9bba..22bb3302988a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -365,7 +365,7 @@ private QueryCursor> queryContinuous(ContinuousQuery qry, boolean lo final UUID routineId = ctx.continuousQueries().executeQuery( qry.getLocalListener(), qry.getRemoteFilter(), - qry.getBufferSize(), + qry.getPageSize(), qry.getTimeInterval(), qry.isAutoUnsubscribe(), loc ? ctx.grid().cluster().forLocal() : null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index eb0fc1de7db72..39a8959501b41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -284,7 +284,7 @@ public UUID executeInternalQuery(CacheEntryUpdatedListener locLsnr, return executeQuery0( locLsnr, rmtFilter, - ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, true, @@ -401,7 +401,7 @@ else if (nodes.size() > 1) int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? cctx.kernalContext().job().currentTaskNameHash() : 0; - GridContinuousHandler hnd = new CacheContinuousQueryHandler<>( + GridContinuousHandler hnd = new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), locLsnr, @@ -582,7 +582,7 @@ void execute() throws IgniteCheckedException { routineId = executeQuery0( locLsnr, rmtFilter, - ContinuousQuery.DFLT_BUF_SIZE, + ContinuousQuery.DFLT_PAGE_SIZE, ContinuousQuery.DFLT_TIME_INTERVAL, ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE, false, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index cbf9eb151fd3e..096ea977906d8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -170,7 +170,7 @@ public boolean apply() { for (int i = 0; i < gridCount(); i++) { - GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous(); + GridContinuousProcessor proc = grid(i).context().continuous(); assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size()); @@ -180,8 +180,7 @@ public boolean apply() { assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size()); assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size()); - CacheContinuousQueryManager mgr = - ((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries(); + CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries(); assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size()); } @@ -215,7 +214,7 @@ public void testIllegalArguments() throws Exception { log, new Callable() { @Override public Object call() throws Exception { - q.setBufferSize(-1); + q.setPageSize(-1); return null; } @@ -226,7 +225,7 @@ public void testIllegalArguments() throws Exception { GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { - q.setBufferSize(0); + q.setPageSize(0); return null; } @@ -514,7 +513,7 @@ public void testBuffering() throws Exception { } }); - qry.setBufferSize(5); + qry.setPageSize(5); try (QueryCursor> ignored = cache.query(qry)) { ClusterNode node = F.first(grid(0).cluster().forRemotes().nodes()); @@ -599,7 +598,7 @@ public void testTimeInterval() throws Exception { } }); - qry.setBufferSize(10); + qry.setPageSize(10); qry.setTimeInterval(3000); try (QueryCursor> ignored = cache.query(qry)) { diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java index fb0c2d8d2297d..2ab72f7fe1f20 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/continuous/GridContinuousOperationsLoadTest.java @@ -128,7 +128,7 @@ public static void main(String[] args) throws Exception { } }); - qry.setBufferSize(bufSize); + qry.setPageSize(bufSize); qry.setTimeInterval(timeInterval); cache.query(qry);