Permalink
Browse files

Adding getRange operation, adds support for #8

  • Loading branch information...
1 parent 154790c commit 56bc368ee6cc5662f13821a360df2f63d2efbc8b @calvinfo committed Oct 12, 2012
Showing with 84 additions and 0 deletions.
  1. +84 −0 lib/column_family.js
View
84 lib/column_family.js
@@ -1,4 +1,5 @@
var util = require('util'),
+ Stream = require('stream').Stream,
Marshal = require('./marshal'),
Column = require('./column'),
CounterColumn = require('./counter_column'),
@@ -436,4 +437,87 @@ ColumnFamily.prototype.incr = function (key, column, value, options, callback) {
this.connection.execute('batch_mutate', batch, consistency, callback);
};
+
+/**
+ * Performces a key range slice on the CF.
+ *
+ * Start and end keys can be specified in the options. If no keys are specified
+ * then the entire column range space will be iterated over.
+ *
+ * See: http://wiki.apache.org/cassandra/API#KeyRange
+ *
+ * @param {Object} options
+ * @param {String} start - the starting key
+ * @param {String} end - the ending key
+ * @return {Stream}
+ */
+ColumnFamily.prototype.getRange = function (options) {
+
+ options = options || {};
+
+ var self = this,
+ consistency = options.consistency || options.consistencyLevel || DEFAULT_READ_CONSISTENCY,
+ predicate = getSlicePredicate(options, this.columnMarshaller),
+ iterations = 0,
+ stream = new Stream(),
+ keyRange, keyRangeOptions = {};
+
+ if (options.start && options.end) {
+ keyRangeOptions.start_key = options.start;
+ keyRangeOptions.end_key = options.end;
+ } else {
+ // Fetch all the keys
+ keyRangeOptions.start_token = '0';
+ keyRangeOptions.end_token = '0';
+ }
+
+ keyRange = new ttype.KeyRange(keyRangeOptions);
+
+ function iterator () {
+ self.connection.execute('get_range_slices', columnParent(self), predicate,
+ keyRange, consistency, onComplete);
+ }
+
+ function onComplete(err, val){
+ if(err){
+ stream.emit('error', err);
+ return;
+ }
+
+ var i = 0, len = val.length, row, count = 0;
+ for(; i < len; i += 1){
+ // First key is a duplicate from the last iteration.
+ if (iterations > 0 && i === 0)
+ continue;
+
+ row = val[i];
+ stream.emit('data', Row.fromThrift(self.keyMarshaller.deserialize(row.key), row.columns, self));
+ // Track how many items we have emitted
+ count += 1;
+ }
+
+ iterations += 1;
+
+ if (count > 0){
+ var startKey = val[len - 1].key;
+
+ delete keyRangeOptions.start_token;
+ keyRangeOptions.start_key = startKey;
+
+ keyRange = new ttype.KeyRange(keyRangeOptions);
+
+ process.nextTick(iterator);
+
+ } else {
+ stream.emit('end');
+ }
+ }
+
+ // Start our stream next tick
+ process.nextTick(iterator);
+
+ return stream;
+};
+
+
module.exports = ColumnFamily;

0 comments on commit 56bc368

Please sign in to comment.