From d7f7f7717706a5fb170f775247fbee0945a3da1b Mon Sep 17 00:00:00 2001 From: Brandon Goode Date: Wed, 31 May 2017 17:22:12 -0400 Subject: [PATCH] Implement scan.parallel. Closes #90 --- docs/_docs/scan.md | 10 ++++++ lib/Scan.js | 86 ++++++++++++++++++++++++++++++++++++++++------ test/Scan.js | 45 ++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 10 deletions(-) diff --git a/docs/_docs/scan.md b/docs/_docs/scan.md index 0f383e18b..4e27e94e6 100644 --- a/docs/_docs/scan.md +++ b/docs/_docs/scan.md @@ -44,6 +44,16 @@ Recursively scan as long as lastKey exists. This function will also return a pro `max` is the maximum number of recursive scans. Default: 0 - unlimited +### scan.parallel(totalSegments) + +Performs a parallel scan on the table. + +`totalSegments` is the number of parallel scans + +The results will be merged into a single array. `.lastKey` will be an array of `lastKey` objects. + +**Warning** this can consume a lot of capacity. + ### scan.and() For readability only. Scans us AND logic for multiple attributes. `and()` does not provide any functionality and can be omitted. 
diff --git a/lib/Scan.js b/lib/Scan.js index d6399da1c..5fa8ba692 100644 --- a/lib/Scan.js +++ b/lib/Scan.js @@ -1,5 +1,6 @@ 'use strict'; var Q = require('q'); +var _ = require('lodash'); var debug = require('debug')('dynamoose:scan'); var errors = require('./errors'); @@ -86,10 +87,17 @@ Scan.prototype.exec = function (next) { scanReq.Limit = options.limit; } - if(options.ExclusiveStartKey) { - scanReq.ExclusiveStartKey = options.ExclusiveStartKey; + if(options.parallel) { + scanReq.TotalSegments = options.parallel; } + if(Array.isArray(options.ExclusiveStartKey)) { + scanReq.TotalSegments = options.ExclusiveStartKey.length; + } else if(options.ExclusiveStartKey) { + options.ExclusiveStartKey = [options.ExclusiveStartKey]; + } + + if(options.conditionalOperator) { scanReq.ConditionalOperator = options.conditionalOperator; } @@ -98,17 +106,28 @@ Scan.prototype.exec = function (next) { scanReq.ConsistentRead = true; } - function scan () { + function scanSegment (segment) { var deferred = Q.defer(); - + var scanOneReq = _.clone(scanReq); + + if(scanOneReq.TotalSegments) { + scanOneReq.Segment = segment; + } + + if(options.ExclusiveStartKey) { + scanOneReq.ExclusiveStartKey = options.ExclusiveStartKey[segment]; + } + + debug('adding scan segement', scanOneReq); + var models = {}, totalCount = 0, scannedCount = 0, timesScanned = 0, lastKey; if (!options.all) { options.all = {'delay': 0, 'max': 1}; } scanOne(); function scanOne() { - debug('scan request', scanReq); - Model.$__.base.ddb().scan(scanReq, function(err, data) { + debug('scan request', scanOneReq); + Model.$__.base.ddb().scan(scanOneReq, function(err, data) { if(err) { debug('Error returned by scan', err); return deferred.reject(err); @@ -143,7 +162,7 @@ Scan.prototype.exec = function (next) { } else { models = models.concat(data.Items.map(toModel)); } - + if(options.one) { if (!models || models.length === 0) { return deferred.resolve(); @@ -155,10 +174,10 @@ Scan.prototype.exec = function (next) { 
totalCount += data.Count; scannedCount += data.ScannedCount; timesScanned++; - + if ((options.all.max === 0 || timesScanned < options.all.max) && lastKey) { // scan.all need to scan again - scanReq.ExclusiveStartKey = lastKey; + scanOneReq.ExclusiveStartKey = lastKey; setTimeout(scanOne, options.all.delay * 1000); } else { @@ -175,10 +194,52 @@ Scan.prototype.exec = function (next) { }); } - return deferred.promise.nodeify(next); + return deferred.promise; } + function scan () { + var deferred = Q.defer(); + + var totalSegments = scanReq.TotalSegments || 1; + var scans = []; + for(var segment = 0; segment < totalSegments; segment++) { + scans.push(scanSegment(segment)); + } + Q.all(scans) + .then(function (results) { + var models = _.flatten(results); + var lastKeys = results.map(function (r) { + return r.lastKey; + }); + + if(lastKeys.length === 1) { + models.lastKey = lastKeys[0]; + } else if (_.compact(lastKeys).length !== 0){ + models.lastKey = lastKeys; + } + + + models.count = results.reduce(function(acc, r) { + return acc + r.count; + }, 0); + models.scannedCount = results.reduce(function(acc, r) { + return acc + r.scannedCount; + }, 0); + models.timesScanned = results.reduce(function(acc, r) { + return acc + r.timesScanned; + }, 0); + deferred.resolve(models); + + }) + .fail(function (err) { + deferred.reject(err); + }); + + return deferred.promise.nodeify(next); + + } + if(Model$.options.waitForActive) { return Model$.table.waitForActive().then(scan); } @@ -436,4 +497,9 @@ Scan.prototype.all = function (delay, max) { return this; }; +Scan.prototype.parallel = function (numberOfSegments) { + this.options.parallel = numberOfSegments; + return this; +}; + module.exports = Scan; diff --git a/test/Scan.js b/test/Scan.js index 73559cfa9..7fa97667a 100644 --- a/test/Scan.js +++ b/test/Scan.js @@ -667,6 +667,51 @@ describe('Scan', function (){ }); }); + it('Scan parallel', function (done) { + var Dog = dynamoose.model('Dog'); + + 
Dog.scan().parallel(2).exec(function (err, dogs) { + should.not.exist(err); + dogs.length.should.eql(20); + done(); + }); + }); + + + it('Scan with startAt array - implied parallel', function (done) { + var Dog = dynamoose.model('Dog'); + + Dog.scan().parallel(2).limit(2).exec() + .then(function (dogs) { + dogs.length.should.eql(4); + dogs.lastKey.length.should.eql(2); + dogs.count.should.eql(4); + dogs.scannedCount.should.eql(4); + dogs.timesScanned.should.eql(2); + return Dog.scan().startAt(dogs.lastKey).exec(); + }) + .then(function (more) { + more.length.should.eql(16); + more.count.should.eql(16); + more.scannedCount.should.eql(16); + more.timesScanned.should.eql(2); + done(); + }) + .catch(done); + }); + + it('Scan parallel all', function (done) { + var Dog = dynamoose.model('Dog'); + + Dog.scan().parallel(2).limit(2).all().exec() + .then(function (dogs) { + dogs.length.should.eql(20); + should.not.exist(dogs.lastKey); + done(); + }) + .catch(done); + }); +