Skip to content

Commit

Permalink
Added an iterator for SliceCounter to enable iterating over all colum…
Browse files Browse the repository at this point in the history
…ns in the row.
  • Loading branch information
Shane Perry committed Aug 24, 2012
1 parent 8dacda0 commit 441392a
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package me.prettyprint.cassandra.service;

import java.util.Iterator;
import java.util.List;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.query.SliceCounterQuery;

/**
* Iterates over the SliceCounterQuery, refreshing the query until all qualifing
* columns are retrieved. If column deletion can occur synchronously with
* calls to {@link #hasNext hasNext()}, the column name object type must
* override Object.equals().
*
* @author thrykol
*/
public class SliceCounterIterator<K, N> implements Iterator<HCounterColumn<N>> {

private static final int DEFAULT_COUNT = 100;
private SliceCounterQuery<K, N> query;
private Iterator<HCounterColumn<N>> iterator;
private N start;
private SliceCounterFinish<N> finish;
private boolean reversed;
private int count = DEFAULT_COUNT;
private int columns = 0;

/**
* Constructor
*
* @param query Base SliceQuery to execute
* @param start Starting point of the range
* @param finish Finish point of the range.
* @param reversed Whether or not the columns should be reversed
*/
public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, final N finish, boolean reversed) {
this(query, start, finish, reversed, DEFAULT_COUNT);
}

/**
* Constructor
*
* @param query Base SliceQuery to execute
* @param start Starting point of the range
* @param finish Finish point of the range.
* @param reversed Whether or not the columns should be reversed
* @param count the amount of columns to retrieve per batch
*/
public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, final N finish, boolean reversed, int count) {
this(query, start, new SliceCounterFinish<N>() {

@Override
public N function() {
return finish;
}
}, reversed, count);
}

/**
* Constructor
*
* @param query Base SliceQuery to execute
* @param start Starting point of the range
* @param finish Finish point of the range. Allows for a dynamically
* determined point
* @param reversed Whether or not the columns should be reversed
*/
public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, SliceCounterFinish<N> finish, boolean reversed) {
this(query, start, finish, reversed, DEFAULT_COUNT);
}

/**
* Constructor
*
* @param query Base SliceQuery to execute
* @param start Starting point of the range
* @param finish Finish point of the range. Allows for a dynamically
* determined point
* @param reversed Whether or not the columns should be reversed
* @param count the amount of columns to retrieve per batch
*/
public SliceCounterIterator(SliceCounterQuery<K, N> query, N start, SliceCounterFinish<N> finish, boolean reversed, int count) {
this.query = query;
this.start = start;
this.finish = finish;
this.reversed = reversed;
this.count = count;
this.query.setRange(this.start, this.finish.function(), this.reversed, this.count);
}

@Override
public boolean hasNext() {
if (iterator == null) {
iterator = query.execute().get().getColumns().iterator();
} else if (!iterator.hasNext() && columns == count) { // only need to do another query if maximum columns were retrieved
query.setRange(start, finish.function(), reversed, count);
columns = 0;
List<HCounterColumn<N>> list = query.execute().get().getColumns();
iterator = list.iterator();

if (iterator.hasNext()) {
// The lower bound column may have been removed prior to the query executing,
// so check to see if the first column returned by the current query is the same
// as the lower bound column. If both columns are the same, skip the column
N first = list.get(0).getName();
if (first.equals(start)) {
next();
}
}
}

return iterator.hasNext();
}

@Override
public HCounterColumn<N> next() {
HCounterColumn<N> column = iterator.next();
start = column.getName();
columns++;

return column;
}

@Override
public void remove() {
iterator.remove();
}

/**
* When iterating over a SliceCounter, it may be desirable to move the finish
* point for each query. This interface allows for a user defined function
* which will return the new finish point. This is especially useful for
* column families which have a TimeUUID as the column name.
*/
public interface SliceCounterFinish<N> {

/**
* Generic function for deriving a new finish point.
*
* @return New finish point
*/
N function();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package me.prettyprint.cassandra.service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.serializers.UUIDSerializer;
import me.prettyprint.cassandra.utils.TimeUUIDUtils;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HCounterColumn;
import me.prettyprint.hector.api.factory.HFactory;
import static me.prettyprint.hector.api.factory.HFactory.*;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.SliceCounterQuery;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;

public class SliceCounterIteratorTest extends BaseEmbededServerSetupTest {

private static final UUIDSerializer us = UUIDSerializer.get();
private static final StringSerializer se = new StringSerializer();
private static final String CF = "Counter1";
private static final String KEY = "key";
private static final SliceCounterIterator.SliceCounterFinish<UUID> FINISH = new SliceCounterIterator.SliceCounterFinish<UUID>() {

@Override
public UUID function() {
return TimeUUIDUtils.getUniqueTimeUUIDinMillis();
}
};
private Cluster cluster;
private Keyspace keyspace;

@Before
public void setUp() {
cluster = getOrCreateCluster("Test Cluster", "127.0.0.1:9170");
keyspace = createKeyspace("Keyspace1", cluster);

Mutator<String> m = createMutator(keyspace, se);
for (int i = 0; i < 1000; i++) {
m.addCounter(KEY, CF, createCounterColumn(TimeUUIDUtils.getUniqueTimeUUIDinMillis(), 1, us));
}
m.execute();
}

@After
public void tearDown() {
Mutator<String> m = createMutator(keyspace, se);
m.addDeletion(KEY, CF);
m.execute();
}

@Test
public void testIterator() {
SliceCounterQuery<String, UUID> query = HFactory.createCounterSliceQuery(keyspace, se, us).setKey(KEY).setColumnFamily(CF);
SliceCounterIterator<String, UUID> it = new SliceCounterIterator<String, UUID>(query, null, FINISH, false, 100);

Map<UUID, Long> results = new HashMap<UUID, Long>();
while (it.hasNext()) {
HCounterColumn<UUID> c = it.next();
results.put(c.getName(), c.getValue());
}
assertEquals(1000, results.size());
}

@Test
public void testModificationIterator() {
Mutator mutator = HFactory.createMutator(keyspace, se);
SliceCounterQuery<String, UUID> query = HFactory.createCounterSliceQuery(keyspace, se, us).setKey(KEY).setColumnFamily(CF);
SliceCounterIterator<String, UUID> it = new SliceCounterIterator<String, UUID>(query, null, FINISH, false, 100);

Map<UUID, Long> results = new HashMap<UUID, Long>();
while (it.hasNext()) {
HCounterColumn<UUID> c = it.next();
results.put(c.getName(), c.getValue());
mutator.addDeletion(KEY, CF, c.getName(), us);
mutator.execute();
}
assertEquals(1000, results.size());
}
}

0 comments on commit 441392a

Please sign in to comment.