Skip to content

Commit

Permalink
Mongodb adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
1602 committed Mar 11, 2012
1 parent 310c3a7 commit 8bb855c
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 108 deletions.
165 changes: 165 additions & 0 deletions lib/adapters/mongodb.js
@@ -0,0 +1,165 @@
var safeRequire = require('../utils').safeRequire;

/**
* Module dependencies
*/
var mongodb = safeRequire('mongodb');
var ObjectID = mongodb.ObjectID;

exports.initialize = function initializeSchema(schema, callback) {
if (!mongodb) return;

var s = schema.settings;

if (schema.settings.url) {
var url = require('url').parse(schema.settings.url);
s.host = url.hostname;
s.port = url.port;
s.database = url.path.replace(/^\//, '');
s.username = url.auth && url.auth.split(':')[0];
s.password = url.auth && url.auth.split(':')[1];
}

s.host = s.host || 'localhost';
s.port = parseInt(s.port || '27017', 10);
s.database = s.database || 'test';

var server = new mongodb.Server(s.host, s.port, {});
new mongodb.Db(s.database, server, {}).open(function (err, client) {
if (err) throw err;
schema.client = client;
schema.adapter = new MongoDB(client);
callback();
});
};

function MongoDB(client) {
this._models = {};
this.client = client;
this.collections = {};
}

MongoDB.prototype.define = function (descr) {
if (!descr.settings) descr.settings = {};
this._models[descr.model.modelName] = descr;
};

MongoDB.prototype.defineProperty = function (model, prop, params) {
this._models[model].properties[prop] = params;
};

MongoDB.prototype.collection = function (name) {
if (!this.collections[name]) {
this.collections[name] = new mongodb.Collection(this.client, name);
}
return this.collections[name];
};

MongoDB.prototype.create = function (model, data, callback) {
this.collection(model).insert(data, {}, function (err, m) {
callback(err, err ? null : m[0]._id.toString());
});
};

MongoDB.prototype.save = function (model, data, callback) {
this.collection(model).save({_id: new ObjectID(data.id)}, data, function (err) {
callback(err);
});
};

MongoDB.prototype.exists = function (model, id, callback) {
this.collection(model).findOne({_id: new ObjectID(id)}, function (err, data) {
callback(err, !err && data);
});
};

MongoDB.prototype.find = function find(model, id, callback) {
this.collection(model).findOne({_id: new ObjectID(id)}, function (err, data) {
if (data) data.id = id;
callback(err, data);
});
};

MongoDB.prototype.destroy = function destroy(model, id, callback) {
this.collection(model).remove({_id: new ObjectID(id)}, callback);
};

MongoDB.prototype.all = function all(model, filter, callback) {
if (!filter) {
filter = {};
}
var query = {};
if (filter.where) {
Object.keys(filter.where).forEach(function (k) {
var cond = filter.where[k];
var spec = false;
if (cond && cond.constructor.name === 'Object') {
spec = Object.keys(cond)[0];
cond = cond[spec];
}
if (spec) {
if (spec === 'between') {
query[k] = { $gte: cond[0], $lte: cond[1]};
} else {
query[k] = {};
query[k]['$' + spec] = cond;
}
} else {
if (cond === null) {
query[k] = {$type: 10};
} else {
query[k] = cond;
}
}
});
}
var cursor = this.collection(model).find(query);

if (filter.order) {
var m = filter.order.match(/\s+(A|DE)SC$/);
var key = filter.order;
var reverse = false;
if (m) {
key = key.replace(/\s+(A|DE)SC$/, '');
if (m[1] === 'DE') reverse = true;
}
if (reverse) {
cursor.sort([[key, 'desc']]);
} else {
cursor.sort(key);
}
}
if (filter.limit) {
cursor.limit(filter.limit);
}
if (filter.skip) {
cursor.skip(filter.skip);
} else if (filter.offset) {
cursor.skip(filter.offset);
}
cursor.toArray(function (err, data) {
if (err) return callback(err);
callback(null, data.map(function (o) { o.id = o._id.toString(); delete o._id; return o; }));
});
};

MongoDB.prototype.destroyAll = function destroyAll(model, callback) {
this.collection(model).remove({}, callback);
};

MongoDB.prototype.count = function count(model, callback, where) {
this.collection(model).count(where, function (err, count) {
callback(err, count);
});
};

MongoDB.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
this.collection(model).findAndModify({_id: new ObjectID(id)}, [['_id','asc']], {$set: data}, {}, function(err, object) {
cb(err, object);
});
};

MongoDB.prototype.disconnect = function () {
this.client.close();
};

2 changes: 1 addition & 1 deletion lib/adapters/mysql.js
Expand Up @@ -36,7 +36,7 @@ MySQL.prototype.query = function (sql, callback) {
var log = this.log;
if (typeof callback !== 'function') throw new Error('callback should be a function');
this.client.query(sql, function (err, data) {
log(sql, time);
if (log) log(sql, time);
callback(err, data);
});
};
Expand Down
22 changes: 14 additions & 8 deletions lib/adapters/postgres.js
Expand Up @@ -20,14 +20,9 @@ exports.initialize = function initializeSchema(schema, callback) {
debug: s.debug
});
schema.adapter = new PG(schema.client);
schema.client.connect(function(err){
if(!err){
process.nextTick(callback);
}else{
console.error(err);
throw err;
}
});
if (s.autoconnect === false) return callback();

schema.adapter.connect(callback);
};

