Skip to content

Commit

Permalink
Continuing to build out JavaScript Karait client. Found bug with ruby…
Browse files Browse the repository at this point in the history
… and python version in the process.
  • Loading branch information
bcoe committed Sep 6, 2011
1 parent cd7f03e commit 8d9cda1
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 35 deletions.
15 changes: 12 additions & 3 deletions node/lib/helpers.js
@@ -1,11 +1,20 @@
exports.extend = function(o1, o2, o3) {
exports.extend = function() {
var punch = function(o1, o2) {
for (var key in o2) {
if (o2.hasOwnProperty(key)) {
o1[key] = o2[key];
}
}
};
punch(o1, o2);
punch(o1, o3);

if (arguments.length == 3) {
punch(arguments[1], arguments[2]);
punch(arguments[0], arguments[1]);
return;
}

if (arguments.length == 2) {
punch(arguments[0], arguments[1]);
return;
}
}
22 changes: 21 additions & 1 deletion node/lib/message.js
Expand Up @@ -9,6 +9,22 @@ exports.Message = function(source, queueCollection) {
this[key] = this._source[key];
}
}
this._checkIfExpired();
};

exports.Message.prototype._checkIfExpired = function() {
var meta = this._source._meta || {},
expire = meta.expire || -1.0,
currentTime = (new Date()).getTime() / 1000.0,
timestamp = meta.timestamp || 0.0;

if (expire == -1.0) {
return;
}

if ( (currentTime - timestamp) > expire ) {
self._expired = true;
}
};

exports.Message.prototype.BLACKLIST = {
Expand Down Expand Up @@ -42,4 +58,8 @@ exports.Message.prototype.delete = function(callback) {
},
callback
);
}
};

exports.Message.prototype.isExpired = function() {
return this._expired;
};
61 changes: 42 additions & 19 deletions node/lib/queue.js
Expand Up @@ -16,8 +16,7 @@ exports.Queue = function(params) {
queue: 'messages',
averageMessageSize: 8192,
queueSize: 4096,
errorHandler: function(err) {},
collectionCreatedHook: function() {}
onQueueReady: function(err, queue) {}
};
extend(this, defaults, params);
this._initializeQueue(true);
Expand All @@ -37,13 +36,15 @@ exports.Queue.prototype._initializeQueue = function(nativeParser) {
this._initializeQueue(false);
return;
} else {
this.errorHandler(err);
this.onQueueReady(err, null);
return;
}
}

