Skip to content

Commit

Permalink
implemented getEventRangeMatching function in storages
Browse files Browse the repository at this point in the history
  • Loading branch information
Adriano Raiano committed Mar 14, 2012
1 parent 65ed717 commit 6dc19fc
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 30 deletions.
51 changes: 51 additions & 0 deletions lib/storage/inMemory/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,57 @@ Storage.prototype = {
callback(null, events);
},

// __getEventRangeMatching:__ loads the range of events from given storage.
//
// `storage.getEventRangeMatching(match, amount, callback)`
//
// - __match:__ match query in inner event (payload)
// - __amount:__ amount of events
// - __callback:__ `function(err, events){}`
getEventRangeMatching: function(match, amount, callback) {
var events = [];
for (var e in this.store) {
events = events.concat(this.store[e]);
}

events.sort(function(a, b){
return a.commitStamp - b.commitStamp;
});

var index = 0;

if (match) {
for (var m in match) {
if (match.hasOwnProperty(m)) {

for (var len = events.length; index < len; index++) {
var evt = events[index];

if (evt.payload[m] === match[m]) {
break;
}
}

break;
}
}
}

if (events.length > index + 1) {

var endIndex = 0;
if (events.length > index + 1 + amount) {
endIndex = index + 1 + amount;
} else if (events.length <= index + 1 + amount) {
endIndex = events.length - 1;
}

events = events.slice(index + 1, endIndex);
}

callback(null, events);
},

// __getSnapshot:__ loads the next snapshot back from given max revision or the latest if you
// don't pass in a _maxRev_.
//
Expand Down
63 changes: 63 additions & 0 deletions storage/couchDb/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,69 @@ Storage.prototype = {

},

// __getEventRangeMatching:__ loads the range of events from given storage.
//
// `storage.getEventRangeMatching(match, amount, callback)`
//
// - __match:__ match query in inner event (payload)
// - __amount:__ amount of events
// - __callback:__ `function(err, events){}`
getEventRangeMatching: function(match, amount, callback) {

this.client.view(this.options.dbName+'/allEvents', {descending: false},
function(err, res) {
if(!err) {
var result = [];
for (var i in res) {
result.push(res[i].value);
}


result.sort(function(a, b){
return a.commitStamp - b.commitStamp;
});

var index = 0;

if (match) {
for (var m in match) {
if (match.hasOwnProperty(m)) {

for (var len = result.length; index < len; index++) {
var evt = result[index];

if (evt.payload[m] === match[m]) {
break;
}
}

break;
}
}
}

if (result.length > index + 1) {

var endIndex = 0;
if (result.length > index + 1 + amount) {
endIndex = index + 1 + amount;
} else if (result.length <= index + 1 + amount) {
endIndex = result.length - 1;
}

result = result.slice(index + 1, endIndex);
}

callback(null, result);
}
else {
callback(err);
}
}.bind(this)
);

},

// __getSnapshot:__ loads the next snapshot back from given max revision or the latest if you
// don't pass in a _maxRev_.
//
Expand Down
33 changes: 24 additions & 9 deletions storage/couchDb/test/couchDbStorageSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ vows.describe('The ' + storageName + ' Storage')
}
},

'after a successful `fill` we get a range of events searching by event id': {
topic: function (storage) {
storage.getEventRangeMatching({id: '2'}, 2, this.callback);
},

'we can assert if length is right': function (events) {
assert.equal(events.length, 2);
},

'we can assert if sorting is right': function (events) {
assert.equal(events[0].commitId, '2');
assert.equal(events[1].commitId, '3');
}
},

