Skip to content

Commit

Permalink
fix: let scanner scan through regions (#101)
Browse files Browse the repository at this point in the history
* fix: let scanner scan through regions

* add test case

* add more test cases

* add more test case
  • Loading branch information
XadillaX authored and gxcsoccer committed Jan 2, 2018
1 parent 3f46ff3 commit 27886f2
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 8 deletions.
7 changes: 4 additions & 3 deletions lib/client.js
Expand Up @@ -177,11 +177,12 @@ Client.prototype.delete = function (tableName, del, callback) {


Client.prototype.getScanner = function (tableName, scan, callback) {
this._action('openScanner', tableName, scan, true, 0, function (err, scannerId, server) {
var self = this;
this._action('openScanner', tableName, scan, true, 0, function (err, scannerId, server, regionInfo) {
if (err) {
return callback(err);
}
var scanner = new Scanner(server, scannerId);
var scanner = new Scanner(server, scannerId, scan, regionInfo, self, tableName);
callback(null, scanner);
});
};
Expand Down Expand Up @@ -228,7 +229,7 @@ Client.prototype._action = function (method, tableName, obj, useCache, retry, ca
}
}

callback(err, value, server);
callback(err, value, server, location.getRegionInfo());
});

});
Expand Down
136 changes: 132 additions & 4 deletions lib/scanner.js
Expand Up @@ -13,22 +13,150 @@
var EventEmitter = require('events').EventEmitter;
var util = require('util');

