Skip to content

WalshyDev/streaming-utils

Repository files navigation

streaming-utils

Streaming TransformStream utilities built on Web Standards. Process data through pipelines -- base64, encryption, compression, hashing, and more -- without buffering into memory.

Works in Cloudflare Workers, Deno, Bun, Node.js, and browsers.

Install

npm install streaming-utils

Quick Start

Every utility is a factory function that returns a standard TransformStream. Compose them with the native .pipeThrough() API:

import { base64Encode, compress } from 'streaming-utils';

const encoded = readableStream
  .pipeThrough(compress('gzip'))
  .pipeThrough(base64Encode());

Or use the optional StreamUtils wrapper for a chainable API with convenient constructors and consumers:

import { StreamUtils, compress, base64Encode } from 'streaming-utils';

const result = await StreamUtils.fromResponse(response)
  .pipeThrough(compress('gzip'))
  .pipeThrough(base64Encode())
  .toReadableStream();

API

StreamUtils (optional wrapper)

A chainable wrapper around ReadableStream. Totally optional -- every transform works standalone with native streams.

import { StreamUtils } from 'streaming-utils';

// Constructors
StreamUtils.from(readableStream)
StreamUtils.fromString('hello')
StreamUtils.fromBytes(uint8Array)
StreamUtils.fromResponse(response)

// Chaining
stream.pipeThrough(transform)   // returns new StreamUtils
stream.pipeTo(writableStream)   // returns Promise<void>

// Consumers
await stream.toBytes()          // Uint8Array
await stream.toText()           // string (UTF-8)
stream.toReadableStream()       // ReadableStream
stream.toResponse(init?)        // Response
stream.tee()                    // [StreamUtils, StreamUtils]

Base64

import { base64Encode, base64Decode } from 'streaming-utils';

// Uint8Array → string (base64)
readableStream.pipeThrough(base64Encode());

// string (base64) → Uint8Array
base64Stream.pipeThrough(base64Decode());

Byte-level variants that keep the entire pipeline as Uint8Array:

import { base64EncodeBytes, base64DecodeBytes } from 'streaming-utils';

// Uint8Array → Uint8Array (UTF-8 encoded base64)
readableStream.pipeThrough(base64EncodeBytes());

// Uint8Array (UTF-8 encoded base64) → Uint8Array
readableStream.pipeThrough(base64DecodeBytes());

Encryption

AES-GCM (authenticated, buffered)

AES-GCM produces an auth tag over the full plaintext, so it buffers the entire stream before encrypting. The IV is automatically prepended to the output.

import { aesGcmEncrypt, aesGcmDecrypt } from 'streaming-utils';

const key = await crypto.subtle.generateKey(
  { name: 'AES-GCM', length: 256 },
  true,
  ['encrypt', 'decrypt'],
);

// Encrypt -- IV is prepended to output
const encrypted = readableStream.pipeThrough(aesGcmEncrypt({ key }));

// Decrypt -- reads IV from first 12 bytes
const decrypted = encryptedStream.pipeThrough(aesGcmDecrypt({ key }));

You can also pass a raw Uint8Array as the key, and optionally provide your own iv or additionalData.

AES-CTR (streaming)

AES-CTR encrypts chunk-by-chunk and handles 16-byte block alignment automatically. Suitable for large data.

import { aesCtrEncrypt, aesCtrDecrypt } from 'streaming-utils';

const key = await crypto.subtle.generateKey(
  { name: 'AES-CTR', length: 256 },
  true,
  ['encrypt', 'decrypt'],
);
const counter = crypto.getRandomValues(new Uint8Array(16));

const encrypted = readableStream.pipeThrough(aesCtrEncrypt({ key, counter }));
const decrypted = encryptedStream.pipeThrough(aesCtrDecrypt({ key, counter }));

Note: AES-CTR does not provide authentication. Pair with an HMAC if you need integrity.

Compression

Wraps the built-in CompressionStream/DecompressionStream APIs.