db.open(function(err, db) {
if (err) {
_this.errorHandler(err);
_this.onQueueReady(err, null);
return;
} else {
_this.db = db;
_this._createCappedCollection(db);
Expand All @@ -62,11 +63,12 @@ exports.Queue.prototype._createCappedCollection = function(db) {
},
function(err, collection) {
if (err) {
_this.errorHandler(err);
_this.onQueueReady(err, null);
return;
}
_this.queueCollection = collection;
_this._createIndexes();
_this.collectionCreatedHook();
_this.onQueueReady(null, _this);
}
);
};
Expand All @@ -78,7 +80,15 @@ exports.Queue.prototype._createIndexes = function() {
this.queueCollection.createIndex('_meta.visible_after', function(){});
}

exports.Queue.prototype.write = function(message, options, callback) {
exports.Queue.prototype.write = function(message, params, callback) {

if (typeof(params) === 'function') {
callback = params;
params = {};
}

callback = callback || function() {};

if (message.toObject) {
var messageObject = message.toObject();
} else {
Expand All @@ -91,15 +101,17 @@ exports.Queue.prototype.write = function(message, options, callback) {
expired: false,
visible_after: -1.0
}

if (params.routingKey) {
messageObject._meta.routing_key = params.routingKey;
}

this.queueCollection.insert(messageObject, {safe: true}, callback);
};

exports.Queue.prototype.read = function(params, callback) {
var _this = this;

if (typeof(params) == 'undefined') {
params = {};
}
var _this = this,
params = params || {};

if (typeof(params) === 'function') {
callback = params;
Expand All @@ -109,7 +121,7 @@ exports.Queue.prototype.read = function(params, callback) {
callback = callback || function() {};

extend(
params,
params,
{
messagesRead: 10,
visibilityTimeout: -1.0,
Expand All @@ -119,14 +131,13 @@ exports.Queue.prototype.read = function(params, callback) {
);

var currentTime = (new Date()).getTime() / 1000.0,
messages = [],
query = {
'_meta.expired': false,
'_meta.visible_after': {
'$lt': currentTime
}
},
update = false
update = false;

if (params.routingKey) {
query['_meta.routing_key'] = params.routingKey
Expand All @@ -144,18 +155,30 @@ exports.Queue.prototype.read = function(params, callback) {
}
}

this.queueCollection.find(query, {limit: params.messagesRead}, function(err, cursor) {
if (!update) {
this._normalFind(query, params.messagesRead, callback);
}
};

exports.Queue.prototype._normalFind = function(query, limit, callback) {

var messages = [],
_this = this;

this.queueCollection.find(query, {limit: limit}, function(err, cursor) {
if (err) {
_this.errorHandler(err);
callback(err, null);
return;
} else {
cursor.toArray(function(err, items) {
if (err) {
_this.errorHandler(err);
callback(err, null);
return;
} else {
for (var i = 0, item; (item = items[i]) != null; i++) {
messages.push(new Message(item, _this.queueCollection));
}
callback(messages);
callback(null, messages);
}
});
}
Expand Down
30 changes: 26 additions & 4 deletions node/test/test_message.js
Expand Up @@ -66,12 +66,12 @@ exports.tests = {
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
collectionCreatedHook: function() {
onQueueReady: function() {
queue.write({'foo': 'bar'}, {}, function() {
queue.read(function(messages) {
queue.read(function(err, messages) {
equal(1, messages.length, prefix + 'queue does not have one message');
messages[0].delete(function() {
queue.read(function(messages) {
queue.read(function(err, messages) {
equal(0, messages.length, prefix + 'queue is not empty');
finished();
});
Expand All @@ -80,5 +80,27 @@ exports.tests = {
});
}
});
}
}/*,
'should set a message to expired if current time > than expires time': function(finished, prefix) {
var queue = new Queue({
database: 'karait_test',
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
onQueueReady: function() {
queue.write({'foo': 'bar'}, {expires: 0.01}, function() {
queue.read(function(err, messages) {
equal(1, messages.length, prefix + 'queue does not have one message');
messages[0].delete(function() {
queue.read(function(err, messages) {
equal(0, messages.length, prefix + 'queue is not empty');
finished();
});
});
});
});
}
});
}*/
};
60 changes: 52 additions & 8 deletions node/test/test_queue.js
Expand Up @@ -14,7 +14,7 @@ exports.tests = {
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
collectionCreatedHook: function() {
onQueueReady: function(err) {
queue.queueCollection.options(function(err, options) {
equal(true, options.capped, prefix + 'collection not capped');
equal(4096, options.max, prefix + 'invalid max queue size');
Expand Down Expand Up @@ -52,7 +52,7 @@ exports.tests = {
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
collectionCreatedHook: function() {
onQueueReady: function() {
queue.queueCollection.options(function(err, options) {
collection.count({}, function(err, count) {
equal(1, count, prefix + 'count should be 1.');
Expand All @@ -72,12 +72,11 @@ exports.tests = {
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
collectionCreatedHook: function() {
onQueueReady: function() {
queue.write(
{
foo: 'bar'
},
{},
function() {
var db = new Db('karait_test', new Server('localhost', 27017, {}), {native_parser:false});
db.open(function(err, db) {
Expand All @@ -101,7 +100,7 @@ exports.tests = {
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
collectionCreatedHook: function() {
onQueueReady: function() {
var message = new Message(
{
'bar': 'foo'
Expand All @@ -110,7 +109,6 @@ exports.tests = {

queue.write(
message,
{},
function() {
var db = new Db('karait_test', new Server('localhost', 27017, {}), {native_parser:false});
db.open(function(err, db) {
Expand Down Expand Up @@ -139,7 +137,7 @@ exports.tests = {
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
collectionCreatedHook: function() {
onQueueReady: function() {
writeMessage = new Message({
foo: 1,
bar: 2,
Expand All @@ -148,7 +146,7 @@ exports.tests = {
}
});
queue.write(writeMessage, {}, function() {
readMessage = queue.read(function(messages) {
queue.read(function(err, messages) {
var readMessage = messages[0];
equal(1, readMessage.foo, prefix + 'foo not set');
equal(3, readMessage.innerObject.apple, prefix + 'inner object not found');
Expand All @@ -157,5 +155,51 @@ exports.tests = {
});
}
});
},

'should only return messages that match routing key': function(finished, prefix) {
var queue = new Queue({
database: 'karait_test',
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
onQueueReady: function() {
queue.write({foo: 'bar'}, {routingKey: 'foobar'}, function() {
queue.write({bar: 'foo'}, function() {
queue.read({routingKey: 'foobar'}, function(err, messages) {
equal(1, messages.length, prefix + messages.length + ' not equal to 1');
equal('bar', messages[0].foo, prefix + messages[0].foo + ' not equal to bar');
queue.read(function(err, messages) {
equal(1, messages.length, prefix + messages.length + ' not equal to 1');
equal('foo', messages[0].bar, prefix + messages[0].bar + ' not equal to foo');
finished();
});
});
});
});
}
});
},

'should no longer return a message when delete is called on it': function(finished, prefix) {
var queue = new Queue({
database: 'karait_test',
queue: 'queue_test',
averageMessageSize: 8192,
queueSize: 4096,
onQueueReady: function() {
queue.write({foo: 'bar'}, function() {
queue.read(function(err, messages) {
equal(1, messages.length, prefix + messages.length + ' not equal to 1');
messages[0].delete(function() {
queue.read(function(err, messages) {
equal(0, messages.length, prefix + messages.length + ' not equal to 0');
finished();
});
});
});
});
}
});
}
};
1 change: 1 addition & 0 deletions python/karait/model/message.py
Expand Up @@ -31,6 +31,7 @@ def _check_if_expired(self):

if ( time.time() - meta.get('timestamp', 0.0) ) > expire:
self._expired = True
self.delete()

def to_dictionary(self):
dictionary = {}
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/message.rb
Expand Up @@ -60,6 +60,7 @@ def set_expired

if current_time - meta.fetch('timestamp', 0.0) > meta.fetch('expire', -1.0)
@expired = true
delete
end
end

Expand Down

0 comments on commit 8d9cda1

Please sign in to comment.