Permalink
Browse files

Merge pull request #578 from hector-client/SliceFilter

Add functionality to filter query results
  • Loading branch information...
2 parents 570798c + e184037 commit 7d92c51eed7393bd18960abd1cd0d74429ac612a @zznate zznate committed Jan 11, 2013
@@ -1,7 +1,10 @@
package me.prettyprint.cassandra.service;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import java.util.Iterator;
import java.util.List;
+import me.prettyprint.cassandra.service.template.SliceFilter;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.query.SliceQuery;
@@ -16,9 +19,10 @@
private static final int DEFAULT_COUNT = 100;
private SliceQuery<K, N, V> query;
- private Iterator<HColumn<N, V>> iterator;
+ private PeekingIterator<HColumn<N, V>> iterator;
private N start;
private ColumnSliceFinish<N> finish;
+ private SliceFilter<HColumn<N, V>> filter = null;
private boolean reversed;
private int count = DEFAULT_COUNT;
private int columns = 0;
@@ -86,24 +90,30 @@ public ColumnSliceIterator(SliceQuery<K, N, V> query, N start, ColumnSliceFinish
this.query.setRange(this.start, this.finish.function(), this.reversed, this.count);
}
+ /**
+ * Set a filter to determine which columns will be returned.
+ *
+ * @param filter Filter to determine which columns will be returned
+ * @return &lt;this&gt;
+ */
+ public ColumnSliceIterator setFilter(SliceFilter<HColumn<N, V>> filter) {
+ this.filter = filter;
+ return this;
+ }
+
@Override
public boolean hasNext() {
if (iterator == null) {
- iterator = query.execute().get().getColumns().iterator();
+ iterator = Iterators.peekingIterator(query.execute().get().getColumns().iterator());
} else if (!iterator.hasNext() && columns == count) { // only need to do another query if maximum columns were retrieved
- query.setRange(start, finish.function(), reversed, count);
- columns = 0;
- List<HColumn<N, V>> list = query.execute().get().getColumns();
- iterator = list.iterator();
+ refresh();
+ }
- if (iterator.hasNext()) {
- // The lower bound column may have been removed prior to the query executing,
- // so check to see if the first column returned by the current query is the same
- // as the lower bound column. If both columns are the same, skip the column
- N first = list.get(0).getName();
- if (first.equals(start)) {
- next();
- }
+ while(filter != null && iterator != null && iterator.hasNext() && !filter.accept(iterator.peek())) {
+ next();
+
+ if(!iterator.hasNext() && columns == count) {
+ refresh();
}
}
@@ -124,6 +134,23 @@ public void remove() {
iterator.remove();
}
+ private void refresh() {
+ query.setRange(start, finish.function(), reversed, count);
+ columns = 0;
+ List<HColumn<N, V>> list = query.execute().get().getColumns();
+ iterator = Iterators.peekingIterator(list.iterator());
+
+ if (iterator.hasNext()) {
+ // The lower bound column may have been removed prior to the query executing,
+ // so check to see if the first column returned by the current query is the same
+ // as the lower bound column. If both columns are the same, skip the column
+ N first = list.get(0).getName();
+ if (first.equals(start)) {
+ next();
+ }
+ }
+ }
+
/**
* When iterating over a ColumnSlice, it may be desirable to move the finish
* point for each query. This interface allows for a user defined function
@@ -1,6 +1,9 @@
package me.prettyprint.cassandra.service;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import java.util.Iterator;
+import me.prettyprint.cassandra.service.template.SliceFilter;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.query.RangeSlicesQuery;
@@ -14,7 +17,8 @@
private RangeSlicesQuery<K, N, V> query;
private K startKey;
private K endKey;
- private Iterator<Row<K, N, V>> iterator;
+ private PeekingIterator<Row<K, N, V>> iterator;
+ private SliceFilter<Row<K, N, V>> filter = null;
private int rows = 0;
public RangeSlicesIterator(RangeSlicesQuery<K, N, V> query, K startKey, K endKey) {
@@ -28,15 +32,16 @@ public RangeSlicesIterator(RangeSlicesQuery<K, N, V> query, K startKey, K endKey
public boolean hasNext() {
if (iterator == null) {
// First time through
- iterator = query.execute().get().getList().iterator();
+ iterator = Iterators.peekingIterator(query.execute().get().getList().iterator());
} else if (!iterator.hasNext() && rows == query.getRowCount()) { // only need to do another query if maximum rows were retrieved
- query.setKeys(startKey, endKey);
- iterator = query.execute().get().getList().iterator();
- rows = 0;
- if (iterator.hasNext()) {
- // First element is startKey which was the last element on the previous query result - skip it
- next();
+ }
+
+ while(filter != null && iterator != null && iterator.hasNext() && !filter.accept(iterator.peek())) {
+ next();
+
+ if(!iterator.hasNext() && rows == query.getRowCount()) {
+ refresh();
}
}
@@ -56,4 +61,15 @@ public boolean hasNext() {
public void remove() {
iterator.remove();
}
+
+ private void refresh() {
+ query.setKeys(startKey, endKey);
+ iterator = Iterators.peekingIterator(query.execute().get().getList().iterator());
+ rows = 0;
+
+ if (iterator.hasNext()) {
+ // First element is startKey which was the last element on the previous query result - skip it
+ next();
+ }
+ }
}
@@ -1,7 +1,10 @@
package me.prettyprint.cassandra.service;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
import java.util.Iterator;
import java.util.List;
+import me.prettyprint.cassandra.service.template.SliceFilter;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.query.SliceCounterQuery;
@@ -17,9 +20,10 @@
private static final int DEFAULT_COUNT = 100;
private SliceCounterQuery<K, N> query;
- private Iterator<HCounterColumn<N>> iterator;
+ private PeekingIterator<HCounterColumn<N>> iterator;
private N start;
private SliceCounterFinish<N> finish;
+ private SliceFilter<HCounterColumn<N>> filter = null;
private boolean reversed;
private int count = DEFAULT_COUNT;
private int columns = 0;
@@ -87,24 +91,30 @@ public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, SliceCounter
this.query.setRange(this.start, this.finish.function(), this.reversed, this.count);
}
+ /**
+ * Set a filter to determine which columns will be returned.
+ *
+ * @param filter Filter to determine which columns will be returned
+ * @return &lt;this&gt;
+ */
+ public SliceCounterIterator setFilter(SliceFilter<HCounterColumn<N>> filter) {
+ this.filter = filter;
+ return this;
+ }
+
@Override
public boolean hasNext() {
if (iterator == null) {
- iterator = query.execute().get().getColumns().iterator();
+ iterator = Iterators.peekingIterator(query.execute().get().getColumns().iterator());
} else if (!iterator.hasNext() && columns == count) { // only need to do another query if maximum columns were retrieved
- query.setRange(start, finish.function(), reversed, count);
- columns = 0;
- List<HCounterColumn<N>> list = query.execute().get().getColumns();
- iterator = list.iterator();
-
- if (iterator.hasNext()) {
- // The lower bound column may have been removed prior to the query executing,
- // so check to see if the first column returned by the current query is the same
- // as the lower bound column. If both columns are the same, skip the column
- N first = list.get(0).getName();
- if (first.equals(start)) {
- next();
- }
+ refresh();
+ }
+
+ while(filter != null && iterator != null && iterator.hasNext() && !filter.accept(iterator.peek())) {
+ next();
+
+ if(!iterator.hasNext() && columns == count) {
+ refresh();
}
}
@@ -125,6 +135,22 @@ public void remove() {
iterator.remove();
}
+ private void refresh() {
+ query.setRange(start, finish.function(), reversed, count);
+ columns = 0;
+ List<HCounterColumn<N>> list = query.execute().get().getColumns();
+ iterator = Iterators.peekingIterator(list.iterator());
+
+ if (iterator.hasNext()) {
+ // The lower bound column may have been removed prior to the query executing,
+ // so check to see if the first column returned by the current query is the same
+ // as the lower bound column. If both columns are the same, skip the column
+ N first = list.get(0).getName();
+ if (first.equals(start)) {
+ next();
+ }
+ }
+ }
/**
* When iterating over a SliceCounter, it may be desirable to move the finish
* point for each query. This interface allows for a user defined function
@@ -0,0 +1,15 @@
+package me.prettyprint.cassandra.service.template;
+
+/**
+ *
+ * @author thrykol
+ */
+public interface SliceFilter<C>
+{
+ /**
+ * Determine if the column should be included in the slice
+ * @param column Column
+ * @return True if the column should be included, false otherwise
+ */
+ boolean accept(C column);
+}
@@ -1,11 +1,14 @@
package me.prettyprint.cassandra.service;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.service.template.SliceFilter;
import me.prettyprint.cassandra.utils.TimeUUIDUtils;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
@@ -82,4 +85,40 @@ public void testModificationIterator() {
}
assertEquals(1000, results.size());
}
+
+ @Test
+ public void testFilter() {
+ cluster.truncate(keyspace.getKeyspaceName(), CF);
+
+ Mutator<String> m = createMutator(keyspace, se);
+ for (int i = 0; i < 500; i++) {
+ m.addInsertion(KEY, CF, createColumn("a" + i, String.valueOf(i), se, se));
+ m.addInsertion(KEY, CF, createColumn("b" + i, String.valueOf(i), se, se));
+ m.addInsertion(KEY, CF, createColumn("c" + i, String.valueOf(i), se, se));
+ }
+ m.execute();
+
+ SliceQuery<String, String, String> query = HFactory.createSliceQuery(keyspace, se, se, se)
+ .setKey(KEY)
+ .setColumnFamily(CF);
+ ColumnSliceIterator<String, String, String> it = new ColumnSliceIterator<String, String, String>(query, "a", "d", false, 2).
+ setFilter(new SliceFilter<HColumn<String, String>>() {
+
+ @Override
+ public boolean accept(HColumn<String, String> column) {
+ return !column.getName().startsWith("b");
+ }
+ });
+
+ List<String> results = new ArrayList<String>(1000);
+ while (it.hasNext()) {
+ HColumn<String, String> c = it.next();
+ String name = c.getName();
+
+ assertFalse(name.equals("b"));
+ results.add(name);
+ }
+ assertEquals(1000, results.size());
+
+ }
}
@@ -1,11 +1,14 @@
package me.prettyprint.cassandra.service;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.service.template.SliceFilter;
import me.prettyprint.cassandra.utils.TimeUUIDUtils;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
@@ -18,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.*;
public class SliceCounterIteratorTest extends BaseEmbededServerSetupTest {
@@ -82,4 +86,39 @@ public void testModificationIterator() {
}
assertEquals(1000, results.size());
}
+
+ @Test
+ public void testFilter() {
+ cluster.truncate(keyspace.getKeyspaceName(), CF);
+
+ Mutator<String> m = createMutator(keyspace, se);
+ for (int i = 0; i < 500; i++) {
+ m.addCounter(KEY, CF, createCounterColumn("a" + i, 1, se));
+ m.addCounter(KEY, CF, createCounterColumn("b" + i, 1, se));
+ m.addCounter(KEY, CF, createCounterColumn("c" + i, 1, se));
+ }
+ m.execute();
+
+ SliceCounterQuery<String, String> query = HFactory.createCounterSliceQuery(keyspace, se, se).setKey(KEY).setColumnFamily(CF);
+ SliceCounterIterator<String, String> it = new SliceCounterIterator<String, String>(query, "a", "d", false, 100).setFilter(new SliceFilter<HCounterColumn<String>>() {
+
+ @Override
+ public boolean accept(HCounterColumn<String> column)
+ {
+ return !column.getName().startsWith("b");
+ }
+
+ });
+
+ List<String> results = new ArrayList<String>(1000);
+ while(it.hasNext()) {
+ HCounterColumn<String> c = it.next();
+ String name = c.getName();
+
+ assertFalse(name.equals("b"));
+ results.add(name);
+ }
+
+ assertEquals(1000, results.size());
+ }
}

0 comments on commit 7d92c51

Please sign in to comment.