Skip to content

Commit

Permalink
Add implementation-sniffing to .stream()
Browse files Browse the repository at this point in the history
  • Loading branch information
mikermcneil committed May 29, 2018
1 parent dce4c0f commit 7fd0fed
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"no-trailing-spaces": ["warn"],
"no-undef": ["error"],
"no-unexpected-multiline": ["warn"],
"no-unused-vars": ["warn", {"caughtErrors":"all", "caughtErrorsIgnorePattern": "^unused($|[A-Z].*$)"}],
"no-unused-vars": ["warn", {"caughtErrors":"all", "caughtErrorsIgnorePattern": "^unused($|[A-Z].*$)", "argsIgnorePattern": "^unused($|[A-Z].*$)", "varsIgnorePattern": "^unused($|[A-Z].*$)" }],
"no-use-before-define": ["error", {"functions":false}],
"one-var": ["warn", "never"],
"quotes": ["warn", "single", {"avoidEscape":false, "allowTemplateLiterals":true}],
Expand Down
49 changes: 39 additions & 10 deletions lib/waterline/methods/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ var forgeStageTwoQuery = require('../utils/query/forge-stage-two-query');
var getQueryModifierMethods = require('../utils/query/get-query-modifier-methods');
var verifyModelMethodContext = require('../utils/query/verify-model-method-context');


/**
* Module constants
*/

var DEFERRED_METHODS = getQueryModifierMethods('stream');
var STRIP_COMMENTS_RX = /(\/\/.*$)|(\/\*[\s\S]*?\*\/)|(\s*=[^,\)]*(('(?:\\'|[^'\r\n])*')|("(?:\\"|[^"\r\n])*"))|(\s*=[^,\)]*))/mg;



Expand Down Expand Up @@ -341,6 +341,15 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?,
// once. If it's eachRecordFn, we'll call it once per record.
(function _makeCallOrCallsToAppropriateIteratee(proceed){

// Check if the iteratee declares a callback parameter
var seemsToExpectCallback = (function(){
var fn = query.eachBatchFn || query.eachRecordFn;
var fnStr = fn.toString().replace(STRIP_COMMENTS_RX, '');
var parametersAsString = fnStr.slice(fnStr.indexOf('(')+1, fnStr.indexOf(')'));
// console.log(':seemsToExpectCallback:',parametersAsString, !!parametersAsString.match(/\,\s*([^,\{\}\[\]\s]+)\s*$/));
return !! parametersAsString.match(/\,\s*([^,\{\}\[\]\s]+)\s*$/);
})();//†

// If an `eachBatchFn` iteratee was provided, we'll call it.
// > At this point we already know it's a function, because
// > we validated usage at the very beginning.
Expand All @@ -351,10 +360,9 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?,
// that occur after the first.
var didIterateeAlreadyHalt;
try {
// TODO: handle stream iteratees with no declared callback parameter
var promiseMaybe = query.eachBatchFn(batchOfRecords, function (err) {
if (err) { return proceed(err); }

if (!seemsToExpectCallback) { return proceed(new Error('Unexpected attempt to invoke callback. Since this per-batch iteratee function does not appear to expect a callback parameter, this stub callback was provided instead. Please either explicitly list the callback parameter among the arguments or change this code to no longer use a callback.')); }//•
if (err) { return proceed(err); }//•
if (didIterateeAlreadyHalt) {
console.warn(
'Warning: The per-batch iteratee provided to `.stream()` triggered its callback \n'+
Expand All @@ -363,15 +371,24 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?,
);
return;
}//-•

didIterateeAlreadyHalt = true;

return proceed();
});//_∏_ </ invoked per-batch iteratee >

// Take care of unhandled promise rejections from `await`.
// Take care of unhandled promise rejections from `await` (if appropriate)
if (query.eachBatchFn.constructor.name === 'AsyncFunction') {
if (!seemsToExpectCallback) {
promiseMaybe = promiseMaybe.then(function(){
didIterateeAlreadyHalt = true;
proceed();
});//_∏_
}//fi
promiseMaybe.catch(function(e){ proceed(e); });//_∏_
} else {
if (!seemsToExpectCallback) {
didIterateeAlreadyHalt = true;
return proceed();
}
}

} catch (e) { return proceed(e); }//>-•
Expand All @@ -391,8 +408,8 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?,
// that occur after the first.
var didIterateeAlreadyHalt;
try {
// TODO: handle stream iteratees with no declared callback parameter
var promiseMaybe = query.eachRecordFn(record, function (err) {
if (!seemsToExpectCallback) { return next(new Error('Unexpected attempt to invoke callback. Since this per-record iteratee function does not appear to expect a callback parameter, this stub callback was provided instead. Please either explicitly list the callback parameter among the arguments or change this code to no longer use a callback.')); }//•
if (err) { return next(err); }

if (didIterateeAlreadyHalt) {
Expand All @@ -410,10 +427,22 @@ module.exports = function stream( /* criteria?, eachRecordFn?, explicitCbMaybe?,

});//_∏_ </ invoked per-record iteratee >

// Take care of unhandled promise rejections from `await`.
// Take care of unhandled promise rejections from `await` (if appropriate)
if (query.eachRecordFn.constructor.name === 'AsyncFunction') {
if (!seemsToExpectCallback) {
promiseMaybe = promiseMaybe.then(function(){
didIterateeAlreadyHalt = true;
next();
});//_∏_
}//fi
promiseMaybe.catch(function(e){ next(e); });//_∏_
}
} else {
if (!seemsToExpectCallback) {
didIterateeAlreadyHalt = true;
return next();
}
}//fl

} catch (e) { return next(e); }

},// ~∞%°
Expand Down

0 comments on commit 7fd0fed

Please sign in to comment.