Permalink
Browse files

A set style queue

  • Loading branch information...
1 parent 321d393 commit 8da4ed800dd7dc80124681c9e29a958e235db041 @imlucas committed Mar 13, 2013
Showing with 99 additions and 2 deletions.
  1. +11 −2 index.js
  2. +88 −0 lib/queue.js
View
@@ -530,7 +530,11 @@ Model.prototype.put = function(alias, obj, expected, returnOldValues){
// Make the request
this.getDB().putItem(request).then(function(data){
log.silly('PUT returned: ' + util.inspect(data, false, 5));
- self.emit('insert', [alias, obj, expected, returnOldValues]);
+ self.emit('insert', {
+ 'alias': alias,
+ 'expected': expected,
+ 'data': obj
+ });
return d.resolve(obj);
}, function(err){
log.error('PUT: ' + err.message + (err.stack ? '\n' + err.stack : ''));
@@ -617,7 +621,12 @@ Model.prototype.updateItem = function(alias, hash, attrs, opts){
// Make the request
this.getDB().updateItem(request).then(function(data){
log.silly('UPDATE_ITEM returned: ' + util.inspect(data, false, 5));
- self.emit('update', [alias, hash, attrs, opts]);
+ self.emit('update', {
+ 'alias': alias,
+ 'range': opts.range,
+ 'updates': attrs,
+ 'options': opts
+ });
if (opts.returnValues !== undefined) {
return d.resolve(schema.import(data.Attributes));
}
View
@@ -0,0 +1,88 @@
+"use strict";
+var when = require('when');
+
+// var queue = new Queue('cloudsearch-user-jobs', 'id');
+// queue.put({'id': 'jm', 'op': 'DELETE', 'ts': Date.now()});
+// queue.put({'id': 'lucas', 'op': 'ADD', 'ts': Date.now()});
+// setInterval(function(){
+// queue.get(1).then(function(res){
+// if(res.data[0].id === 'jm'){
+// return res.retry();
+// }
+// // Throw updates to cloud search
+// model.get('user', res.data.id).then(function(user){
+// aws.cloudSearch[res.data.ops](res.data.id, ts, user).commit();
+// });
+// });
+// }, 1000);
+function Queue(id, itemKey){
+ this.id = id;
+ this.itemKey = itemKey;
+}
+
+Queue.prototype.put = function(item){
+ var self = this,
+ itemId = self.id + '-' + item[self.itemKey],
+ d = when.defer();
+
+ self.model.insert('queue')
+ .set({
+ 'id': itemId,
+ 'data': item
+ })
+ .commit()
+ .then(function(res){
+ d.resolve();
+ }, function(err){
+ d.resolve();
+ });
+ return d.promise;
+};
+
+Queue.prototype.get = function(n){
+ var d = when.defer(),
+ self = this;
+
+ n = n || 1;
+
+ this.model.scan('queue')
+ .where('id', 'BEGINS_WITH', self.id + '-')
+ .limit(n)
+ .fetch()
+ .then(function(res){
+ // Delete them.
+ var ids = res.items.map(function(item){
+ return item.id;
+ }),
+ b = self.model.batch();
+
+ ids.map(function(id){
+ b.remove('queue', id);
+ });
+ b.commit().then(function(){
+ d.resolve(new QueueResult(self, res.items));
+ });
+ });
+ return d.promise;
+};
+
+function QueueResult(queue, items){
+ this.queue = queue;
+ this.items = items;
+ this.data = items.map(function(item){
+ return item.data;
+ });
+}
+
+QueueResult.prototype.retry = function(){
+ // Put items back in the table.
+ var self = this,
+ d = when.defer();
+ when.all(this.items.map(function(item){
+ return self.queue.put(item.data);
+ }), function(res){
+ d.resolve(res);
+ });
+ return d.promise;
+};
+module.exports = Queue;

0 comments on commit 8da4ed8

Please sign in to comment.