function PG(client) {
Expand All @@ -37,6 +32,17 @@ function PG(client) {

require('util').inherits(PG, BaseSQL);

PG.prototype.connect = function (callback) {
this.client.connect(function (err) {
if (!err){
callback();
}else{
console.error(err);
throw err;
}
});
};

PG.prototype.query = function (sql, callback) {
var time = Date.now();
var log = this.log;
Expand Down
132 changes: 58 additions & 74 deletions lib/adapters/riak.js
@@ -1,126 +1,110 @@
var safeRequire = require('../utils').safeRequire;

/**
* Module dependencies
*/
var riak= require('riak-js');
var uuid = require('node-uuid');
var riak = safeRequire('riak-js');

exports.initialize = function initializeSchema(schema, callback) {
var config = {
host = schema.settings.host || '127.0.0.1',
port = schema.settings.port || 8098
};

schema.client = riak_lib.getClient(config);
schema.adapter = new BridgeToRedis(schema.client);
schema.client = riak.getClient({
host: schema.settings.host || '127.0.0.1',
port: schema.settings.port || 8091
});
schema.adapter = new Riak(schema.client);
};

function BridgeToRedis(client) {
function Riak(client) {
this._models = {};
this.client = client;
}

BridgeToRedis.prototype.define = function (descr) {
Riak.prototype.define = function (descr) {
this._models[descr.model.modelName] = descr;
};

BridgeToRedis.prototype.save = function (model, data, callback) {
this.client.hmset(model + ':' + data.id, data, callback);
Riak.prototype.save = function (model, data, callback) {
this.client.save(model, data.id, data, callback);
};

BridgeToRedis.prototype.create = function (model, data, callback) {
this.client.incr(model + ':id', function (err, id) {
data.id = id;
this.save(model, data, function (err) {
if (callback) {
callback(err, id);
}
});
}.bind(this));
Riak.prototype.create = function (model, data, callback) {
data.id = uuid();
this.save(model, data, function (err) {
if (callback) {
callback(err, data.id);
}
});
};

BridgeToRedis.prototype.exists = function (model, id, callback) {
this.client.exists(model + ':' + id, function (err, exists) {
Riak.prototype.exists = function (model, id, callback) {
this.client.exists(model, id, function (err, exists, meta) {
if (callback) {
callback(err, exists);
}
});
};

BridgeToRedis.prototype.find = function find(model, id, callback) {
this.client.hgetall(model + ':' + id, function (err, data) {
Riak.prototype.find = function find(model, id, callback) {
this.client.get(model, id, function (err, data, meta) {
if (data && data.id) {
data.id = id;
} else {
data = null;
}
callback(err, data);
if (typeof callback === 'function') callback(err, data);
});
};

BridgeToRedis.prototype.destroy = function destroy(model, id, callback) {
this.client.del(model + ':' + id, function (err) {
Riak.prototype.destroy = function destroy(model, id, callback) {
this.client.remove(model, id, function (err) {
callback(err);
});
};

BridgeToRedis.prototype.all = function all(model, filter, callback) {
this.client.keys(model + ':*', function (err, keys) {
if (err) {
return callback(err, []);
}
var query = keys.map(function (key) {
return ['hgetall', key];
});
this.client.multi(query).exec(function (err, replies) {
callback(err, filter ? replies.filter(applyFilter(filter)) : replies);
Riak.prototype.all = function all(model, filter, callback) {
var opts = {};
if (filter && filter.where) opts.where = filter.where;
this.client.getAll(model, function (err, result, meta) {
if (err) return callback(err, []);
/// return callback(err, result.map(function (x) { return {id: x}; }));
result = (result || []).map(function (row) {
var record = row.data;
record.id = row.meta.key;
console.log(record);
return record;
});

return callback(err, result);
}.bind(this));
};

function applyFilter(filter) {
if (typeof filter === 'function') {
return filter;
}
var keys = Object.keys(filter);
return function (obj) {
var pass = true;
keys.forEach(function (key) {
if (!test(filter[key], obj[key])) {
pass = false;
}
});
return pass;
}
Riak.prototype.destroyAll = function destroyAll(model, callback) {
var self = this;
this.all(model, {}, function (err, recs) {
if (err) callback(err);

function test(example, value) {
if (typeof value === 'string' && example && example.constructor.name === 'RegExp') {
return value.match(example);
}
// not strict equality
return example == value;
}
}
removeOne();

BridgeToRedis.prototype.destroyAll = function destroyAll(model, callback) {
this.client.keys(model + ':*', function (err, keys) {
if (err) {
return callback(err, []);
function removeOne(error) {
err = err || error;
var rec = recs.pop();
if (!rec) return callback(err && err.statusCode != '404' ? err : null);
console.log(rec.id);
self.client.remove(model, rec.id, removeOne);
}
var query = keys.map(function (key) {
return ['del', key];
});
this.client.multi(query).exec(function (err, replies) {
callback(err);
});
}.bind(this));

});

};

BridgeToRedis.prototype.count = function count(model, callback) {
Riak.prototype.count = function count(model, callback) {
this.client.keys(model + ':*', function (err, keys) {
callback(err, err ? null : keys.length);
});
};

BridgeToRedis.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
this.client.hmset(model + ':' + id, data, cb);
Riak.prototype.updateAttributes = function updateAttrs(model, id, data, cb) {
data.id = id;
this.save(model, data, cb);
};

0 comments on commit 8bb855c

Please sign in to comment.