A WHATWG Transformer / Node stream.Transform for replacing content.
A toolkit for stream content replacement.
Replace patterns in streaming data without needless buffering or downstream delay.
- Processes streams incrementally - Transform gigabytes of data with constant memory usage
- Never splits matches - Correctly handles patterns that span chunk boundaries
- Supports async or generated replacements - Fetch dynamic content from APIs or databases and interpolate into streams
- Works everywhere - Native support for both WHATWG Streams (browsers, Deno, Bun, Edge runtimes, or Node) and Node.js streams
Perfect for server-side rendering, edge composition, log processing, template engines, content proxies, and any scenario where you need to transform text data as it flows through your application.
- 🪶 Low dependency - Lightweight and minimal 1
- 🌊 Streaming-first - Processes data as it arrives, yielding as early as possible
- 🎯 Boundary-aware - Correctly handles tokens split across chunk boundaries
- 🔁 Multiple replacements - Supports replacing multiple occurrences
- 🎨 Dynamic content - Replace with strings, functions, or iterables, sync or async
- ⏹️ Cancellable - Replacement can be halted mid-chunk
- ♻️ Generator based - Consuming stream has control
- ⚡ Minimal setup overhead - Stateless & re-usable search strategies
- 🔌 Composable - Pluggable search strategies & stream processors
- 📦 TypeScript - Full type definitions included
```sh
npm install replace-content-transformer
```

See Design on composable parts to import and combine.
Constructors are available from the /web import path, for both synchronous and asynchronous replacement use-cases:
```ts
import {
  ReplaceContentTransformer,
  AsyncReplaceContentTransformer
} from "replace-content-transformer/web";
```

The constructors expect a "stream processor" and an optional AbortSignal as arguments:
```ts
import type { SyncProcessor, AsyncProcessor } from "replace-content-transformer";

const syncTransformer = new ReplaceContentTransformer(
  processor: SyncProcessor, stopReplacingSignal?: AbortSignal
);
const asyncTransformer = new AsyncReplaceContentTransformer(
  processor: AsyncProcessor, stopReplacingSignal?: AbortSignal
);
```

The available SyncProcessor and AsyncProcessor implementations are described in Replacement Processors.
These processors take searchStrategy (see Search Strategies) and replacement constructor options.
The transformer acts on decoded text streams, and should be plugged into a stream pipeline appropriately. e.g.
```ts
const replacedStream = readableStream
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new TransformStream(transformer))
  .pipeThrough(new TextEncoderStream());
```

```ts
import {
  StaticReplacementProcessor,
  searchStrategyFactory
} from "replace-content-transformer";
import { ReplaceContentTransformer } from "replace-content-transformer/web";

// {{needle}} replaced by "12345"
const transformer = new ReplaceContentTransformer(
  new StaticReplacementProcessor({
    searchStrategy: searchStrategyFactory("{{needle}}"),
    replacement: "12345"
  })
);
```

```ts
// {{anything between braces}} replaced by "54321"
const transformer = new ReplaceContentTransformer(
  new StaticReplacementProcessor({
    searchStrategy: searchStrategyFactory(["{{", "}}"]),
    replacement: "54321"
  })
);
```

Use a function for dynamic replacement, perhaps based on the original content:
```ts
import { FunctionReplacementProcessor } from "replace-content-transformer";

// "{{this}} and {{that}}" becomes "this was match 0 and that was match 1"
const transformer = new ReplaceContentTransformer(
  new FunctionReplacementProcessor({
    searchStrategy: searchStrategyFactory(["{{", "}}"]),
    replacement: (match: string, index: number) =>
      `${match.slice(2, -2)} was match ${index}`
  })
);
```

Note
The regex search strategy is marginally less performant than static string anchors, and does not support all regular expression features. See limitations.
```ts
// `class="anything old-button"` becomes `class="anything new-button"`
// `class="old-button something else"` becomes `class="new-button something else"`
// `class="cold-button"` remains `class="cold-button"`
const transformer = new ReplaceContentTransformer(
  new FunctionReplacementProcessor({
    searchStrategy: searchStrategyFactory(
      /class="(?<before>[^"]*?\b)old-button(?<after>\b[^"]*?)"/
    ),
    replacement: (match: RegExpExecArray) => {
      const { before, after } = match.groups!;
      return `class="${before}new-button${after}"`;
    }
  })
);
```

Replace with asynchronous content. Ensures each async replacement completes before the next starts.
```ts
import { AsyncFunctionReplacementProcessor } from "replace-content-transformer";
import fs from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";

// `<img src="file://image.png">` becomes `<img src="data:image/png;base64,...">`
const transformer = new AsyncReplaceContentTransformer(
  new AsyncFunctionReplacementProcessor({
    searchStrategy: searchStrategyFactory(["<img", 'src="file://', '.png">']),
    replacement: async (imgTag: string) =>
      `<img src="data:image/png;base64,${(
        await fs.readFile(
          path.join(
            path.dirname(fileURLToPath(import.meta.url)),
            imgTag.match(/\/\/(.+?)"/)![1]
          )
        )
      ).toString("base64")}">`
  })
);
```

Alternatively, the non-async FunctionReplacementProcessor can be used to process Promise responses, because the WHATWG Streams API natively supports enqueueing any JavaScript value, including promises, which are awaited by downstream consumers.
Warning
This subverts back-pressure control and may conflict with a desired highWaterMark; the replacement function can't slow down production based on consumer speed. However, it allows matches later in the input stream to be discovered early, while earlier replacements are still resolving.
```ts
// `<link href="https://example.com/styles.css" rel="stylesheet" />` becomes `<style>{content of sheet}</style>`
const transformer = new ReplaceContentTransformer<Promise<string>>(
  new FunctionReplacementProcessor<Promise<string>>({
    searchStrategy: searchStrategyFactory([
      "<link",
      'href="',
      '.css"',
      'rel="stylesheet"',
      "/>"
    ]),
    replacement: async (match: string): Promise<string> => {
      const {
        groups: { url }
      } = /href="(?<url>[^"]+)"/.exec(match)!;
      const res = await fetch(url);
      return `<style>${await res.text()}</style>`;
    }
  })
);
```

Note
If promise concurrency needs to be controlled, consider a replacement function that limits in-flight promises via pooling:

```ts
const maxConcurrent = 5;
const active = new Set<Promise<string>>();

const replacement = async (match: string): Promise<string> => {
  if (active.size >= maxConcurrent) {
    await Promise.race(active);
  }
  const [, url] = /href="([^"]+)"/.exec(match)!;
  const promise = fetch(url).then((response) => {
    active.delete(promise);
    return response.text();
  });
  active.add(promise);
  return `<style>${await promise}</style>`;
};
```

Interpolate a sequence into the stream:
```ts
import { IterableFunctionReplacementProcessor } from "replace-content-transformer";

// "1 2 3 4 5" becomes "1 2 3.1 3.2 3.3 4 5"
const transformer = new ReplaceContentTransformer(
  new IterableFunctionReplacementProcessor({
    searchStrategy: searchStrategyFactory("3 "),
    replacement: () => [...Array(3)].map((_, i) => `3.${i + 1} `)
  })
);
```

Interpolate ReadableStreams, or other async iterables, into the output. Ensures each async operation completes before the next starts:
```ts
import { AsyncIterableFunctionReplacementProcessor } from "replace-content-transformer";

// `<div><esi:include src="https://example.com/foo" /></div>` fills the `<div>` with content fetched from https://example.com/foo
const transformer = new AsyncReplaceContentTransformer(
  new AsyncIterableFunctionReplacementProcessor({
    searchStrategy: searchStrategyFactory(["<esi:include", "/>"]),
    replacement: async (match: string) => {
      const {
        groups: { url }
      } = /src="(?<url>[^"]+)"/.exec(match)!;
      const res = await fetch(url);
      return res.body!.pipeThrough(new TextDecoderStream());
    }
  })
);
```

Recursive replacement, with controlled depth:
```ts
const searchStrategy = searchStrategyFactory(["<esi:include", "/>"]);
const maxDepth = 3;

function transformerFactory(currentDepth: number) {
  return new AsyncReplaceContentTransformer(
    new AsyncIterableFunctionReplacementProcessor({
      searchStrategy,
      replacement: async (match: string) => {
        const {
          groups: { url }
        } = /src="(?<url>[^"]+)"/.exec(match)!;
        const res = await fetch(url);
        const bodyStream = res.body!.pipeThrough(new TextDecoderStream());
        return currentDepth < maxDepth
          ? bodyStream.pipeThrough(
              new TransformStream(transformerFactory(currentDepth + 1))
            )
          : bodyStream;
      }
    })
  );
}

// replaces esi include tags, recursively in fetched content, to a max depth of 3
const transformer = transformerFactory(0);
```

To abort replacement after a certain number of replacements (or for any other reason), provide an AbortSignal:
```ts
const abortController = new AbortController();

const transformer = new AsyncReplaceContentTransformer(
  new AsyncIterableFunctionReplacementProcessor({
    searchStrategy: new StringAnchorSearchStrategy(["<esi:include", ">"]),
    replacement: async (match, index) => {
      const {
        groups: { url }
      } = /src="(?<url>[^"]+)"/.exec(match)!;
      const response = await fetch(url);
      if (response.ok) {
        return response.body!.pipeThrough(new TextDecoderStream());
      }
      if (index === 1) {
        abortController.abort(); // after two replacements, stop replacing
      }
    }
  }),
  abortController.signal
);
```

This will ensure the transform is "pass through" once the abort is signalled.
For fetch use cases, with cancellation external to the replacement function, consider sharing the abort signal:
```ts
const abortController = new AbortController();

const transformer = new AsyncReplaceContentTransformer(
  new AsyncIterableFunctionReplacementProcessor({
    searchStrategy: new StringAnchorSearchStrategy(["<esi:include", ">"]),
    replacement: async (match) => {
      const {
        groups: { url }
      } = /src="(?<url>[^"]+)"/.exec(match)!;
      try {
        const response = await fetch(url, { signal: abortController.signal });
        if (response.ok) {
          return response.body!.pipeThrough(new TextDecoderStream());
        }
      } catch (error: unknown) {
        if (error instanceof Error && error.name === "AbortError") {
          // needs to be an async iterable to satisfy the AsyncIterableFunctionReplacementProcessor.
          // (Awaiting AsyncIterator.from(["<!-- cancelled -->"]) in proposal:
          // https://github.com/tc39/proposal-async-iterator-helpers)
          return (async function* () {
            yield "<!-- cancelled -->";
          })();
        }
        throw error;
      }
    }
  }),
  abortController.signal
);

someEventBus.once("someEvent", () => abortController.abort());
```

This should ensure in-flight requests are cancelled along with ongoing replacement.
Use the Node adapters (ReplaceContentTransform / AsyncReplaceContentTransform) for a native stream.Transform implementation if the performance cost of toWeb / fromWeb conversion is a concern.
```ts
// streaming esi middleware for express.js, using native NodeJs stream.Transform
import { responseHandler } from "express-intercept";
import { AsyncReplaceContentTransform } from "replace-content-transformer/node";
import {
  AsyncIterableFunctionReplacementProcessor,
  searchStrategyFactory
} from "replace-content-transformer";
import type { Readable } from "node:stream";
import { get } from "node:https";

const searchStrategy = searchStrategyFactory(["<esi:include", "/>"]);
const maxDepth = 3;

function transformFactory(currentDepth: number) {
  return new AsyncReplaceContentTransform(
    new AsyncIterableFunctionReplacementProcessor({
      searchStrategy,
      replacement: async (match: string) => {
        const {
          groups: { url }
        } = /src="(?<url>[^"]+)"/.exec(match)!;
        const nodeStream = await new Promise<Readable>((resolve, reject) => {
          get(url, (res) => resolve(res)).on("error", reject);
        });
        return currentDepth < maxDepth
          ? nodeStream.pipe(transformFactory(currentDepth + 1))
          : nodeStream;
      }
    })
  );
}

const expressMiddleware = responseHandler()
  .if((res) => /html/i.test(String(res.getHeader("content-type"))))
  .interceptStream((upstream: Readable, _, res) => {
    res.removeHeader("content-length");
    return upstream.pipe(transformFactory(0));
  });
```

The library uses a composable architecture that finds and replaces patterns across chunk boundaries.
It has separated concerns:
- Search Strategies - Define what to match (e.g., literal strings, arrays of strings as anchors, regular expressions)
- Replacement Processors - Enact strategies using replacement logic and yield output via generators
Pluggable strategies implement the SearchStrategy interface:
```ts
interface MatchResult {
  content: string;
  match: boolean;
}

interface SearchStrategy<TState> {
  createState(): TState;
  processChunk(
    haystack: string,
    state: TState
  ): Generator<MatchResult, void, undefined>;
  flush(state: TState): string;
}
```

The TState type is specific to each strategy and is managed by the consuming processor / stream, keeping the strategies themselves stateless. Any construction cost is paid once, and strategies can be re-used across multiple streams.

flush is called by the processor to extract anything still buffered in the search strategy; it also resets the provided state for re-use.
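As a sketch of this statelessness (restating the interface above; `LiteralStrategy` is a hypothetical minimal single-token strategy for illustration, not a library export), one strategy instance can serve multiple streams concurrently, each with its own state object:

```typescript
interface MatchResult {
  content: string;
  match: boolean;
}

interface SearchStrategy<TState> {
  createState(): TState;
  processChunk(
    haystack: string,
    state: TState
  ): Generator<MatchResult, void, undefined>;
  flush(state: TState): string;
}

type LiteralState = { buffer: string };

// Hypothetical single-token strategy: all per-stream data (a possible
// partial token at a chunk boundary) lives in the state, not the instance.
class LiteralStrategy implements SearchStrategy<LiteralState> {
  constructor(private token: string) {}

  createState(): LiteralState {
    return { buffer: "" };
  }

  *processChunk(
    haystack: string,
    state: LiteralState
  ): Generator<MatchResult, void, undefined> {
    let text = state.buffer + haystack;
    state.buffer = "";
    let idx: number;
    while ((idx = text.indexOf(this.token)) !== -1) {
      if (idx > 0) yield { content: text.slice(0, idx), match: false };
      yield { content: this.token, match: true };
      text = text.slice(idx + this.token.length);
    }
    // retain the longest suffix that could still start a token
    let keep = 0;
    for (let n = Math.min(text.length, this.token.length - 1); n > 0; n--) {
      if (text.endsWith(this.token.slice(0, n))) {
        keep = n;
        break;
      }
    }
    if (text.length > keep) {
      yield { content: text.slice(0, text.length - keep), match: false };
    }
    state.buffer = text.slice(text.length - keep);
  }

  flush(state: LiteralState): string {
    const leftover = state.buffer;
    state.buffer = "";
    return leftover;
  }
}

// One instance, two independent streams: each gets its own state.
const strategy = new LiteralStrategy("{{x}}");
const stateA = strategy.createState();
const stateB = strategy.createState();
```

Because the instance holds no per-stream data, constructing it once and sharing it is safe even with interleaved chunks from different streams.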
Each strategy contains the pattern-matching logic for a specific use case:
- StringAnchorSearchStrategy - finds either single tokens, or "anchor" tokens delimiting the start/end (or a sequence in-between) of a match
- RegexSearchStrategy - matches against regular expressions (with some caveats)
See search strategies for detail of functionality, and development of the strategies.
If tree-shaking is not a concern, a factory method for generating a search strategy based on appropriate input is available:
```ts
import { searchStrategyFactory } from "replace-content-transformer";

const searchStrategy =
  searchStrategyFactory(input: string | string[] | RegExp);
```

However, if the choice between string and regular expression matching is known at design time, a smaller bundle is afforded by importing a strategy directly:
```ts
import { StringAnchorSearchStrategy } from "replace-content-transformer";

const searchStrategy = new StringAnchorSearchStrategy(["<!--replace me -->"]); // single token
```

or

```ts
const searchStrategy = new StringAnchorSearchStrategy(["{{", "}}"]); // 2+ "anchor" delimiters/tokens
```

or

```ts
import { RegexSearchStrategy } from "replace-content-transformer";

const searchStrategy = new RegexSearchStrategy(/<div>.+?<\/div>/s); // regular expression for complete match
```

Processors accept chunks from the Transformer (web) / stream.Transform (node), and orchestrate replacement using a search strategy.
```ts
// sync or async, dependent on asynchronicity of the replacement needed
*processChunk(chunk: string): Generator<string, void, undefined> {
  for (const result of this.searchStrategy.processChunk(
    chunk,
    this.searchState
  )) {
    if (result.match) {
      yield /* some replacement form (static, functional, iterator, async...) */
    } else {
      yield result.content;
    }
  }
}

// common to all processors
flush(): string {
  return this.searchStrategy.flush(this.searchState);
}
```

Why so many options?
There are 5 stream processors to select from, rather than the system figuring out the optimum based on supplied options. See Replacement Processors for detailed usage guidance.
- StaticReplacementProcessor - Yields static strings
- FunctionReplacementProcessor - Yields function results, passing the match and a match index / sequence number
- IterableFunctionReplacementProcessor - Allows a function to return an iterable, flattened with `yield*`
- AsyncFunctionReplacementProcessor - Allows an async function, as an async generator with `for await`
- AsyncIterableFunctionReplacementProcessor - Flattens async iterables with `yield* await` (assuming the async iterator is itself accessed via a Promise)
There is no reliable way in JavaScript to detect the output type of a function without calling it, and trying to adapt just-in-time based on the first replacement made would be complex. The type of function can be thought of as having a "colour" that requires up-front selection.

Rather than a one-size-fits-all / common-denominator design that supports asynchronicity whether needed or not, or adapts to varying function output, the design accepts that a slight (but potentially significant) performance overhead exists with asynchronicity (in Node, at least) 2

Forcing all consumers to act asynchronously, or wrapping a simple static replacement in arbitrary iterator adapters, was deemed more unwieldy than asking for this choice up front.

The project aimed for a lightweight code footprint, so providing many options (with unused variants tree-shaken out) is a means to optimise.
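The "colour" distinction above is visible in plain JavaScript: a sync and an async replacement function are indistinguishable by runtime type inspection until they are actually called.

```typescript
// Two replacement functions, identical as far as type inspection goes:
const syncReplacement = (match: string) => match.toUpperCase();
const asyncReplacement = async (match: string) => match.toUpperCase();

// Both are just "function" until invoked...
console.log(typeof syncReplacement, typeof asyncReplacement); // "function function"

// ...and only calling them reveals whether a Promise comes back.
console.log(syncReplacement("abc")); // "ABC"
console.log(asyncReplacement("abc") instanceof Promise); // true
```

This is why the processor variant must be chosen up front rather than detected at runtime.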
Why generators?
- Lazy evaluation - Output is produced only when deemed consumable
- Memory efficient - No need to accumulate entire result
- Backpressure support - Downstream can control the flow rate
- Cancellable - Consumer can abort matching mid-chunk
- Composition - Easily chain with iterables, async iterables, or streams
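The cancellation and laziness properties above can be illustrated with a plain generator (a simplified stand-in for the processors, not the library's actual internals): the consumer stops iterating, and no further searching or replacing happens.

```typescript
// A simplified stand-in for a replacement processor: yields unchanged
// content and replacements lazily, one piece at a time.
function* replacePieces(
  haystack: string,
  needle: string,
  replacement: string
): Generator<string, void, undefined> {
  let rest = haystack;
  let idx: number;
  while ((idx = rest.indexOf(needle)) !== -1) {
    yield rest.slice(0, idx); // unchanged content before the match
    yield replacement; // the replacement itself
    rest = rest.slice(idx + needle.length);
  }
  yield rest; // trailing content
}

// The consumer controls the flow: stopping after the first replacement
// means the remaining input is never searched.
const pieces: string[] = [];
for (const piece of replacePieces("a X b X c", "X", "Y")) {
  pieces.push(piece);
  if (piece === "Y") break; // abort mid-stream
}
// pieces is now ["a ", "Y"]
```

The same pull-based shape is what lets downstream consumers apply backpressure: nothing is produced until the next piece is requested.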
```sh
# Install dependencies
npm install

# Run tests
npm test

# Run tests in watch mode
npm run test:watch

# Run tests with coverage
npm run test:ci

# Run benchmarks ("looped indexOf anchored" search strategy, on Node runtime)
npm run bench

# Run runtime benchmarks (as above, but across Node, Deno and Bun, where installed)
npm run bench:runtimes

# Run algorithm benchmarks (comparing algorithms - most not exported, for comparison)
npm run bench:algorithms

# Lint code
npm run lint

# Build
npm run build
```

- Search Strategies - Pattern matching algorithms for single tokens, anchor sequences, and regular expressions
- Replacement Processors - Static, function-based, iterable, and async replacement logic
- Adapters - WHATWG Transformer and Node.js stream.Transform implementations
- Factory Functions - Strategy factory and helper utilities
- Cross-component - Processors combined with search strategies
- Streaming scenarios - Transformers with processors in stream pipelines
- Promise handling - Async replacement functions and promise-based workflows
- Abort signals - Cancellation and signal propagation
- Algorithm comparison - 14 different search strategy implementations validated against identical test scenarios:
- Single and multi-chunk replacements
- Tokens split across chunk boundaries at various positions
- Consecutive and nested patterns
- False starts and pathological cases (repetitive characters, long tokens)
- Edge cases (empty content, incomplete patterns, LaTeX-like nested braces)
- Real-world scenarios (HTML templating, cross-boundary matches)
All tests run across multiple runtimes (Node.js, Bun, Deno) in CI. See Benchmarks for performance analysis.
This library uses the WHATWG Streams API and is compatible with multiple JavaScript runtimes:
- Node.js 18.0.0+
- Bun 1.0+
- Deno 1.17+
- Browsers:
- Chrome 52+
- Firefox 65+
- Safari 14.1+
- Edge 79+
- Edge Workers:
- Cloudflare Workers
- Vercel Edge Functions
- Akamai EdgeWorkers
- Fastly Compute
Tom Pereira - GitHub
Contributions are welcome!
Please feel free to raise an Issue and/or submit a Pull Request.
- replacing TransformStream example
- node:stream/web - Node.js WHATWG Streams implementation
- TransformStream API - MDN documentation
- regex-partial-match - companion project powering the regex search strategy
- parse5-html-rewriting-stream - a Streaming SAX-style HTML rewriter
- stream-replace-string - A Node stream transform (abandoned)
- replacestream - a Node stream transform, supporting regex matching (last update 2016)
- string-searching - various string searching algorithms in javascript