// mongoDbStorage.js
var mongo = require('mongodb')
, ObjectID = mongo.BSONPure.ObjectID
, async = require('async');
// Pick the object that carries the public API: the CommonJS `exports` object
// when one exists, otherwise a fresh `MongoDbStorage` namespace attached to
// the top-level `this`.
var root = this;
var MongoDbStorage;
if (typeof exports !== 'undefined') {
  MongoDbStorage = exports;
} else {
  // Top-level `this` is undefined in strict-mode wrappers and ES modules.
  // Fall back to the global object (or a throwaway holder on engines without
  // `globalThis`) so that attaching the namespace cannot throw.
  if (root === undefined || root === null) {
    root = typeof globalThis !== 'undefined' ? globalThis : {};
  }
  MongoDbStorage = root.MongoDbStorage = {};
}
// Version string exposed on the public API object.
MongoDbStorage.VERSION = '0.0.1';
/**
 * Factory: builds a new storage instance.
 *
 * @param {Object} [options] Handed unchanged to the Storage constructor.
 * @returns {Storage} The freshly constructed storage.
 */
MongoDbStorage.createStorage = function(options) {
  var storage = new Storage(options);
  return storage;
};
/*******************************************
 * Storage
 */

/**
 * Creates a storage bound to the 'eventstore' database and starts opening
 * the connection.
 *
 * The connection is opened asynchronously: `this.client` stays `undefined`
 * until the driver's open callback succeeds, and it remains `undefined` when
 * opening fails.
 *
 * @param {Object} [options] Connection settings. Missing values are filled in
 *   on this same object (it is kept as `this.options`, not copied).
 * @param {string} [options.host='localhost'] Server host name.
 * @param {number} [options.port] Server port; defaults to
 *   `mongo.Connection.DEFAULT_PORT`.
 */
var Storage = function(options) {
  // set options and load defaults if needed
  this.options = options || {};
  this.options.host = this.options.host || 'localhost';
  this.options.port = this.options.port || mongo.Connection.DEFAULT_PORT;

  // Name of the collection every storage method works on.
  this.collectionName = 'events';

  this.store = new mongo.Db(
    'eventstore',
    new mongo.Server(this.options.host, this.options.port, {}),
    {}
  );

  this.store.addListener("error", function(error) {
    console.log("Error connecting to mongo -- perhaps it isn't running?");
    // Keep the underlying cause visible instead of discarding it.
    if (error) {
      console.log('Error: ' + error);
    }
  });

  this.client = undefined;

  var self = this;
  this.store.open(function(err, client) {
    if (err) {
      // Leave `client` undefined so callers can tell the store is unusable.
      console.log('Error: ' + err);
      return;
    }
    self.client = client;
  });
};
/**
 * Inserts one event into the events collection.
 *
 * Before inserting, the passed event object itself is stamped: `_id` is set
 * to `event.commitId` and `dispatched` is set to `false`.
 *
 * @param {Object} event The event to store; must carry `commitId`.
 * @param {function(?Error)} [clb] Called once when the operation ends: with
 *   no error after a successful insert, or with the error when the connection
 *   is not open, the collection cannot be obtained, or the insert fails.
 */
Storage.prototype.addEvent = function(event, clb) {
  // Tolerate a missing callback instead of throwing after the insert.
  var done = typeof clb === 'function' ? clb : function() {};

  // Every failure is logged (as before) and also reported to the caller.
  var fail = function(err) {
    console.log('Error: '+err);
    done(err);
  };

  var client = this.client;
  if (!client) {
    // The constructor opens the connection asynchronously; it may not be ready.
    fail(new Error('MongoDbStorage: connection is not open'));
    return;
  }

  client.collection(this.collectionName, function(err, collection) {
    if (err) {
      // Without a collection there is nothing to insert into.
      fail(err);
      return;
    }

    event._id = event.commitId;
    event.dispatched = false;

    collection.insert(event, function(err, doc) {
      if (err) {
        fail(err);
        return;
      }
      done(null);
    });
  });
};
/**
 * Loads the events of one stream whose `streamRevision` is greater than
 * `minRev` and, unless `maxRev` is -1, at most `maxRev`. The query asks for
 * results sorted by `streamRevision` descending.
 *
 * The result is delivered through `callback`; the function itself returns
 * nothing because every step is asynchronous.
 *
 * @param {*} streamId Value matched against the `streamId` field.
 * @param {number} minRev Exclusive lower revision bound.
 * @param {number} maxRev Inclusive upper revision bound; -1 means no upper bound.
 * @param {function(?Error, (Array|undefined))} [callback] Receives the error,
 *   or `null` plus the matching events (`undefined` when nothing matched).
 */
