Permalink
Browse files

elasticsearch support WIP

  • Loading branch information...
adrai committed Jul 12, 2015
1 parent 0ccb393 commit fc3b96a06b74b9cc4f368cd33d501387b3b6544d
Showing with 380 additions and 3 deletions.
  1. +9 −0 .editorconfig
  2. +1 −0 .travis.yml
  3. +365 −0 lib/databases/elasticsearch.js
  4. +1 −0 package.json
  5. +1 −1 test/repositoryReadTest.js
  6. +3 −2 test/repositoryWriteTest.js
View
@@ -0,0 +1,9 @@
# EditorConfig is awesome: http://EditorConfig.org
root = true
[*.{js,jsx,json}]
end_of_line = lf
insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 2
View
@@ -2,6 +2,7 @@ services:
- mongodb
- couchdb
- redis-server
- elasticsearch
language: node_js
node_js:
@@ -0,0 +1,365 @@
'use strict';
var util = require('util'),
Repository = require('../base'),
ViewModel = Repository.ViewModel,
ConcurrencyError = require('../concurrencyError'),
elasticsearch = require('elasticsearch'),
uuid = require('node-uuid').v4,
jsondate = require('jsondate'),
_ = require('lodash'),
async = require('async'),
collections = [];
function Elasticsearch (options) {
Repository.call(this, options);
var defaults = {
index: 'context',
prefix: '',
ttl: 60 * 60 * 24 * 14, // 14 days
pingInterval: 1000
};
_.defaults(options, defaults);
if (!options.hosts && !options.host) {
options.host = 'localhost:9200';
}
this.options = options;
this.index = this.options.index;
}
util.inherits(Elasticsearch, Repository);
_.extend(Elasticsearch.prototype, {
connect: function (callback) {
var self = this;
this.isConnected = false;
this.client = new elasticsearch.Client(this.options);
var callbacked = false;
this.closeCalled = false;
var interval = setInterval(function () {
if (self.closeCalled) {
clearInterval(interval);
}
self.client.ping(function (err) {
if (err) {
if (self.isConnected) {
self.isConnected = false;
self.emit('disconnect');
}
if (callback && !callbacked) {
callbacked = true;
callback(err, self);
}
return;
}
if (!self.isConnected && !callbacked) {
self.isConnected = true;
self.emit('connect');
if (callback) {
callbacked = true;
callback(err, self);
}
}
});
}, this.options.pingInterval);
},
disconnect: function (callback) {
this.closeCalled = true;
if (this.client) this.client.close();
if (callback) callback(null);
},
getNewId: function (callback) {
var id = uuid().toString();
if (callback) callback(null, id);
},
get: function(id, callback) {
if (_.isFunction(id)) {
callback = id;
id = null;
}
if (!id) {
id = uuid().toString();
}
var self = this;
this.checkConnection(function (err) {
if (err) {
return callback(err);
}
self.client.get({
index: self.index,
type: self.collectionName,
id: self.options.prefix + id
}, function (err, res) {
if (err && err.message.toLowerCase().indexOf('not found') >= 0) {
err = null;
}
if (err) return callback(err);
if (res && res._source) {
var data = jsondate.parse(JSON.stringify(res._source));
var vm = new ViewModel(data, self);
vm.actionOnCommit = 'update';
return callback(null, vm);
}
callback(null, new ViewModel({ id: id }, self));
});
});
},
find: function(query, queryOptions, callback) {
var self = this;
this.checkConnection(function (err) {
if (err) {
return callback(err);
}
self.client.search({
index: self.index,
type: self.collectionName,
body: {
from: 0,
size: 2147483647,
query: {
match_all: {}
}
}
}, function (err, res) {
if (err) {
return callback(err);
}
// Map to view models
var vms = _.map(res.hits.hits, function(data) {
var vm = new ViewModel(data._source, self);
vm.actionOnCommit = 'update';
return vm;
});
callback(err, vms);
});
});
},
findOne: function(query, queryOptions, callback) {
queryOptions.limit = 1;
this.find(query, queryOptions, function (err, vms) {
if (err) {
return callback(err);
}
if (vms.length === 0) {
return callback(null, null);
}
callback(null, vms[0]);
});
},
commit: function(vm, callback) {
if (!vm.actionOnCommit) return callback(new Error());
var obj;
var self = this;
this.checkConnection(function (err) {
if (err) {
return callback(err);
}
switch(vm.actionOnCommit) {
case 'delete':
self.client.delete({
index: self.index,
type: self.collectionName,
consistency: 'quorum',
id: self.options.prefix + vm.id
}, function (err, res) {
if (err && err.message.toLowerCase().indexOf('not found') >= 0) {
err = null;
}
if (callback) callback(err);
});
break;
case 'create':
vm.set('_version', 1);
obj = vm.toJSON();
self.client.create({
index: self.index,
type: self.collectionName,
id: self.options.prefix + vm.id,
consistency: 'quorum',
body: obj
}, function (err, res) {
console.log(err);
/*if (err && err.message && err.message.indexOf('duplicate key') >= 0) {
return callback(new ConcurrencyError());
}*/
vm.actionOnCommit = 'update';
callback(err, vm);
});
break;
case 'update':
var nextVersion = vm.get('_version') + 1;
vm.set('_version', nextVersion);
obj = vm.toJSON();
self.client.index({
index: self.index,
type: self.collectionName,
id: self.options.prefix + vm.id,
version: nextVersion - 1,
consistency: 'quorum',
body: obj
}, function (err, res) {
console.log(err);
/*if (modifiedCount && modifiedCount.result && modifiedCount.result.n === 0) {
return callback(new ConcurrencyError());
}*/
vm.actionOnCommit = 'update';
callback(err, vm);
});
break;
default:
return callback(new Error());
}
});
},
checkConnection: function (callback) {
if (this.isInited) {
return callback(null);
}
if (!this.collectionName) {
return callback(null);
}
this.isInited = true;
if (collections.indexOf(this.collectionName) < 0) {
collections.push(this.collectionName)
}
var self = this;
this.client.indices.create({
index: this.index,
type: this.collectionName,
consistency: 'quorum'
}, function(err) {
if (err && err.message.toLowerCase().indexOf('already') >= 0) {
err = null;
}
if (err) {
return callback(err);
}
self.client.indices.putMapping({
index: self.index,
type: self.collectionName,
consistency: 'quorum',
body: {
dynamic_templates: [
{
non_analyzed_string: {
match: '*',
match_mapping_type: 'string',
'mapping': {
'type': 'string',
'index': 'not_analyzed'
}
}
}
]
}
}, callback);
});
},
clear: function (callback) {
if (!this.collectionName) {
if (callback) callback(null);
return;
}
var self = this;
this.checkConnection(function (err) {
if (err) {
if (callback) callback(err);
return;
}
self.client.deleteByQuery({
index: self.index,
type: self.collectionName,
consistency: 'quorum',
body: {
query: {
bool: {
must: [
{
match_all: {}
}
]
}
}
}
}, callback || function () {});
});
},
clearAll: function (callback) {
var self = this;
async.each(collections, function (col, callback) {
self.client.deleteByQuery({
index: self.index,
type: col,
consistency: 'quorum',
body: {
query: {
bool: {
must: [
{
match_all: {}
}
]
}
}
}
}, callback);
}, callback || function () {});
}
});
module.exports = Elasticsearch;
View
@@ -24,6 +24,7 @@
"cradle": ">=0.6.7",
"documentdb": ">=0.9.3",
"doqmentdb": ">=0.2.7",
"elasticsearch": ">=5.0.0",
"expect.js": ">= 0.1.2",
"mocha": ">= 1.0.1",
"mongodb": ">= 0.0.1",
Oops, something went wrong.

0 comments on commit fc3b96a

Please sign in to comment.