'after a successful `fill` we get the latest event': {
topic: function (storage) {
storage.getLastEventOfStream('2', this.callback);
Expand Down Expand Up @@ -199,24 +214,24 @@ function clear(storage, callback) {
function fillStore(storage, callback) {
clear(storage, function(err) {
storage.addEvents([
{streamId: '2', streamRevision: 0, commitId: '0', payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 1, commitId: '1', payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 2, commitId: '2', payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 3, commitId: '3', payload: {event:'blaaaaaaaaaaa'}, dispatched: false}
{streamId: '2', streamRevision: 0, commitId: 0, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 1, commitId: 1, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 2, commitId: 2, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 3, commitId: 3, commitStamp: new Date(2012, 3, 15, 8, 0, 0), payload: {id: '4', event:'blaaaaaaaaaaa'}, dispatched: false}
],
function (err) {
storage.addEvents([
{streamId: '3', streamRevision: 0, commitId: '4', payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '3', streamRevision: 1, commitId: '5', payload: {event:'blaaaaaaaaaaa'}, dispatched: false}
{streamId: '3', streamRevision: 0, commitId: 4, commitStamp: new Date(2012, 3, 16, 8, 0, 0), payload: {id: '5', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '3', streamRevision: 1, commitId: 5, commitStamp: new Date(2012, 3, 17, 8, 0, 0), payload: {id: '6', event:'blaaaaaaaaaaa'}, dispatched: false}
],
function (err) {
storage.addSnapshot({snapshotId: 'snap1', streamId: '3', revision: 1, data: 'data'}, function() {
storage.addSnapshot({snapshotId: 'snap2', streamId: '3', revision: 2, data: 'dataPlus'}, function() {
storage.addSnapshot({snapshotId: '1', streamId: '3', revision: 1, data: 'data'}, function() {
storage.addSnapshot({snapshotId: '2', streamId: '3', revision: 2, data: 'dataPlus'}, function() {
callback(null, storage);
});
});
}
);
});
});
};
}
34 changes: 34 additions & 0 deletions storage/mongoDb/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,40 @@ Storage.prototype = {
this.events.find({}, {sort:[['commitStamp','asc']], skip: index, limit: amount}).toArray(callback);
},

// __getEventRangeMatching:__ loads the range of events from given storage.
//
// `storage.getEventRangeMatching(match, amount, callback)`
//
// - __match:__ match query in inner event (payload)
// - __amount:__ amount of events
// - __callback:__ `function(err, events){}`
getEventRangeMatching: function(match, amount, callback) {
var self = this;
var query = {};

if (match) {
for (var m in match) {
if (match.hasOwnProperty(m)) {

query['payload.' + m] = match[m];

break;
}
}
}

this.events.findOne(query, function(err, evt) {

self.events.find({
'commitStamp': {'$gte': evt.commitStamp},
'$or': [
{ 'streamId': {'$ne': evt.streamId}},
{'commitId': {'$ne': evt.commitId}} ]},
{sort:[['commitStamp','asc']], limit: amount}).toArray(callback);

});
},

// __getSnapshot:__ loads the next snapshot back from given max revision or the latest if you
// don't pass in a _maxRev_.
//
Expand Down
29 changes: 22 additions & 7 deletions storage/mongoDb/test/mongoDbStorageSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ vows.describe('The ' + storageName + ' Storage')
'can be filled with events': function(storage) {
var id = "1234-abcd";
storage.addEvents([{'streamId': id, 'streamRevision': 0, 'payload': {event:'bla'}}], function(err) {
storage.getEvents(id, 0, -1, function(err, events) {
storage.getEvents(id, 0, -1, function(err, events) {console.log(err);
assert.equal(events.length, 1);
});
});
Expand Down Expand Up @@ -133,6 +133,21 @@ vows.describe('The ' + storageName + ' Storage')
}
},

'after a successful `fill` we get a range of events searching by event id': {
topic: function (storage) {
storage.getEventRangeMatching({id: '2'}, 2, this.callback);
},

'we can assert if length is right': function (events) {
assert.equal(events.length, 2);
},

'we can assert if sorting is right': function (events) {
assert.equal(events[0].commitId, '2');
assert.equal(events[1].commitId, '3');
}
},

'after a successful `fill` we get the latest event': {
topic: function (storage) {
storage.getLastEventOfStream('2', this.callback);
Expand Down Expand Up @@ -179,15 +194,15 @@ vows.describe('The ' + storageName + ' Storage')
function fillStore(storage, callback) {
storage.events.remove({}, function(err) {
storage.addEvents([
{streamId: '2', streamRevision: 0, commitId: 0, payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 1, commitId: 1, payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 2, commitId: 2, payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 3, commitId: 3, payload: {event:'blaaaaaaaaaaa'}, dispatched: false}
{streamId: '2', streamRevision: 0, commitId: 0, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 1, commitId: 1, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 2, commitId: 2, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '2', streamRevision: 3, commitId: 3, commitStamp: new Date(2012, 3, 15, 8, 0, 0), payload: {id: '4', event:'blaaaaaaaaaaa'}, dispatched: false}
],
function (err) {
storage.addEvents([
{streamId: '3', streamRevision: 0, commitId: 4, payload: {event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '3', streamRevision: 1, commitId: 5, payload: {event:'blaaaaaaaaaaa'}, dispatched: false}
{streamId: '3', streamRevision: 0, commitId: 4, commitStamp: new Date(2012, 3, 16, 8, 0, 0), payload: {id: '5', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '3', streamRevision: 1, commitId: 5, commitStamp: new Date(2012, 3, 17, 8, 0, 0), payload: {id: '6', event:'blaaaaaaaaaaa'}, dispatched: false}
],
function (err) {
storage.addSnapshot({snapshotId: '1', streamId: '3', revision: 1, data: 'data'}, function() {
Expand Down
76 changes: 76 additions & 0 deletions storage/redis/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,82 @@ Storage.prototype = {
});
},

// __getEventRangeMatching:__ loads the range of events from given storage.
//
// `storage.getEventRangeMatching(match, amount, callback)`
//
// - __match:__ match query in inner event (payload)
// - __amount:__ amount of events
// - __callback:__ `function(err, events){}`
getEventRangeMatching: function(match, amount, callback) {
var self = this;

this.client.keys(this.options.eventsCollectionName + ':*', function (err, res) {
if (err) {
callback(err);
} else {
var arr = [];

if (res.length === 0) {
callback(null, arr);
} else {
var last = res[res.length - 1];
res.forEach(function(key) {
self.client.lrange(key, 0, -1, function (err, res) {
if (err) {
callback(err);
} else {
res.forEach(function(item) {
arr.push(JSON.parse(item));
});
}

if (key == last) {

arr.sort(function(a, b){
return a.commitStamp - b.commitStamp;
});

var index = 0;

if (match) {
for (var m in match) {
if (match.hasOwnProperty(m)) {

for (var len = arr.length; index < len; index++) {
var evt = arr[index];

if (evt.payload[m] === match[m]) {
break;
}
}

break;
}
}
}

if (arr.length > index + 1) {

var endIndex = 0;
if (arr.length > index + 1 + amount) {
endIndex = index + 1 + amount;
} else if (arr.length <= index + 1 + amount) {
endIndex = arr.length - 1;
}

arr = arr.slice(index + 1, endIndex);
}

callback(null, arr);
}
});
});
}
}
});
},

// __getSnapshot:__ loads the next snapshot back from given max revision or the latest if you
// don't pass in a _maxRev_.
//
Expand Down
Loading

0 comments on commit 6dc19fc

Please sign in to comment.