Storage.prototype.getEvents = function(streamId, minRev, maxRev, callback) {
  var done = typeof callback === 'function' ? callback : function() {};

  // Log every failure so it is visible even when no callback was supplied.
  var fail = function(err) {
    console.log('Error: '+err);
    done(err);
  };

  if (!this.client) {
    // The connection is opened asynchronously and may not be ready yet.
    fail(new Error('MongoDbStorage: connection is not open'));
    return;
  }

  var revisionRange = {'$gt': minRev};
  // Number() keeps the loose matching of the old `== -1` check (e.g. '-1').
  if (Number(maxRev) !== -1) {
    revisionRange['$lte'] = maxRev;
  }

  var query = {'streamId' : streamId, 'streamRevision': revisionRange};
  var queryOptions = {sort:[['streamRevision','desc']]};

  this.client.collection(this.collectionName, function(err, collection) {
    if (err) {
      fail(err);
      return;
    }
    collection.find(query, queryOptions, function(err, cursor) {
      if (err) {
        fail(err);
        return;
      }
      cursor.toArray(function(err, events) {
        if (err) {
          fail(err);
          return;
        }
        // An empty result is reported as `undefined`, not as an empty array.
        done(null, events && events.length > 0 ? events : undefined);
      });
    });
  });
};
/**
 * Loads every stored event whose `dispatched` field is `false`. The query
 * asks for results sorted by `streamRevision` descending.
 *
 * The result is delivered through `callback`; the function itself returns
 * nothing because every step is asynchronous.
 *
 * @param {function(?Error, (Array|undefined))} [callback] Receives the error,
 *   or `null` plus the undispatched events (`undefined` when there are none).
 */
Storage.prototype.getUndispatchedEvents = function(callback) {
  var done = typeof callback === 'function' ? callback : function() {};

  // Log every failure so it is visible even when no callback was supplied.
  var fail = function(err) {
    console.log('Error: '+err);
    done(err);
  };

  if (!this.client) {
    // The connection is opened asynchronously and may not be ready yet.
    fail(new Error('MongoDbStorage: connection is not open'));
    return;
  }

  this.client.collection(this.collectionName, function(err, collection) {
    if (err) {
      fail(err);
      return;
    }
    collection.find({'dispatched' : false}, {sort:[['streamRevision','desc']]}, function(err, cursor) {
      if (err) {
        fail(err);
        return;
      }
      cursor.toArray(function(err, events) {
        if (err) {
          fail(err);
          return;
        }
        // An empty result is reported as `undefined`, not as an empty array.
        done(null, events && events.length > 0 ? events : undefined);
      });
    });
  });
};
/**
 * Marks a stored event as dispatched by setting `dispatched` to `true` on the
 * document whose `_id` equals `evt.commitId`.
 *
 * @param {Object} evt The event to mark; only `commitId` is read.
 * @param {function(?Error)} [callback] Called once when the update ends, with
 *   the error if the connection is not open, the collection cannot be
 *   obtained, or the update fails.
 */
Storage.prototype.setEventToDispatched = function(evt, callback) {
  var done = typeof callback === 'function' ? callback : function() {};

  // Failures are logged (as before) and also reported to the caller.
  var fail = function(err) {
    console.log('Error: '+err);
    done(err);
  };

  if (!this.client) {
    // The connection is opened asynchronously and may not be ready yet.
    fail(new Error('MongoDbStorage: connection is not open'));
    return;
  }

  this.client.collection(this.collectionName, function(err, collection) {
    if (err) {
      // Without a collection there is nothing to update.
      fail(err);
      return;
    }
    var updateCommand = { '$set' : {'dispatched': true} };
    collection.update({'_id' : evt.commitId}, updateCommand, function(err, doc) {
      if (err) {
        fail(err);
        return;
      }
      done(null);
    });
  });
};
/**
 * Hands a newly created ObjectID to the given callback.
 *
 * @param {function(ObjectID)} callback Receives the new id as its only
 *   argument. Anything that is not a function is ignored.
 */
Storage.prototype.getId = function(callback) {
  // Nothing to do unless there is a function to receive the id.
  if (typeof callback !== 'function') {
    return;
  }
  var freshId = new ObjectID();
  callback(freshId);
};