Perform regular expression replacements on streams as well as transcoding/teeing/concatenating them robustly.

edfus/stream-editor

Stream-Editor

Features

Partial replacement

A partial replacement replaces only the substring matched by the 1st parenthesized capture group with the specified replacement, allowing simpler syntax and minimal modification.

Take the following snippet, which converts something like import x from "../src/x.mjs" into import x from "../build/x.mjs", as an example:

import { sed as updateFileContent } from "stream-editor";

updateFileContent({
  file: "index.mjs",
  // only the 1st capture group (e.g. src/x.mjs) gets replaced
  search: matchParentFolderImport(/(src\/(.+?))/),
  replacement: "build/$2",
  maxTimes: 2,
  required: true
});

// builds a RegExp matching: import ... from "../<additionalPattern>"
function matchParentFolderImport (additionalPattern) {
  const parts = /import\s+.+\s+from\s*['"]\.\.\/(.+?)['"];?/.source.split("(.+?)");

  return new RegExp([
    parts[0],
    additionalPattern.source,
    parts[1]
  ].join(""));
}

Special replacement patterns (parenthesized capture group placeholders) are well supported in a partial replacement, for both function replacements and string replacements. All other concepts are designed to stay close to their origins in the vanilla String.prototype.replace method, except that $& (also the 1st value supplied to the replacement function) and $1 (the 2nd parameter passed) always have the same value: the substring matched by the 1st parenthesized capture group (PCG).

You can specify a truthy isFullReplacement to perform a full replacement instead.
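
For comparison, here is a rough sketch of the same edit done with a vanilla full replacement, using only String.prototype.replace and no stream-editor at all. The sample line and the pattern are illustrative assumptions; the point is that everything around the edited part has to be captured and written back by hand, which is the bookkeeping a partial replacement is meant to avoid.

```javascript
// Plain String.prototype.replace, no stream-editor involved.
const line = 'import x from "../src/x.mjs";';

// Without partial replacement, the text before ($1) and after ($3) the
// edited part must be captured and re-emitted in the replacement string.
const pattern = /(import\s+.+\s+from\s*['"]\.\.\/)src\/(.+?)(['"];?)/;
const rewritten = line.replace(pattern, "$1build/$2$3");

console.log(rewritten); // import x from "../build/x.mjs";
```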

Asynchronous replacement

Yes, asynchronous function replacements just work like a charm.

import fs from "fs";
import path from "path";
import { streamEdit } from "stream-editor";

const filepath = "./index.js";
const dest = "./build/index.js";

streamEdit({
  from: fs.createReadStream(filepath),
  to: fs.createWriteStream(dest),
  replace: [
    {
      // match ${{ import("module.mjs") }} | ${{ expr( 1 + 1 ) }}
      match: /\$\{\{\s*([A-Z_-]+?)\s*\(\s*(.+?)\s*\)\s*\}\}/i,
      replacement: async (whole, method, input) => {
        switch (method.toUpperCase()) {
          case "IMPORT":
            input = input.replace(/^["']|["']$/g, "");
            const importFilePath = path.join(path.dirname(filepath), input);
            return fs.promises.readFile(importFilePath, "utf-8");
          case "EXPR":
            return (async () => String(await eval(input)))();
          default:
            throw new Error(`unknown method ${method} in ${whole}`);
        }
      }
    }
  ],
  defaultOptions: {
    isFullReplacement: true
  }
});
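
String.prototype.replace itself does not await promises returned by a replacement function, so some extra machinery is needed for asynchronous replacers. The following is a minimal sketch of one common way to do it with plain JavaScript; replaceAsync is a hypothetical helper written for illustration and is not claimed to be how stream-editor works internally.

```javascript
// Hypothetical helper, not part of stream-editor's API.
// `globalPattern` must carry the "g" flag, as String.prototype.matchAll requires.
async function replaceAsync(input, globalPattern, asyncReplacer) {
  const matches = [...input.matchAll(globalPattern)];

  // run every replacer concurrently: (wholeMatch, ...captureGroups) => Promise<string>
  const replacements = await Promise.all(
    matches.map(match => asyncReplacer(...match))
  );

  // stitch the untouched text and the resolved replacements back together
  let result = "";
  let lastIndex = 0;
  matches.forEach((match, i) => {
    result += input.slice(lastIndex, match.index) + replacements[i];
    lastIndex = match.index + match[0].length;
  });
  return result + input.slice(lastIndex);
}

const doubled = replaceAsync("1 + 2", /\d/g, async digit => String(digit * 2));
doubled.then(console.log); // 2 + 4
```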

Substituting texts within files in streaming fashion

This package will create readable and writable streams connected to a single file at the same time, while disallowing any write operations to advance further than the current reading index. This feature is based on rw-stream's great work.

To accommodate RegEx replacement (which requires intact strings rather than chunks that may begin or end at any position) with streams, we bring the separator (default: /(?<=\r?\n)/) and join (default: '') options into use. You should NOT specify separators that may divide text structures targeted by your RegEx searches, which would result in undefined behavior.
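
To illustrate why a separator is needed, here is a small sketch in plain JavaScript of splitting arbitrary chunks into complete parts with the default separator. splitChunks is a hypothetical helper written for this illustration (it is not stream-editor's code): it keeps the last, possibly incomplete, part in a buffer until the next chunk arrives.

```javascript
// Hypothetical helper, for illustration only.
function* splitChunks(chunks, separator = /(?<=\r?\n)/) {
  let tail = ""; // last incomplete part carried over between chunks
  for (const chunk of chunks) {
    const parts = (tail + chunk).split(separator);
    tail = parts.pop(); // may still be missing its line ending
    yield* parts;
  }
  if (tail !== "") yield tail; // flush whatever is left at the end
}

// "bar" is cut in half by the chunk boundary but comes out intact
const parts = [...splitChunks(["foo\r\nba", "r\nbaz"])];
console.log(parts); // [ 'foo\r\n', 'bar\n', 'baz' ]
```

Because the lookbehind separator matches right after each line ending, every part keeps its own line ending, and a search never sees a line that was split by a chunk boundary.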

Moreover, as the RegEx replacement part in options is actually optional, stream-editor can also be used to break up streams and reassemble them like split2 does:

// named export sed is an alias for streamEdit
const { streamEdit } = require("stream-editor");
const { createReadStream } = require("fs");
const { Writable } = require("stream");
const { join } = require("path");

const filepath = join(__dirname, `./file.ndjson`);

// top-level await is unavailable in CommonJS, hence the async wrapper
(async () => {
  /* replace CRLF with LF */
  await streamEdit({
    file: filepath,
    separator: "\r\n",
    join: "\n"
  });

  /* parse ndjson */
  await streamEdit({
    from: createReadStream(filepath),
    to: new Writable({
      objectMode: true,
      write(parsedObj, _enc, cb) {
        // doSomething stands for your own promise-returning handler
        return (
          doSomething(parsedObj)
            .then(() => cb())
            .catch(cb)
        );
      }
    }),
    separator: "\n",
    readableObjectMode: true,
    postProcessing: part => JSON.parse(part),
    abortController: new AbortController() // ...
  });
})();

You can specify null as the separator to completely disable splitting.

Setting limits on Regular Expressions' maximum executed times

This is achieved by converting every replacement into a replacement function and wrapping it in layers of proxies.

const { sed: updateFiles } = require("stream-editor");

/**
 * add "use strict" plus a compatible line ending
 * to the beginning of every commonjs file.
 */

// maxTimes version
updateFiles({
  files: commonjsFiles,
  match: /^().*(\r?\n)/,
  replacement: `"use strict";$2`,
  maxTimes: 1
});

// limit version
updateFiles({
  files: commonjsFiles,
  replace: [
    {
      match: /^().*(\r?\n)/,
      replacement: `"use strict";$2`,
      /**
       * a local limit,
       * applying restriction on certain match's maximum executed times.
       */
      limit: 1
    }
  ],
  // a global limit, limit the maximum count of every search's executed times.
  limit: 1
});

Once the limit specified by option limit is reached, the underlying transform stream becomes a transparent pass-through stream if option truncate is falsy; otherwise the remaining part is discarded. In contrast, maxTimes merely removes that search.
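
The idea of wrapping a replacement function to count its executions can be sketched in a few lines of plain JavaScript. withMaxTimes below is a hypothetical wrapper for illustration only; it mimics the "stop replacing after N executions" behavior on a single string and is not stream-editor's actual proxy code.

```javascript
// Hypothetical wrapper, for illustration only.
function withMaxTimes(replacer, maxTimes) {
  let executed = 0;
  // once the cap is reached, hand back the match untouched
  return (whole, ...args) =>
    executed++ < maxTimes ? replacer(whole, ...args) : whole;
}

const limited = "a a a a".replace(/a/g, withMaxTimes(() => "b", 2));
console.log(limited); // b b a a
```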

Transcoding streams or files

streamEdit({
  from: createReadStream("gbk.txt"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});

Option decodeBuffers is the specific character encoding (utf-8, iso-8859-2, koi8, cp1261, gbk, etc.) used to decode the raw input buffers. Some encodings are only available when Node.js is built with full ICU, but the good news is that full-icu is the default in v14 and later (see nodejs/node#29522).

Note that option decodeBuffers only makes sense when no encoding is assigned and stream data are passed as buffers. Below are some wrong input examples:

streamEdit({
  from: 
    createReadStream("gbk.txt").setEncoding("utf8"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});

streamEdit({
  from: 
    createReadStream("gbk.txt", "utf8"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});
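
To see why the inputs above are wrong, here is a short sketch using only Node.js built-ins (the global TextDecoder and Buffer). It assumes a Node.js build with full ICU so that the gbk label is available; the four sample bytes are the GBK encoding of "你好". Once the bytes have been decoded as utf8 upstream, the original text can no longer be recovered.

```javascript
// "你好" encoded in GBK (assumes a Node.js build with full ICU)
const gbkBytes = Buffer.from([0xc4, 0xe3, 0xba, 0xc3]);

// decoding the raw buffer with the right encoding works
const decoded = new TextDecoder("gbk").decode(gbkBytes);

// decoding the same bytes as utf8 first yields replacement characters
const mangled = gbkBytes.toString("utf8");

console.log(decoded);             // 你好
console.log(mangled === decoded); // false
```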

Option encoding encodes all processed and joined strings to buffers with the given encoding. Node.js supports the following values: ascii, utf8, utf-8, utf16le, ucs2, ucs-2, base64, latin1, binary, hex.

Piping/teeing/confluencing streams with proper error handling & propagation

Confluence:

const yamlFiles = await (
  fsp.readdir(folderpath, { withFileTypes: true })
      .then(dirents =>
        dirents
          .filter(dirent => dirent.isFile() && dirent.name.endsWith(".yaml"))
          .sort((a, b) => a.name.localeCompare(b.name))
          .map(({ name }) => createReadStream(join(folderpath, name)))
      )
);

streamEdit({
  from: yamlFiles,
  to: createWriteStream(resultPath),
  contentJoin: "\n\n" // join streams
  // the encoding of contentJoin respects the `encoding` option
});
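
For readers curious what concatenating streams with error handling involves, below is a minimal sketch built only on Node.js core modules. concatStreams is a hypothetical helper (its name, signature and teardown policy are assumptions made for this illustration, not stream-editor's implementation): it writes each Readable to the Writable in order, inserts contentJoin between them, respects backpressure, and destroys every stream if anything fails.

```javascript
const { Readable, Writable } = require("node:stream");
const { finished } = require("node:stream/promises");
const { once } = require("node:events");

// Hypothetical helper, not stream-editor's implementation.
async function concatStreams(readables, writable, contentJoin = "") {
  // respect backpressure: wait for "drain" when write() returns false
  const write = async chunk => {
    if (!writable.write(chunk)) await once(writable, "drain");
  };

  try {
    for (const [index, readable] of readables.entries()) {
      if (index > 0 && contentJoin.length > 0) await write(contentJoin);
      for await (const chunk of readable) await write(chunk);
    }
    writable.end();
    await finished(writable);
    return writable;
  } catch (err) {
    // tear everything down so that no stream is left dangling
    readables.forEach(readable => readable.destroy());
    writable.destroy();
    throw err;
  }
}

// usage: two in-memory sources joined by a blank line
const chunks = [];
const sink = new Writable({
  write(chunk, _enc, cb) { chunks.push(chunk.toString()); cb(); }
});
const done = concatStreams(
  [Readable.from(["a: 1"]), Readable.from(["b: 2"])], sink, "\n\n"
);
done.then(() => console.log(JSON.stringify(chunks.join(""))));
```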

Teeing:

streamEdit({
  readableStream: new Readable({
    read(size) {
      // ...
    }
  }),
  writableStreams: new Array(6).fill(0).map((_, i) =>
    createWriteStream(join(resultFolderPath, `./test-source${i}`))
  )
});

The tests regarding error handling are listed under High coverage tests.

No dependency

stream-editor previously depended on rw-stream, but for some historical reasons, I refactored rw-stream and bundled it as a part of this package. See src/rw-stream.

Currently, stream-editor has zero dependencies.

High coverage tests

See https://github.com/edfus/stream-editor/tree/master/test.


  Normalize & Replace
    √ can handle sticky regular expressions
    √ can handle string match with special characters
    √ can handle partial replacement with placeholders
    √ can handle non-capture-group parenthesized pattern: Assertions
    √ can handle non-capture-group parenthesized pattern: Round brackets
    √ can handle pattern starts with a capture group
    √ can handle malformed (without capture groups) partial replacement
    √ can await replace partially with function
    √ recognize $\d{1,3} $& $` $' and check validity (throw warnings)
    √ produce the same result as String.prototype.replace

  Edit streams
    √ should check arguments
    √ should warn unknown/unneeded options
    √ should respect FORCE_COLOR, NO_COLOR, NODE_DISABLE_COLORS
    √ should pipe one Readable to multiple dumps (54ms)
    √ should replace CRLF with LF
    √ should have replaced /dum(b)/i to dumpling (while preserving dum's case)
    √ should have global and local limits on replacement amount
    √ should have line buffer maxLength
    √ should edit and combine multiple Readable into one Writable
    √ has readableObjectMode
    √ can handle async replacements
    √ can signal an unsuccessful substitution using beforeCompletion
    √ can declare a limit below which a substitution is considered failed for a search
    cancelation
      √ should check validity
      √ can abort a substitution before it has completed.
      √ can handle already aborted controller
      √ doesn't have memory leaks (is using WeakRef)
    truncation & limitation
      √ truncating the rest when limitations reached
      √ not: self rw-stream
      √ not: piping stream
    transcoding
      √ gbk to utf8 buffer
      √ gbk to hex with HWM
    error handling
      √ destroys streams properly when one of them closed prematurely
      √ destroys streams properly if errors occurred during initialization
      √ multiple-to-one: can correctly propagate errors emitted by readableStreams
      √ multiple-to-one: can handle prematurely destroyed readableStreams
      √ multiple-to-one: can correctly propagate errors emitted by writableStream
      √ multiple-to-one: can handle prematurely ended writableStream
      √ multiple-to-one: can handle prematurely destroyed writableStream
      √ one-to-multiple: can correctly propagate errors emitted by writableStreams
      √ one-to-multiple: can handle prematurely ended writableStreams
      √ one-to-multiple: can handle prematurely destroyed writableStreams
      √ can handle errors thrown from postProcessing
      √ can handle errors thrown from join functions
      √ can handle errors thrown from replacement functions
    corner cases
      √ can handle empty content
      √ can handle regular expressions that always match
      √ can handle non-string in a regExp separator's split result
    try-on
      √ can handle files larger than 64KiB


  49 passing (275ms)

API

Overview

This package has two named function exports: streamEdit and sed (an alias for streamEdit).

streamEdit returns a promise that resolves to void | void[] for files, or to Writable | Writable[] for streams (keeping references to the output streams).

streamEdit accepts an options object with one or more of the following options:

Options for replacement

| name | alias | expect | safe to ignore | default |
| --- | --- | --- | --- | --- |
| search | match | `string \| RegExp` | | none |
| replacement | none | `string \| [async] (wholeMatch, ...args) => string` | | none |
| limit | none | `number` | | Infinity |
| maxTimes | none | `number` | | Infinity |
| minTimes | none | `number` | | 0 |
| required | none | `boolean` | | false |
| isFullReplacement | none | `boolean` | | false |
| disablePlaceholders | none | `boolean` | | false |
| replace | none | an Array of `{ search, replacement }` | | none |
| defaultOptions | none | `BasicReplaceOptions` | | {} |
| join | none | `string \| (part: string) => string \| null` | | part => part |
| postProcessing | none | `(part: string, isLastPart: boolean) => any` | | none |
| beforeCompletion | none | `() => Promise<void> \| void` | | |

type GlobalLimit = number;
type LocalLimit = number;

interface BasicReplaceOptions {
  /**
   * Perform a full replacement or not.
   * 
   * A RegExp search without capture groups or a search in string will be
   * treated as a full replacement silently.
   */
  isFullReplacement?: boolean;
  /**
   * Only valid for a string replacement.
   * 
   * Disable placeholders in replacement or not. Processed result shall be
   * exactly the same as the string replacement if set to true.
   * 
   * Default: false
   */
  disablePlaceholders?: boolean;
  /**
   * Apply restriction on certain search's maximum executed times.
   * 
   * Upon reaching the limit, if option `truncate` is falsy (false by default),
   * underlying transform stream will become a transparent passThrough stream.
   * 
   * Default: Infinity. 0 is considered as Infinity for this option.
   */
  limit?: LocalLimit;
  /**
   * Observe a certain search's executed times, remove that search right
   * after upper limit reached.
   * 
   * Default: Infinity. 0 is considered as Infinity for this option.
   */
  maxTimes?: number;
  /**
   * For the search you specified, add a limit below which the substitution
   * is considered failed.
   */
  minTimes?: number;
  /**
   * Sugar for minTimes = 1
   */
  required?: boolean;
}

interface SearchAndReplaceOptions extends BasicReplaceOptions {
  /**
   * Correspondence: `String.prototype.replaceAll`'s 1st argument.
   * 
   * Accepts a literal string or a RegExp object.
   * 
   * Will replace all occurrences by converting input into a global RegExp
   * object, which means that the according replacement might be invoked 
   * multiple times for each full match to be replaced.
   * 
   * Every `search` and `replacement` not arranged in pairs is silently
   * discarded in `options`, while in `options.replace` that will result in
   * an error thrown.
   */
  search?: string | RegExp;

  /**
   * Correspondence: String.prototype.replace's 2nd argument.
   * 
   * Replaces the according text for a given match, a string or
   * a function that returns the replacement text can be passed.
   * 
   * Special replacement patterns (parenthesized capture group placeholders)
   *  / async replacement functions are well supported.
   * 
   * For a partial replacement, $& (also the 1st supplied value to replace
   * function) and $1 (the 2nd param passed) always have the same value,
   * supplying the matched substring in the parenthesized capture group
   * you specified.
   */
  replacement?: string | ((wholeMatch: string, ...args: string[]) => string);
}

interface MultipleReplacementOptions {
  /**
   * Apply restriction on the maximum count of every search's executed times.
   * 
   * Upon reaching the limit, if option `truncate` is falsy (false by default),
   * underlying transform stream will become a transparent passThrough stream.
   * 
   * Default: Infinity. 0 is considered as Infinity for this option.
   */
  limit?: GlobalLimit;
  /**
   * Should be an array of { [ "match" | "search" ], "replacement" } pairs.
   * 
   * Possible `search|match` and `replacement` pair in `options` scope will be
   * prepended to `options.replace` array, if both exist.
   */
  replace?: Array<SearchAndReplaceOptions | MatchAndReplaceOptions>;
  /**
   * Default: {}
   */
  defaultOptions?: BasicReplaceOptions;
}

type ReplaceOptions = MultipleReplacementOptions | SearchAndReplaceOptions;

interface BasicOptions extends ReplaceOptions {
  /**
   * Correspondence: Array.prototype.join's 1st argument, though a function
   * is also acceptable.
   * 
   * You can specify a literal string or a function that returns the post-processed
   * part.
   * 
   * Example function for appending a CRLF: part => part.concat("\r\n");
   * 
   * Default: part => part
   */
  join?: string | ((part: string) => string) | null;
  /**
   * A post-processing function that consumes transformed strings and returns a
   * string or a Buffer. This option has higher priority over option `join`.
   * 
   * If readableObjectMode is enabled, any object accepted by Node.js objectMode
   * streams can be returned.
   */
  postProcessing?: (part: string, isLastPart: boolean) => any;
  /**
   * This optional function will be called before the destination(s) close,
   * delaying the resolution of the promise returned by streamEdit() until
   * beforeCompletion resolves.
   * 
   * You can also return a rejected promise or simply raise an error to signal a
   * failure and destroy all streams.
   */
  beforeCompletion?: () => Promise<void> | void;
}

Options for stream transform

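| name | alias | expect | safe to ignore | default |
| --- | --- | --- | --- | --- |
| separator | none | `string \| RegExp \| null` | | /(?<=\r?\n)/ |
| encoding | none | `string \| null` | | null |
| decodeBuffers | none | `string` | | "utf8" |
| truncate | none | `boolean` | | false |
| maxLength | none | `number` | | Infinity |
| readableObjectMode | none | `boolean` | | false |
| abortController | none | `AbortController` | | null |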

Options that are only available under certain context:

| name | alias | expect | context | default |
| --- | --- | --- | --- | --- |
| readStart | none | `number` | file[s] | 0 |
| writeStart | none | `number` | file[s] | 0 |
| contentJoin | none | `string \| Buffer` | readableStreams | "" |

interface BasicOptions extends ReplaceOptions {
  /**
   * Correspondence: String.prototype.split's 1st argument.
   * 
   * Accepts a literal string or a RegExp object.
   * 
   * Used by underlying transform stream to split upstream data into separate
   * to-be-processed parts.
   * 
   * String.prototype.split will implicitly call `toString` on non-string &
   * non-regex & non-void values.
   * 
   * Specify `null` or `undefined` to process upstream data as a whole.
   * 
   * Default: /(?<=\r?\n)/. Line endings following lines.
   */
  separator?: string | RegExp | null;
  /**
   * Correspondence: encoding of Node.js Buffer.
   * 
   * If specified, then processed and joined strings will be encoded to buffers
   * with that encoding.
   *
   * Node.js currently supports the following options:
   * "ascii" | "utf8" | "utf-8" | "utf16le" | "ucs2" | "ucs-2" | "base64" | "latin1" | "binary" | "hex"
   * Default: null.
   */
  encoding?: BufferEncoding | null;
  /**
   * Correspondence: encodings of WHATWG Encoding Standard TextDecoder.
   * 
   * Accept a specific character encoding, like utf-8, iso-8859-2, koi8, cp1261,
   * gbk, etc for decoding the input raw buffer.
   * 
   * This option only makes sense when no encoding is assigned and stream data are 
   * passed as Buffer objects (that is, haven't done something like
   * readable.setEncoding('utf8'));
   * 
   * Example: streamEdit({
   *    from: createReadStream("gbk.txt"),
   *    to: createWriteStream("utf8.txt"),
   *    decodeBuffers: "gbk"
   * });
   * 
   * Some encodings are only available when Node.js is built with full ICU (full-icu).
   * See https://nodejs.org/api/util.html#util_class_util_textdecoder.
   * 
   * Default: "utf8".
   */
  decodeBuffers?: string;
  /**
   * Truncating the rest or not when limits reached.
   * 
   * Default: false.
   */
  truncate?: boolean;
  /**
   * The maximum size of the line buffer.
   * 
   * A line buffer is the buffer used for buffering the last incomplete substring
   * when dividing chunks (typically 64 KiB) by options.separator.
   * 
   * Default: Infinity.
   */
  maxLength?: number;
  /**
   * Correspondence: readableObjectMode option of Node.js stream.Transform
   * 
   * Options writableObjectMode and objectMode are not supported.
   * 
   * Default: false.
   */
  readableObjectMode?: boolean;
  /**
   * An optional controller object that allows you to abort one or more
   * substitutions as and when desired.
   * 
   * Node version >= 15.0.0 is required.
   */
  abortController?: AbortController;
}

interface UpdateFileOptions extends BasicOptions {
  file: string;
  readStart?: number;
  writeStart?: number;
}

interface UpdateFilesOptions extends BasicOptions {
  files: string[];
  readStart?: number;
  writeStart?: number;
}

interface MultipleReadablesToWritableOptions<T> extends BasicOptions {
  from: Array<Readable>;
  to: T;
  /**
   * Concatenate results of transformed Readables with the input value.
   * Accepts a literal string or a Buffer.
   * option.encoding will be passed along with contentJoin to Writable.write
   * Default: ""
   */
  contentJoin: string | Buffer;
}

interface MultipleReadablesToWritableOptionsAlias<T> extends BasicOptions {
  readableStreams: Array<Readable>;
  writableStream: T;
  contentJoin: string | Buffer;
}

Options for stream input/output

| name | alias | expect | with | default |
| --- | --- | --- | --- | --- |
| file | none | `string` | self | none |
| files | none | an Array of `string` | self | none |
| readableStream | from | `Readable` | writableStream[s] | none |
| writableStream | to | `Writable` | readableStream[s] | none |
| readableStreams | from | an Array of `Readable` | writableStream | none |
| writableStreams | to | an Array of `Writable` | readableStream | none |

file:

interface UpdateFileOptions extends BasicOptions {
  file: string;
  readStart?: number;
  writeStart?: number;
}
function streamEdit(options: UpdateFileOptions): Promise<void>;

files:

interface UpdateFilesOptions extends BasicOptions {
  files: string[];
  readStart?: number;
  writeStart?: number;
}

function streamEdit(options: UpdateFilesOptions): Promise<void[]>;

transform Readable:

interface TransformReadableOptions<T> extends BasicOptions {
  [ from | readableStream ]: Readable;
  [ to   | writableStream ]: T;
}

function streamEdit<T extends Writable>(
  options: TransformReadableOptions<T>
): Promise<T>;

readables -> writable:

interface MultipleReadablesToWritableOptions<T> extends BasicOptions {
  [ from | readableStreams ]: Array<Readable>;
  [ to   | writableStream  ]: T;
  contentJoin: string | Buffer;
}

function streamEdit<T extends Writable>(
  options: MultipleReadablesToWritableOptions<T>
): Promise< T >;

readable -> writables:

interface ReadableToMultipleWritablesOptions<T> extends BasicOptions {
  [ from | readableStream  ]: Readable;
  [ to   | writableStreams ]: Array<T>;
}

function streamEdit<T extends Writable>(
  options: ReadableToMultipleWritablesOptions<T>
): Promise< T[]>;

For further reading, take a look at the declaration file.

Examples

See ./examples and esm2cjs
