Permalink
Browse files

misc:fix bug

  • Loading branch information...
1 parent d87a5c4 commit 2335b7fdf874ff4c5cef03f072448dfd895cb4c3 yaopiaohai committed Jan 21, 2013
Showing with 145 additions and 131 deletions.
  1. +1 −1 LICENSE
  2. +4 −5 README.md
  3. +24 −19 lib/commands/mapping.js
  4. +43 −41 lib/commands/server.js
  5. +19 −19 lib/dbsync.js
  6. +43 −43 lib/rewriter/rewriter.js
  7. +2 −2 package.json
  8. +9 −1 test/sync.js
View
@@ -1,6 +1,6 @@
(The MIT License)
-Copyright (c) 2012 Netease, Inc. and other pomelo contributors
+Copyright (c) 2012 NetEase, Inc. and other pomelo contributors
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
View
@@ -14,15 +14,13 @@ Data sync features include timer sync,set,get,mset,mget,hset,hget,incr,decr,flus
##Installation
```
-npm install data-sync
+npm install pomelo-sync
```
##Usage
``` javascript
-var DBsync = require('dbsync');
-
-var opt = opts || {};
+var opt = opt || {};
var updateUser = function(dbclient,val) {
console.log('mock save %j',val);
@@ -36,7 +34,8 @@ opt.mapping = mapping;
opt.client = dbclient;
opt.interval = 2000;
-var sync = new DBsync(options) ;
+var Sync = require('pomelo-sync');
+var sync = new Sync(opt) ;
sync.exec(optKey,id,{name:'hello'});
```
View
@@ -8,28 +8,33 @@ var path = require('path');
* @param {String} mappingPath
* @return {Object} mapping
*/
-exports.loadMapping = function(mappingPath) {
+ exports.loadMapping = function(mappingPath) {
var mapping = {};
- var logger = this.log;
- mappingPath+='/';
- logger.info('[data sync compoment] load mapping file ' + mappingPath);
+ var logger = this.log;
+ var self = this;
+ mappingPath+='/';
+ if (!!self.debug) {
+ logger.info('[data sync compoment] load mapping file ' + mappingPath);
+ }
fs.readdirSync(mappingPath).forEach(function(filename){
- if (!/\.js$/.test(filename)) {return;}
- var name = path.basename(filename, '.js'),key,pro;
- var fullPath = mappingPath + name;
- logger.info('loading ' + fullPath);
- pro = require(fullPath);
- for (key in pro){
- var fullKey = name+'.'+key;
- if (mapping[fullKey]){
- logger.error('[data sync component] exist duplicated key map function ' + key + ' ignore it now.');
- } else {
- mapping[fullKey] = pro[key];
- }
- }
- });
+ if (!/\.js$/.test(filename)) {return;}
+ var name = path.basename(filename, '.js'),key,pro;
+ var fullPath = mappingPath + name;
+ if (!!self.debug) {
+ logger.log('loading ' + fullPath);
+ }
+ pro = require(fullPath);
+ for (key in pro){
+ var fullKey = name+'.'+key;
+ if (mapping[fullKey]){
+ logger.error('[data sync component] exist duplicated key map function ' + key + ' ignore it now.');
+ } else {
+ mapping[fullKey] = pro[key];
+ }
+}
+});
logger.info('[data sync component] load mapping file done.' );
- return mapping;
+ return mapping;
};
View
@@ -1,40 +1,42 @@
/**
* Module dependencies.
*/
-
-var utils = require('../utils/utils');
-var invoke = utils.invoke;
-var clone = utils.clone;
-
+ var utils = require('../utils/utils');
+ var invoke = utils.invoke;
+ var clone = utils.clone;
/**
*
- * load data from db
+ * invoke tick instant
+ *
+ * @module
*
* @param {String} key
* @param {Object} val
* @param {Function} cb
*
*/
-exports.execSync = function(key,val,cb){
- this.rewriter.tick(key,val,cb);
-};
+ exports.execSync = function(key,val,cb){
+ this.rewriter.tick(key,val,cb);
+ };
/**
- * flush data to db
- *
+ * exec add be synced data to queue
+ * invoke by timer
+ *
+ * @module
*/
-exports.exec = function(){
+ exports.exec = function(){
var mergerKey;
switch (arguments.length) {
- case 2:
+ case 2:
this.enqueue(arguments[0],arguments[1]);
break;
- case 3:
+ case 3:
mergerKey = [arguments[0],arguments[1]].join('');
this.mergerMap[mergerKey] = {key:arguments[0],val:clone(arguments[2])};
this.writeToAOF(arguments[0], [arguments[2]]);
break;
- default:
+ default:
break;
}
};
@@ -47,44 +49,44 @@ exports.exec = function(){
* @param {Object} val
*
*/
-exports.enqueue = function(key, val){
- var target = clone(val);
- if (!!target) {
- this.writeToAOF(key, [val]);
- this.flushQueue.push({key:key,val:val});
- }
+ exports.enqueue = function(key, val){
+ var target = clone(val);
+ if (!!target) {
+ this.writeToAOF(key, [val]);
+ this.flushQueue.push({key:key,val:val});
+ }
};
/**
* flush all data go head
*/
-exports.sync = function(){
- if (this.rewriter) {
- this.rewriter.sync(this);
- }
+ exports.sync = function(){
+ if (this.rewriter) {
+ this.rewriter.sync(this);
+ }
};
/**
- * reutrn job is done
- *
+ * reutrn queue is empty or not when shutdown server
*
+ * @module
*
*/
-exports.isDone = function(){
+ exports.isDone = function(){
var writerEmpty = true,queueEmpty = false,mapEmpty = false;
if (!!this.rewriter) {
- writerEmpty = this.rewriter.isDone();
- }
- queueEmpty = (this.flushQueue.getLength()===0);
- mapEmpty = (utils.getMapLength(this.mergerMap)===0);
- return writerEmpty && queueEmpty && mapEmpty;
+ writerEmpty = this.rewriter.isDone();
+ }
+ queueEmpty = (this.flushQueue.getLength()===0);
+ mapEmpty = (utils.getMapLength(this.mergerMap)===0);
+ return writerEmpty && queueEmpty && mapEmpty;
};
/*
*
* flush single data to db
* first remove from cache map
*/
-exports.flush = function(){
+ exports.flush = function(){
var mergerKey;
if (arguments.length>=3) {
mergerKey = [arguments[0],arguments[1]].join('');
@@ -94,7 +96,7 @@ exports.flush = function(){
delete this.mergerMap[mergerKey];
}
this.writeToAOF(arguments[0], [arguments[2]]);
- return this.rewriter.flush(this,arguments[0],arguments[2]);
+ return this.rewriter.flush(arguments[0],arguments[2]);
} else {
this.log.error('invaild arguments,flush must have at least 3 arguments');
return false;
@@ -106,18 +108,18 @@ exports.flush = function(){
*
*
*/
-exports.info = function(){
+ exports.info = function(){
var buf = ''
- , day = 86400000
- , uptime = new Date - this.server.start;
+ , day = 86400000
+ , uptime = new Date - this.server.start;
this.dbs.forEach(function(db, i){
var keys = Object.keys(db)
- , len = keys.length;
+ , len = keys.length;
if (len) {
buf += 'db' + i + ':keys=' + len + ',expires=0\r\n';
- }
- });
+ }
+});
return (buf);
};
View
@@ -1,35 +1,34 @@
/**
* Module dependencies.
*/
-
-var commands = require('./commands'),utils = require('./utils/utils'),Queue = require('./utils/queue'),fs = require('fs');
-
+var commands = require('./commands');
+var utils = require('./utils/utils');
+var Queue = require('./utils/queue');
+var fs = require('fs');
var crypto = require('crypto');
var Rewriter = require('../lib/rewriter/rewriter');
var SyncTimer = require('../lib/timer/synctimer');
-var mutate = [];
-
-Object.keys(commands).forEach(function(cmd){
- var fn = commands[cmd];
- if (fn.mutates) {mutate.push(cmd);}
-});
-
/**
- * Initialize a new `DataSync` with the given `server` and `options`.
+ *
+ * DataSync Components.
*
- * Options:
+ * Initialize a new `DataSync` with the given `options`.
*
- * `filename` Append-only file path
- * @param {Object} options
+ * DataSync's prototype is based on `commands` under the same directory;
+ *
+ * @class DataSync
+ * @constructor
+ * @param {Object} options init params include aof,log,interval,mapping and mappingPath etc.
+ *
*/
-
-var DataSync = module.exports = function DataSync(options) {
+var DataSync = module.exports = function(options) {
options = options || {};
this.dbs = [];
this.selectDB(0);
this.client = options.client;
this.aof = options.aof || false;
+ this.debug = options.debug || false;
this.log = options.log || console;
this.interval = options.interval || 1000 * 60;
this.flushQueue = new Queue();
@@ -57,12 +56,11 @@ var DataSync = module.exports = function DataSync(options) {
/**
* Expose commands to store.
*/
-
DataSync.prototype = commands;
/**
* Select database at the given `index`.
- *
+ * @api private
* @param {Number} index
*/
@@ -79,7 +77,7 @@ DataSync.prototype.selectDB = function(index){
/**
*return the first used db
*
- *
+ * @api private
*/
DataSync.prototype.use = function() {
this.selectDB(0);
@@ -93,6 +91,7 @@ DataSync.prototype.use = function() {
* Lookup `key`, when volatile compare timestamps to
* expire the key.
*
+ * @api private
* @param {String} key
* @return {Object}
*/
@@ -109,6 +108,7 @@ DataSync.prototype.lookup = function(key){
/**
* Write the given `cmd`, and `args` to the AOF.
*
+ * @api private
* @param {String} cmd
* @param {Array} args
*/
Oops, something went wrong.

0 comments on commit 2335b7f

Please sign in to comment.