function Scanner(server, id) {
var async = require('async');
var Linklist = require('algorithmjs').ds.Linklist;

var Result = require('./result');
var Scan = require('./scan');

function Scanner(server, id, scan, region, client, tableName) {
EventEmitter.call(this);

this.server = server;
this.id = id;
this.scan = scan;

this.closed = false;
this.cache = new Linklist();

this.region = region;
this.client = client;
this.tableName = tableName;
}

util.inherits(Scanner, EventEmitter);

Scanner.prototype._openNextScanner = function(callback) {
// no more region
if (!this.region.endKey || !this.region.endKey.length) {
return callback(null, false);
}

// next region is bigger than scan.stopRow
if (this.scan.stopRow.length && this.region.endKey.compare(this.scan.stopRow) === 1) {
return callback(null, false);
}

var old = this.scan;
var scan = new Scan(this.region.endKey, old.stopRow);
scan.maxVersions = old.maxVersions;
scan.batch = old.batch;
scan.caching = old.caching;
scan.maxResultSize = old.maxResultSize;
scan.cacheBlocks = old.cacheBlocks;
scan.filter = old.filter;
scan.tr = old.tr;
scan.familyMap = old.familyMap;

var self = this;
this.client.getScanner(this.tableName, scan, function(err, scanner) {
if (err) return callback(err);

self.server = scanner.server;
self.id = scanner.id;
self.scan = scan;
self.region = scanner.region;

return callback(null, true);
});
};

Scanner.prototype._next = function(callback) {
if (!this.cache.length && this.closed) {
return callback(null, null);
}

if (this.cache.length) {
return callback(null, this.cache.popFront());
}

// cache from server
var self = this;
var caching = this.scan.caching < 1 ? 1 : this.scan.caching;
var countdown = caching;
var nextFinished = false;
var values = [];
async.whilst(function() {
return countdown > 0 && !nextFinished;
}, function(callback) {
self.server.nextResult(self.id, caching, function(err, rows) {
if (err) {
return callback(err);
}

if (rows.length) {
countdown -= rows.length;
values = Array.prototype.concat.apply(values, rows);
}

if (rows.length < caching && countdown > 0) {
return self._openNextScanner(function(err, opened) {
if (err) return callback(err);
nextFinished = !opened;
callback();
});
} else {
return callback();
}
});
}, function(err) {
if (err) return callback(err);
if (!values.length) return callback();

values.forEach(function(v) {
self.cache.pushBack(v);
});

return callback(null, self.cache.popFront());
});
};

Scanner.prototype.next = function (numberOfRows, callback) {
this.server.nextResult(this.id, numberOfRows, callback);
if (typeof numberOfRows === 'function') {
callback = numberOfRows;
numberOfRows = null;
}

if (!numberOfRows) {
return this._next(callback);
}

var self = this;
var noMore = false;
var ret = [];
async.whilst(function() {
return ret.length < numberOfRows && !noMore;
}, function(callback) {
self._next(function(err, v) {
if (err) return callback(err);
if (!v) {
noMore = true;
} else {
ret.push(v);
}
callback();
});
}, function(err) {
return callback(err, ret);
});
};

Scanner.prototype.close = function (callback) {
this.server.closeScanner(this.id, callback);
var self = this;
this.server.closeScanner(this.id, function(err) {
if (!err) self.closed = true;
callback.apply(null, arguments);
});
};


module.exports = Scanner;
2 changes: 2 additions & 0 deletions package.json
Expand Up @@ -19,6 +19,8 @@
}
},
"dependencies": {
"algorithmjs": "^1.0.0",
"async": "^2.6.0",
"debug": "~1.0.4",
"eventproxy": "~0.3.1",
"long": "~1.1.5",
Expand Down
164 changes: 163 additions & 1 deletion test/client_region.test.js
Expand Up @@ -13,6 +13,7 @@
* Module dependencies.
*/

var async = require('async');
var should = require('should');
var mm = require('mm');
var pedding = require('pedding');
Expand Down Expand Up @@ -76,7 +77,6 @@ describe('client_region.test.js', function () {
should.not.exist(err);
results.should.length(1);
var location = client.getCachedLocation(table, get.getRow());
// console.log('location: ', location.toString());
mm(client, 'locateRegion', function () {
var cb = arguments[arguments.length - 1];
cb(null, location);
Expand Down Expand Up @@ -226,4 +226,166 @@ describe('client_region.test.js', function () {
});
});
});

describe('scan through regions', function() {
var Scan = require('../lib/scan');

it('should through regions', function(done) {
var scan = new Scan();
var rows = [];

var count = function() {
rows.length.should.equal(49);
done();
};

client.getScanner(config.tableUser, scan, function(err, scanner) {
var next = function() {
scanner.next(function(err, row) {
should.ifError(err);

if (!row) {
return count();
}

rows.push(row);
next();
});
};

next();
});
});

it('should through regions with cache 10', function(done) {
var scan = new Scan();
scan.caching = 10;
var rows = [];

var count = function() {
rows.length.should.equal(49);
done();
};

client.getScanner(config.tableUser, scan, function(err, scanner) {
var next = function() {
scanner.next(3, function(err, ret) {
should.ifError(err);

if (!ret.length) {
return count();
}

rows = Array.prototype.concat.apply(rows, ret);
next();
});
};

next();
});
});

it('should cache even it\'s closed', function(done) {
var scan = new Scan();
scan.caching = 10;
var rows = [];

var count = function() {
rows.length.should.equal(10);
done();
};

client.getScanner(config.tableUser, scan, function(err, scanner) {
var next = function() {
scanner.next(function(err, row) {
should.ifError(err);

if (!rows.length) {
return scanner.close(function() {
rows.push(row);
next();
});
}

if (!row) {
return count();
}

rows.push(row);
next();
});
};

next();
});
});

it('should scan through regions with stop row', function(done) {
var scan = new Scan(null, '7fffffdd');
scan.caching = 10;
var rows = [];

var count = function() {
rows.length.should.equal(24);
done();
};

client.getScanner(config.tableUser, scan, function(err, scanner) {
var next = function() {
scanner.next(function(err, row) {
should.ifError(err);

if (!row) {
return count();
}

rows.push(row);
next();
});
};

next();
});
});

it('should scan error', function(done) {
var scan = new Scan(null, '7fffffdd');
scan.caching = 10;
var rows = [];

var count = function() {
rows.length.should.equal(10);
done();
};

var times = 0;
client.getScanner(config.tableUser, scan, function(err, scanner) {
var next = function() {
scanner.next(function(err, row) {
times++;
if (times < 11) {
should.ifError(err);
} else {
err.message.should.startWith(
'java.io.IOException: java.lang.NoSuchMethodException: ' +
'org.apache.hadoop.hbase.ipc.HRegionInterface.next(int, int)');
return count();
}

if (!rows.length) {
rows.push(row);
scanner.id = -1;
return next();
}

rows.push(row);
next();
});
};

next();
});
});

});
});

0 comments on commit 27886f2

Please sign in to comment.