Skip to content

Commit

Permalink
#ignite-335: Change ContinuousQuery.setInitialQuery API.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivasilinets committed Feb 25, 2015
1 parent 12ee896 commit 3d4ce6f
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
Expand Up @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception {
// Create new continuous query. // Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>(); ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();


qry.setInitialPredicate(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() { qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
@Override public boolean apply(Integer key, String val) { @Override public boolean apply(Integer key, String val) {
return key > 10; return key > 10;
} }
Expand Down
Expand Up @@ -29,7 +29,7 @@
* the node that executed the query and local listener will be notified. * the node that executed the query and local listener will be notified.
* <p> * <p>
* Additionally, you can execute initial query to get currently existing data. * Additionally, you can execute initial query to get currently existing data.
* Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialPredicate(Query)} * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialQuery(Query)}
* method. * method.
* <p> * <p>
* Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)} * Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)}
Expand Down Expand Up @@ -62,7 +62,7 @@
* ContinuousQuery qry = Query.continuous(); * ContinuousQuery qry = Query.continuous();
* *
* // Initial iteration query will return all persons with salary above 1000. * // Initial iteration query will return all persons with salary above 1000.
* qry.setInitialPredicate(Query.scan(new IgniteBiPredicate&lt;UUID, Person&gt;() { * qry.setInitialQuery(Query.scan(new IgniteBiPredicate&lt;UUID, Person&gt;() {
* &#64;Override public boolean apply(UUID id, Person p) { * &#64;Override public boolean apply(UUID id, Person p) {
* return p.getSalary() &gt; 1000; * return p.getSalary() &gt; 1000;
* } * }
Expand Down Expand Up @@ -124,8 +124,8 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> {
*/ */
public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;


/** Initial filter. */ /** Initial query. */
private Query initFilter; private Query initQry;


/** Local listener. */ /** Local listener. */
private CacheEntryUpdatedListener<K, V> locLsnr; private CacheEntryUpdatedListener<K, V> locLsnr;
Expand All @@ -149,11 +149,11 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> {
* which allows to iterate through entries which already existed at the * which allows to iterate through entries which already existed at the
* time continuous query is executed. * time continuous query is executed.
* *
* @param initFilter Initial query. * @param initQuery Initial query.
* @return {@code this} for chaining. * @return {@code this} for chaining.
*/ */
public ContinuousQuery<K, V> setInitialPredicate(Query initFilter) { public ContinuousQuery<K, V> setInitialQuery(Query initQuery) {
this.initFilter = initFilter; this.initQry = initQuery;


return this; return this;
} }
Expand All @@ -163,8 +163,8 @@ public ContinuousQuery<K, V> setInitialPredicate(Query initFilter) {
* *
* @return Initial query. * @return Initial query.
*/ */
public Query getInitialPredicate() { public Query getInitialQuery() {
return initFilter; return initQry;
} }


/** /**
Expand Down
Expand Up @@ -332,7 +332,7 @@ else if (filter instanceof SqlQuery) {
* @return Initial iteration cursor. * @return Initial iteration cursor.
*/ */
private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) { private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) {
if (qry.getInitialPredicate() instanceof ContinuousQuery) if (qry.getInitialQuery() instanceof ContinuousQuery)
throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + throw new IgniteException("Initial predicate for continuous query can't be an instance of another " +
"continuous query. Use SCAN or SQL query for initial iteration."); "continuous query. Use SCAN or SQL query for initial iteration.");


Expand All @@ -350,8 +350,8 @@ private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boole


final QueryCursor<Cache.Entry<K, V>> cur; final QueryCursor<Cache.Entry<K, V>> cur;


if (qry.getInitialPredicate() != null) if (qry.getInitialQuery() != null)
cur = loc ? localQuery(qry.getInitialPredicate()) : query(qry.getInitialPredicate()); cur = loc ? localQuery(qry.getInitialQuery()) : query(qry.getInitialQuery());
else else
cur = null; cur = null;


Expand Down
Expand Up @@ -619,7 +619,7 @@ public void testInitialPredicate() throws Exception {


ContinuousQuery<Integer, Integer> qry = Query.continuous(); ContinuousQuery<Integer, Integer> qry = Query.continuous();


qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() { qry.setInitialQuery(Query.scan(new P2<Integer, Integer>() {
@Override public boolean apply(Integer k, Integer v) { @Override public boolean apply(Integer k, Integer v) {
return k >= 5; return k >= 5;
} }
Expand Down Expand Up @@ -664,7 +664,7 @@ public void testInitialPredicateAndUpdates() throws Exception {


ContinuousQuery<Integer, Integer> qry = Query.continuous(); ContinuousQuery<Integer, Integer> qry = Query.continuous();


qry.setInitialPredicate(Query.scan(new P2<Integer, Integer>() { qry.setInitialQuery(Query.scan(new P2<Integer, Integer>() {
@Override public boolean apply(Integer k, Integer v) { @Override public boolean apply(Integer k, Integer v) {
return k >= 5; return k >= 5;
} }
Expand Down

0 comments on commit 3d4ce6f

Please sign in to comment.