import { compress, decompress } from 'streaming-utils';

// gzip, deflate, or deflate-raw
const compressed = readableStream.pipeThrough(compress('gzip'));
const decompressed = compressedStream.pipeThrough(decompress('gzip'));

Hashing

Computes a digest over the entire stream using Web Crypto.

import { hash } from 'streaming-utils';

// SHA-256 as Uint8Array (default)
const digest = readableStream.pipeThrough(hash());

// SHA-256 as hex string
const hexDigest = readableStream.pipeThrough(hash({ output: 'hex' }));

// SHA-512
const sha512 = readableStream.pipeThrough(hash({ algorithm: 'SHA-512' }));

Supported algorithms: SHA-1, SHA-256, SHA-384, SHA-512.

Text

import { textEncode, textDecode } from 'streaming-utils';

// string → Uint8Array
stringStream.pipeThrough(textEncode());

// Uint8Array → string (handles multi-byte splits across chunks)
byteStream.pipeThrough(textDecode());

Hex

import { hexEncode, hexDecode } from 'streaming-utils';

// Uint8Array → string (lowercase hex)
readableStream.pipeThrough(hexEncode());

// string (hex) → Uint8Array
hexStream.pipeThrough(hexDecode());

NDJSON

import { ndjsonParse, ndjsonSerialize, textDecode } from 'streaming-utils';

// Parse newline-delimited JSON
const objects = byteStream
  .pipeThrough(textDecode())
  .pipeThrough(ndjsonParse());

// Serialize objects to NDJSON
const ndjson = objectStream.pipeThrough(ndjsonSerialize());

Chunking Utilities

import { fixedChunkSize, limitBytes, tap } from 'streaming-utils';

// Re-chunk into fixed-size blocks (last chunk may be smaller)
readableStream.pipeThrough(fixedChunkSize(65536));

// Truncate stream after N bytes
readableStream.pipeThrough(limitBytes(1024 * 1024));

// Observe bytes passing through without modifying them
readableStream.pipeThrough(
  tap((totalBytes) => console.log(`${totalBytes} bytes so far`))
);

Node.js Stream Interop

Separate entry point so non-Node runtimes never import it:

import { fromNodeReadable, toNodeReadable, fromNodeWritable } from 'streaming-utils/node';
import { createReadStream, createWriteStream } from 'fs';

// Node Readable → Web ReadableStream
const webStream = fromNodeReadable(createReadStream('input.txt'));

// Web ReadableStream → Node Readable
const nodeStream = toNodeReadable(webReadableStream);

// Node Writable → Web WritableStream
const webWritable = fromNodeWritable(createWriteStream('output.txt'));
await readableStream.pipeTo(webWritable);

Uses Readable.toWeb() / Readable.fromWeb() when available (Node 17+), with a manual fallback for older versions.

Composing Transforms

The power is in composition. Every transform is just a TransformStream, so you chain them with .pipeThrough():

import {
  StreamUtils,
  compress,
  base64EncodeBytes,
  aesCtrEncrypt,
  tap,
} from 'streaming-utils';

const key = /* ... */;
const counter = crypto.getRandomValues(new Uint8Array(16));

const result = await StreamUtils.fromResponse(await fetch('https://example.com/data'))
  .pipeThrough(tap((n) => console.log(`Downloaded ${n} bytes`)))
  .pipeThrough(compress('gzip'))
  .pipeThrough(aesCtrEncrypt({ key, counter }))
  .pipeThrough(base64EncodeBytes())
  .toBytes();

Runtime Support

Tested across four runtimes on every change:

Runtime Command
Node.js npm run test:node
Cloudflare Workers (workerd) npm run test:workers
Bun npm run test:bun
Deno npm run test:deno
All npm run test:all

License

MIT

About

Streaming utilities built on Web Standards. Process data through pipelines -- base64, encryption, compression, hashing, and more -- without buffering into memory.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors