Skip to content

Commit

Permalink
Added basic implementation from other modules
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Mar 17, 2017
1 parent 0d4e2a3 commit 36733d9
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 0 deletions.
9 changes: 9 additions & 0 deletions config/defaults.cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module.exports = {
defaultType: 'memory',
memory: {
logger: 'cache'
},
file: {
logger: 'cache'
}
};
103 changes: 103 additions & 0 deletions elasticsearch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
const config = require('cheevr-config');
const Database = require('cheevr-database');

class ElasticsearchCache {
constructor(config, logger) {
this._db = Database.factory(config.database);
this._log = logger;
this._config = config;
}

store(queue, payload, cb) {
if (!payload.id) {
throw new Error('Unable to cache message queue payload because of missing id');
}
this._db.indices.existsType({
index: this._config.index,
type: queue
}, err => {
this._createMapping(err && queue, err => {
if (err) {
return cb && cb(err);
}
this._db.index({
index: this._config.type,
type: queue,
id: payload.id,
body: {
date: Date.now(),
content: payload
}
}, err => cb && cb(err));
});
});
}

_createMapping(type, cb) {
if (!type) {
return cb();
}
this._db.indices.putMapping({
index: this._config.index,
type,
body: {
properties: {
date: { type:'date' },
content: { dynamic: true, enabled: false }
}
}
}, cb);
}

get(queue, cb) {
this._db.search({
index: this._config.index,
type: queue,
body: {
query: {
match_all: {}
},
sort: [{
date: { order: 'desc' }
}]
}
}, (err, results) => {
if (err) {
return cb && cb([]);
}
let response = [];
for (let hit of results.hits.hits) {
response.push(hit._source);
}
cb && cb(response);
});
}

clear(queue, cb) {
this._db.deleteByQuery({
index: this._config.index,
type: queue,
body: {
query: {
match_all: {}
}
}
}, err => {
err && this._log.warn('Error trying to clear queue %s from elasticsearch', queue, err);
cb && cb();
});
}

remove(queue, id, cb) {
this._db.delete({
index: this._config.index,
type: queue,
id
}, err => {
err && this._log.warn('Error trying to remove an entry from elasticsearch stored queue %s and id %s:', queue, id, err);
cb && cb();
});
}
}

module.exports = ElasticsearchCache;
112 changes: 112 additions & 0 deletions file.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
const fs = require('fs');
const path = require('path');


/**
* @typedef {Object} FileCacheConfig
* @extends CacheConfig
* @property {string} [path=queues] The directory in which to store queue information
*/

const cwd = process.cwd();

/**
* This cache implementation will store queues and messages on disk in a given directory. Queues are mapped to
* directories and message are stored as files. All data is also cached in memory, so that file read access only
* happens after a reboot through lazy initialization.
*/
class FileCache {
/**
* @param {FileCacheConfig} config
*/
constructor(config) {
this._path = FileCache._mkDir(config && config.path || 'queues');
this._queues = {};
}

/**
* Recursively creates a path if doesn't exist yet.
* @param {string} dir An absolute path to be created
* @private
*/
static _mkDir(dir) {
dir = path.isAbsolute(dir) ? dir : path.join(cwd, dir);
let fullPath = '';
for (let entry of dir.split(path.sep)) {
fullPath += '/' + entry;
fs.existsSync(fullPath) || fs.mkdirSync(fullPath);
}
return dir;
}

/**
* Stores the payload in cache.
* @param {string} type The type/group to cache for
* @param {string} id The id of the document to store
* @param {object} payload The data to cache
* @param {Callback} [cb] Callback to be notified on async store operations
*/
store(type, id, payload, cb) {
this._queues[type][id] = payload;
let fullPath = path.join(this._path, type);
fs.existsSync(fullPath) || fs.mkdirSync(fullPath);
fs.writeFile(path.join(fullPath, id), JSON.stringify(payload), 'utf8', err => cb && cb(err));
}

/**
* Returns all cached messages and listeners
* @param {string} type The id/name of the type for which to fetch data
* @param {Callback} [cb] Callback function for async fetching
* @returns {Object<string, Object>} A map with message id's mapping to payloads
*/
get(type, cb) {
if (this._queues[type]) {
cb(null, this._queues[type]);
return this._queues[type];
}
let fullPath = path.join(this._path, type);
let files = fs.readdirSync(fullPath);
let response = {};
for (let file of files) {
let payload = JSON.parse(fs.readFileSync(path.join(fullPath, file), 'utf8'));
response[payload.id] = payload;
}
cb && cb(null, response);
return response;
}

/**
* Removes all cached data from a type
* @param {string} type The id/name of the type to clear
* @param {Callback} [cb] Callback to be notified on async clear operations
*/
clear(type, cb) {
let fullPath = path.join(this._path, type);
fs.existsSync(fullPath) && fs.unlinkSync(fullPath);
delete this._queues[type];
cb && cb();
}

/**
* Remove an entry from cache.
* @param {string} type The id/name of the type from which to remove the message
* @param {string} id The id of the message to remove
* @param {Callback} [cb] Callback to be notified on async remove operations
*/
remove(type, id, cb) {
let fullPath = path.join(this._path, type, id);
fs.existsSync(fullPath) && fs.unlinkSync(fullPath);
delete this._queues[type][id];
cb && cb();
}

/**
* Returns the path under which all queues are being cached
* @returns {string}
*/
get path() {
return this._path;
}
}

module.exports = FileCache;
26 changes: 26 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
let config = require('cheevr-config');


/**
* @typedef {function} Callback
* @param {Error|string|null} [err]
* @param {...*} params
*/

/**
* @typedef {Object} CacheConfig
* @property {string} type The cache type that should be used to store data. Maps directly to the file names of the caches
*/

class Cache {
/**
* Returns a cache implementation of the given type.
* @param {string} [name=_default_]
*/
get(name = '_default_') {
let instanceConfig = config.cache[name] || { type: config.defaults.cache.defaultType };
return new (require('./' + instanceConfig.type))(instanceConfig);
}
}

module.exports = new Cache();
53 changes: 53 additions & 0 deletions memory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
class MemoryCache {
constructor() {
this._queues = {};
}

/**
* Stores the payload in cache.
* @param {string} type The name of the type to cache for
* @param {string} id The id of the document to store
* @param {object} payload The data to cache
* @param {string} payload.id The id that is being used to reference the message later on
* @param {Callback} [cb] Callback to be notified on async store operations
*/
store(type, id, payload, cb) {
this._queues[type] = this._queues[type] || {};
this._queues[type][id] = payload;
cb && cb();
}

/**
* Returns all cached messages and listeners
* @param {string} type The id/name of the type for which to fetch data
* @param {Callback} [cb] Callback function for async fetching
* @returns {Object<string, Object>} A map with message id's mapping to payloads
*/
get(type, cb) {
cb && cb(null, this._queues[type]);
return this._queues[type];
}

/**
* Removes all cached data from a type
* @param {string} type The id/name of the type to clear
* @param {Callback} [cb] Callback to be notified on async clear operations
*/
clear(type, cb) {
delete this._queues[type];
cb && cb();
}

/**
* Remove an entry from cache.
* @param {string} type The id/name of the type from which to remove the message
* @param {string} id The id of the message to remove
* @param {Callback} [cb] Callback to be notified on async remove operations
*/
remove(type, id, cb) {
delete this._queues[type][id];
cb && cb();
}
}

module.exports = MemoryCache;

0 comments on commit 36733d9

Please sign in to comment.