Permalink
Browse files

Added an iterator for SliceCounter to enable iterating over all colum…

…ns in the row.
  • Loading branch information...
1 parent 8dacda0 commit 441392adba16131ba24b1a7c87f839aa1d368d6f Shane Perry committed Aug 24, 2012
@@ -0,0 +1,143 @@
+package me.prettyprint.cassandra.service;
+
+import java.util.Iterator;
+import java.util.List;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.query.SliceCounterQuery;
+
+/**
+ * Iterates over the SliceCounterQuery, refreshing the query until all qualifing
+ * columns are retrieved. If column deletion can occur synchronously with
+ * calls to {@link #hasNext hasNext()}, the column name object type must
+ * override Object.equals().
+ *
+ * @author thrykol
+ */
+public class SliceCounterIterator<K, N> implements Iterator<HCounterColumn<N>> {
+
+ private static final int DEFAULT_COUNT = 100;
+ private SliceCounterQuery<K, N> query;
+ private Iterator<HCounterColumn<N>> iterator;
+ private N start;
+ private SliceCounterFinish<N> finish;
+ private boolean reversed;
+ private int count = DEFAULT_COUNT;
+ private int columns = 0;
+
+ /**
+ * Constructor
+ *
+ * @param query Base SliceQuery to execute
+ * @param start Starting point of the range
+ * @param finish Finish point of the range.
+ * @param reversed Whether or not the columns should be reversed
+ */
+ public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, final N finish, boolean reversed) {
+ this(query, start, finish, reversed, DEFAULT_COUNT);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param query Base SliceQuery to execute
+ * @param start Starting point of the range
+ * @param finish Finish point of the range.
+ * @param reversed Whether or not the columns should be reversed
+ * @param count the amount of columns to retrieve per batch
+ */
+ public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, final N finish, boolean reversed, int count) {
+ this(query, start, new SliceCounterFinish<N>() {
+
+ @Override
+ public N function() {
+ return finish;
+ }
+ }, reversed, count);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param query Base SliceQuery to execute
+ * @param start Starting point of the range
+ * @param finish Finish point of the range. Allows for a dynamically
+ * determined point
+ * @param reversed Whether or not the columns should be reversed
+ */
+ public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, SliceCounterFinish<N> finish, boolean reversed) {
+ this(query, start, finish, reversed, DEFAULT_COUNT);
+ }
+
+ /**
+ * Constructor
+ *
+ * @param query Base SliceQuery to execute
+ * @param start Starting point of the range
+ * @param finish Finish point of the range. Allows for a dynamically
+ * determined point
+ * @param reversed Whether or not the columns should be reversed
+ * @param count the amount of columns to retrieve per batch
+ */
+ public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, SliceCounterFinish<N> finish, boolean reversed, int count) {
+ this.query = query;
+ this.start = start;
+ this.finish = finish;
+ this.reversed = reversed;
+ this.count = count;
+ this.query.setRange(this.start, this.finish.function(), this.reversed, this.count);
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (iterator == null) {
+ iterator = query.execute().get().getColumns().iterator();
+ } else if (!iterator.hasNext() && columns == count) { // only need to do another query if maximum columns were retrieved
+ query.setRange(start, finish.function(), reversed, count);
+ columns = 0;
+ List<HCounterColumn<N>> list = query.execute().get().getColumns();
+ iterator = list.iterator();
+
+ if (iterator.hasNext()) {
+ // The lower bound column may have been removed prior to the query executing,
+ // so check to see if the first column returned by the current query is the same
+ // as the lower bound column. If both columns are the same, skip the column
+ N first = list.get(0).getName();
+ if (first.equals(start)) {
+ next();
+ }
+ }
+ }
+
+ return iterator.hasNext();
+ }
+
+ @Override
+ public HCounterColumn<N> next() {
+ HCounterColumn<N> column = iterator.next();
+ start = column.getName();
+ columns++;
+
+ return column;
+ }
+
+ @Override
+ public void remove() {
+ iterator.remove();
+ }
+
+ /**
+ * When iterating over a SliceCounter, it may be desirable to move the finish
+ * point for each query. This interface allows for a user defined function
+ * which will return the new finish point. This is especially useful for
+ * column families which have a TimeUUID as the column name.
+ */
+ public interface SliceCounterFinish<N> {
+
+ /**
+ * Generic function for deriving a new finish point.
+ *
+ * @return New finish point
+ */
+ N function();
+ }
+}
@@ -0,0 +1,85 @@
+package me.prettyprint.cassandra.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.serializers.UUIDSerializer;
+import me.prettyprint.cassandra.utils.TimeUUIDUtils;
+import me.prettyprint.hector.api.Cluster;
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import static me.prettyprint.hector.api.factory.HFactory.*;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.SliceCounterQuery;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SliceCounterIteratorTest extends BaseEmbededServerSetupTest {
+
+ private static final UUIDSerializer us = UUIDSerializer.get();
+ private static final StringSerializer se = new StringSerializer();
+ private static final String CF = "Counter1";
+ private static final String KEY = "key";
+ private static final SliceCounterIterator.SliceCounterFinish<UUID> FINISH = new SliceCounterIterator.SliceCounterFinish<UUID>() {
+
+ @Override
+ public UUID function() {
+ return TimeUUIDUtils.getUniqueTimeUUIDinMillis();
+ }
+ };
+ private Cluster cluster;
+ private Keyspace keyspace;
+
+ @Before
+ public void setUp() {
+ cluster = getOrCreateCluster("Test Cluster", "127.0.0.1:9170");
+ keyspace = createKeyspace("Keyspace1", cluster);
+
+ Mutator<String> m = createMutator(keyspace, se);
+ for (int i = 0; i < 1000; i++) {
+ m.addCounter(KEY, CF, createCounterColumn(TimeUUIDUtils.getUniqueTimeUUIDinMillis(), 1, us));
+ }
+ m.execute();
+ }
+
+ @After
+ public void tearDown() {
+ Mutator<String> m = createMutator(keyspace, se);
+ m.addDeletion(KEY, CF);
+ m.execute();
+ }
+
+ @Test
+ public void testIterator() {
+ SliceCounterQuery<String, UUID> query = HFactory.createCounterSliceQuery(keyspace, se, us).setKey(KEY).setColumnFamily(CF);
+ SliceCounterIterator<String, UUID> it = new SliceCounterIterator<String, UUID>(query, null, FINISH, false, 100);
+
+ Map<UUID, Long> results = new HashMap<UUID, Long>();
+ while (it.hasNext()) {
+ HCounterColumn<UUID> c = it.next();
+ results.put(c.getName(), c.getValue());
+ }
+ assertEquals(1000, results.size());
+ }
+
+ @Test
+ public void testModificationIterator() {
+ Mutator mutator = HFactory.createMutator(keyspace, se);
+ SliceCounterQuery<String, UUID> query = HFactory.createCounterSliceQuery(keyspace, se, us).setKey(KEY).setColumnFamily(CF);
+ SliceCounterIterator<String, UUID> it = new SliceCounterIterator<String, UUID>(query, null, FINISH, false, 100);
+
+ Map<UUID, Long> results = new HashMap<UUID, Long>();
+ while (it.hasNext()) {
+ HCounterColumn<UUID> c = it.next();
+ results.put(c.getName(), c.getValue());
+ mutator.addDeletion(KEY, CF, c.getName(), us);
+ mutator.execute();
+ }
+ assertEquals(1000, results.size());
+ }
+}

0 comments on commit 441392a

Please sign in to comment.