
Commit 151a2e5

IGNITE-543 - Query API changes

Valentin Kulichenko committed Mar 21, 2015
1 parent dcbab7d · commit 151a2e5
Showing 6 changed files with 24 additions and 49 deletions.
@@ -109,10 +109,10 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     private static final long serialVersionUID = 0L;

     /**
-     * Default buffer size. Size of {@code 1} means that all entries
+     * Default page size. Size of {@code 1} means that all entries
      * will be sent to master node immediately (buffering is disabled).
      */
-    public static final int DFLT_BUF_SIZE = 1;
+    public static final int DFLT_PAGE_SIZE = 1;

     /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */
     public static final long DFLT_TIME_INTERVAL = 0;
@@ -132,15 +132,19 @@ public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
     /** Remote filter. */
     private CacheEntryEventFilter<K, V> rmtFilter;

-    /** Buffer size. */
-    private int bufSize = DFLT_BUF_SIZE;
-
     /** Time interval. */
     private long timeInterval = DFLT_TIME_INTERVAL;

     /** Automatic unsubscription flag. */
     private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;

+    /**
+     * Creates new continuous query.
+     */
+    public ContinuousQuery() {
+        setPageSize(DFLT_PAGE_SIZE);
+    }
+
     /**
      * Sets initial query.
      * <p>
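With the buffer-size field removed, the new no-arg constructor simply seeds the page size inherited from Query. A minimal sketch of what that default means for user code (names as they appear in this commit):

    // DFLT_PAGE_SIZE is 1, so by default every entry is sent to the master node immediately.
    ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

    assert qry.getPageSize() == ContinuousQuery.DFLT_PAGE_SIZE;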
@@ -221,42 +225,11 @@ public CacheEntryEventFilter<K, V> getRemoteFilter() {
         return rmtFilter;
     }

-    /**
-     * Sets buffer size.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer. Entries from buffer will be
-     * sent to the master node only if the buffer is full or time provided via {@link #setTimeInterval(long)} method is
-     * exceeded.
-     * <p>
-     * Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is
-     * disabled).
-     *
-     * @param bufSize Buffer size.
-     * @return {@code this} for chaining.
-     */
-    public ContinuousQuery<K, V> setBufferSize(int bufSize) {
-        if (bufSize <= 0)
-            throw new IllegalArgumentException("Buffer size must be above zero.");
-
-        this.bufSize = bufSize;
-
-        return this;
-    }
-
-    /**
-     * Gets buffer size.
-     *
-     * @return Buffer size.
-     */
-    public int getBufferSize() {
-        return bufSize;
-    }
-
     /**
      * Sets time interval.
      * <p>
      * When a cache update happens, entry is first put into a buffer. Entries from buffer will
-     * be sent to the master node only if the buffer is full (its size can be provided via {@link #setBufferSize(int)}
+     * be sent to the master node only if the buffer is full (its size can be provided via {@link #setPageSize(int)}
      * method) or time provided via this method is exceeded.
      * <p>
      * Default time interval is {@code 0} which means that
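Since setBufferSize(int) and getBufferSize() are gone, callers switch to the setPageSize(int)/getPageSize() pair inherited from Query. A rough migration sketch (listener setup omitted):

    ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();

    // Before this commit: qry.setBufferSize(5);
    // After this commit the same buffering threshold is configured through the page size:
    qry.setPageSize(5);
    qry.setTimeInterval(3000); // still flushes the buffer at least every 3 seconds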
@@ -65,6 +65,9 @@ public int getPageSize() {
      * @return {@code this} for chaining.
      */
     public Query<R> setPageSize(int pageSize) {
+        if (pageSize <= 0)
+            throw new IllegalArgumentException("Page size must be above zero.");
+
         this.pageSize = pageSize;

         return this;
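The argument check that used to live in ContinuousQuery.setBufferSize(int) now sits in the Query base class, so any query type rejects non-positive page sizes. A sketch of the behavior the updated tests below rely on:

    ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();

    try {
        qry.setPageSize(0); // validated in Query.setPageSize() after this commit
    }
    catch (IllegalArgumentException e) {
        // Expected: "Page size must be above zero."
    }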
@@ -365,7 +365,7 @@ private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean lo
         final UUID routineId = ctx.continuousQueries().executeQuery(
             qry.getLocalListener(),
             qry.getRemoteFilter(),
-            qry.getBufferSize(),
+            qry.getPageSize(),
             qry.getTimeInterval(),
             qry.isAutoUnsubscribe(),
             loc ? ctx.grid().cluster().forLocal() : null);
@@ -284,7 +284,7 @@ public UUID executeInternalQuery(CacheEntryUpdatedListener<?, ?> locLsnr,
         return executeQuery0(
             locLsnr,
             rmtFilter,
-            ContinuousQuery.DFLT_BUF_SIZE,
+            ContinuousQuery.DFLT_PAGE_SIZE,
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
             true,
@@ -401,7 +401,7 @@ else if (nodes.size() > 1)
         int taskNameHash = !internal && cctx.kernalContext().security().enabled() ?
             cctx.kernalContext().job().currentTaskNameHash() : 0;

-        GridContinuousHandler hnd = new CacheContinuousQueryHandler<>(
+        GridContinuousHandler hnd = new CacheContinuousQueryHandler(
             cctx.name(),
             TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()),
             locLsnr,
@@ -582,7 +582,7 @@ void execute() throws IgniteCheckedException {
         routineId = executeQuery0(
             locLsnr,
             rmtFilter,
-            ContinuousQuery.DFLT_BUF_SIZE,
+            ContinuousQuery.DFLT_PAGE_SIZE,
             ContinuousQuery.DFLT_TIME_INTERVAL,
             ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE,
             false,
@@ -170,7 +170,7 @@ public boolean apply() {

        for (int i = 0; i < gridCount(); i++) {
-            GridContinuousProcessor proc = ((IgniteKernal)grid(i)).context().continuous();
+            GridContinuousProcessor proc = grid(i).context().continuous();

            assertEquals(String.valueOf(i), 2, ((Map)U.field(proc, "locInfos")).size());
            assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
@@ -180,8 +180,7 @@ public boolean apply() {
            assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size());
            assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size());

-            CacheContinuousQueryManager mgr =
-                ((IgniteKernal)grid(i)).context().cache().internalCache().context().continuousQueries();
+            CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries();

            assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size());
        }
@@ -215,7 +214,7 @@ public void testIllegalArguments() throws Exception {
            log,
            new Callable<Object>() {
                @Override public Object call() throws Exception {
-                    q.setBufferSize(-1);
+                    q.setPageSize(-1);

                    return null;
                }
@@ -226,7 +225,7 @@ public void testIllegalArguments() throws Exception {

        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override public Object call() throws Exception {
-                q.setBufferSize(0);
+                q.setPageSize(0);

                return null;
            }
@@ -514,7 +513,7 @@ public void testBuffering() throws Exception {
            }
        });

-        qry.setBufferSize(5);
+        qry.setPageSize(5);

        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
            ClusterNode node = F.first(grid(0).cluster().forRemotes().nodes());
@@ -599,7 +598,7 @@ public void testTimeInterval() throws Exception {
            }
        });

-        qry.setBufferSize(10);
+        qry.setPageSize(10);
        qry.setTimeInterval(3000);

        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
@@ -128,7 +128,7 @@ public static void main(String[] args) throws Exception {
            }
        });

-        qry.setBufferSize(bufSize);
+        qry.setPageSize(bufSize);
        qry.setTimeInterval(timeInterval);

        cache.query(qry);
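Putting the renamed pieces together, the continuous-query flow after this commit reads roughly as below; the cache, key/value types, and listener body are illustrative assumptions, not part of the commit:

    // Assumes an IgniteCache<Integer, String> named "cache" obtained from a running node.
    ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

    qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
            for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
                System.out.println("Updated: " + e.getKey() + " -> " + e.getValue());
        }
    });

    // Buffer up to 10 entries per node, flushing at least every 3 seconds
    // (this was setBufferSize(10) before the rename).
    qry.setPageSize(10);
    qry.setTimeInterval(3000);

    try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
        // Updates made while the cursor is open are delivered to the local listener in pages.
        for (int i = 0; i < 100; i++)
            cache.put(i, Integer.toString(i));
    }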
