diff --git a/src/main.js b/src/main.js index 2e00164..056bb80 100644 --- a/src/main.js +++ b/src/main.js @@ -1,3 +1,3 @@ export { exec } from './exec.js' export { task } from './task.js' -export { stream } from './stream.js' +export { stream } from './stream/main.js' diff --git a/src/stream.js b/src/stream.js deleted file mode 100644 index 4eb7a47..0000000 --- a/src/stream.js +++ /dev/null @@ -1,113 +0,0 @@ -import { callbackify } from 'util' - -import through from 'through2-concurrent' - -import { isValidInput } from './input.js' -import { parseOpts } from './options.js' -import { execCommand, streamCommand } from './exec.js' - -// Creates a stream to use in Gulp e.g. -// src(...).pipe(stream(({ path }) => ['command', [path]])) -// This should not be used with commands that allow several files as arguments -// (through variadic arguments, globbing or directory recursion) as a single -// call to those functions would be more efficient that creating lots of -// child processes through streaming. -export const stream = function(mapFunc, opts) { - const defaultOpts = getDefaultOpts({ opts }) - const { maxConcurrency, result: resultOpt, ...optsA } = parseOpts({ - opts, - defaultOpts, - forcedOpts, - }) - - return through.obj( - { maxConcurrency }, - execVinyl.bind(null, { mapFunc, opts: optsA, resultOpt }), - ) -} - -const getDefaultOpts = function({ opts: { result = 'replace' } = {} }) { - return { - // This is too verbose if done on each iteration - verbose: false, - // We use `through2-concurrent` because `through2` processes files serially - // The default is 16 which is too low - maxConcurrency: 100, - // What to do with the result. Either 'save' or 'replace' - result: 'replace', - // With `result: 'replace'` which stream to use: `stdout`, `stderr` or `all` - from: 'stdout', - ...resultDefaultOpts[result], - } -} - -const resultDefaultOpts = { - // `save` should retrieve output as string, but this is not needed for - // `replace`. Same thing with final newline stripping. - replace: { encoding: 'buffer', stripFinalNewline: false }, -} - -const forcedOpts = { - // Forces piping stdout|stderr because: - // - `inherit` would be too verbose if done on each iteration. - // - `save` mode would not get `stdout|stderr|all` properties. - // - `replace` mode would not work. - stdout: 'pipe', - stderr: 'pipe', - // `stdio` cannot be combined with `stdout|stderr` with execa - stdio: undefined, -} - -const cExecVinyl = async function({ mapFunc, opts, resultOpt }, file) { - const input = await mapFunc(file) - - // Returning `undefined` or invalid command skips it silently. - // `file.execa` array will be pushed with `undefined`. - if (isValidInput({ input })) { - await handleResult[resultOpt]({ file, input, opts }) - } - - return file -} - -const execVinyl = callbackify(cExecVinyl) - -const saveResult = async function({ file, file: { execa = [] }, input, opts }) { - const result = await execCommand(input, opts) - // eslint-disable-next-line no-param-reassign, fp/no-mutation - file.execa = [...execa, result] -} - -// If the `file` already uses streams, we do it as well as it's more efficient. -// This is done usually by using `gulp.src(..., { buffer: false })` -// Otherwise we don't since many Gulp plugins don't support `file.contents` -// being a stream. -const replaceResult = function({ file, input, opts }) { - if (file.isStream()) { - return streamResult({ file, input, opts }) - } - - return overwriteResult({ file, input, opts }) -} - -const streamResult = function({ file, input, opts, opts: { from } }) { - const execaResult = streamCommand(input, opts) - const { [from]: result } = execaResult - - // Make stream fail if the command fails - execaResult.catch(error => result.emit('error', error)) - - // eslint-disable-next-line no-param-reassign, fp/no-mutation - file.contents = result -} - -const overwriteResult = async function({ file, input, opts, opts: { from } }) { - const { [from]: result } = await execCommand(input, opts) - // eslint-disable-next-line no-param-reassign, fp/no-mutation - file.contents = result -} - -const handleResult = { - save: saveResult, - replace: replaceResult, -} diff --git a/src/stream/main.js b/src/stream/main.js new file mode 100644 index 0000000..b436179 --- /dev/null +++ b/src/stream/main.js @@ -0,0 +1,38 @@ +import { callbackify } from 'util' + +import through from 'through2-concurrent' + +import { parseOpts } from '../options.js' + +import { getDefaultOpts, forcedOpts } from './options.js' +import { handleResult } from './result.js' + +// Creates a stream to use in Gulp e.g. +// src(...).pipe(stream(({ path }) => ['command', [path]])) +// This should not be used with commands that allow several files as arguments +// (through variadic arguments, globbing or directory recursion) as a single +// call to those functions would be more efficient that creating lots of +// child processes through streaming. +export const stream = function(mapFunc, opts) { + const defaultOpts = getDefaultOpts({ opts }) + const { maxConcurrency, result: resultOpt, ...optsA } = parseOpts({ + opts, + defaultOpts, + forcedOpts, + }) + + return through.obj( + { maxConcurrency }, + execVinyl.bind(null, { mapFunc, opts: optsA, resultOpt }), + ) +} + +const cExecVinyl = async function({ mapFunc, opts, resultOpt }, file) { + const input = await mapFunc(file) + + await handleResult({ file, input, opts, resultOpt }) + + return file +} + +const execVinyl = callbackify(cExecVinyl) diff --git a/src/stream/options.js b/src/stream/options.js new file mode 100644 index 0000000..38d0f35 --- /dev/null +++ b/src/stream/options.js @@ -0,0 +1,31 @@ +export const getDefaultOpts = function({ opts: { result = 'replace' } = {} }) { + return { + // This is too verbose if done on each iteration + verbose: false, + // We use `through2-concurrent` because `through2` processes files serially + // The default is 16 which is too low + maxConcurrency: 100, + // What to do with the result. Either 'save' or 'replace' + result: 'replace', + // With `result: 'replace'` which stream to use: `stdout`, `stderr` or `all` + from: 'stdout', + ...resultDefaultOpts[result], + } +} + +const resultDefaultOpts = { + // `save` should retrieve output as string, but this is not needed for + // `replace`. Same thing with final newline stripping. + replace: { encoding: 'buffer', stripFinalNewline: false }, +} + +export const forcedOpts = { + // Forces piping stdout|stderr because: + // - `inherit` would be too verbose if done on each iteration. + // - `save` mode would not get `stdout|stderr|all` properties. + // - `replace` mode would not work. + stdout: 'pipe', + stderr: 'pipe', + // `stdio` cannot be combined with `stdout|stderr` with execa + stdio: undefined, +} diff --git a/src/stream/result.js b/src/stream/result.js new file mode 100644 index 0000000..4739eda --- /dev/null +++ b/src/stream/result.js @@ -0,0 +1,50 @@ +import { isValidInput } from '../input.js' +import { execCommand, streamCommand } from '../exec.js' + +// Decides what to do with the child process result, either: +// - `save`: pushed to `file.execa` +// - `replace`: overwrite file's content +export const handleResult = function({ file, input, opts, resultOpt }) { + // Returning `undefined` or invalid command skips it silently. + // `file.execa` array will be pushed with `undefined`. + if (!isValidInput({ input })) { + return + } + + if (resultOpt === 'save') { + return saveResult({ file, input, opts }) + } + + // If the `file` already uses streams, we do it as well as it's more + // efficient. This is done usually by using `gulp.src(..., { buffer: false })` + // Otherwise we don't since many Gulp plugins don't support `file.contents` + // being a stream. + if (file.isStream()) { + return streamResult({ file, input, opts }) + } + + return bufferResult({ file, input, opts }) +} + +const saveResult = async function({ file, file: { execa = [] }, input, opts }) { + const result = await execCommand(input, opts) + // eslint-disable-next-line no-param-reassign, fp/no-mutation + file.execa = [...execa, result] +} + +const streamResult = function({ file, input, opts, opts: { from } }) { + const execaResult = streamCommand(input, opts) + const { [from]: result } = execaResult + + // Make stream fail if the command fails + execaResult.catch(error => result.emit('error', error)) + + // eslint-disable-next-line no-param-reassign, fp/no-mutation + file.contents = result +} + +const bufferResult = async function({ file, input, opts, opts: { from } }) { + const { [from]: result } = await execCommand(input, opts) + // eslint-disable-next-line no-param-reassign, fp/no-mutation + file.contents = result +}