From 2eaf2d9691073a1f2516d7c75c12f1f30aadda33 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 03:06:39 +0530 Subject: [PATCH 01/12] feat: add gzip/xz compression utilities --- package-lock.json | 64 +++++-- package.json | 1 + src/utils/compression.ts | 188 +++++++++++++++++++ test/unit/utils/compression.spec.ts | 278 ++++++++++++++++++++++++++++ 4 files changed, 515 insertions(+), 16 deletions(-) create mode 100644 src/utils/compression.ts create mode 100644 test/unit/utils/compression.spec.ts diff --git a/package-lock.json b/package-lock.json index ee895909..1da670dd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "express": "4.22.1", "js-yaml": "4.1.1", "knex": "2.4.2", + "lzma-native": "^8.0.6", "pg": "8.9.0", "pg-query-stream": "4.3.0", "ramda": "0.28.0", @@ -94,7 +95,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -1571,7 +1571,6 @@ "integrity": "sha512-Kxap9uP5jD8tHUZVjTWgzxemi/0uOsbGjd4LBOSxcJoOCRbESFwemUzilJuzNTB8pcTQUh8D5oudUyxfkJOKmA==", "dev": true, "license": "MIT", - "peer": true, "peerDependencies": { "@cucumber/messages": ">=17.1.1" } @@ -2307,7 +2306,6 @@ "integrity": "sha512-YfcB2QrX+Wx1o6LD1G2Y2fhDhOix/bAY/oAnMpHoNLsKkWIRbt1oKLkIFvxBMzLwAEPqnYWguJrYC+J6i4ywbw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "bole": "^5.0.0", "ndjson": "^2.0.0" @@ -2472,7 +2470,6 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.4.2.tgz", "integrity": "sha512-oUdEjE0I7JS5AyaAjkD3aOXn9NhO7XKyPyXEyrgFDu++VrVBHUPnV6dgEya9TcMuj5nIJRuCzCm8ZP+c9zCHPw==", "license": "MIT", - "peer": true, "dependencies": { "cluster-key-slot": "1.1.1", "generic-pool": "3.9.0", @@ -2749,7 +2746,6 @@ "integrity": "sha512-A1sre26ke7HDIuY/M23nd9gfB+nrmhtYyMINbjI1zHJxYteKR6qSMX56FsmjMcDb3SMcjJg5BiRRgOCC/yBD0g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -3305,7 +3301,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.10.12", "caniuse-lite": "^1.0.30001782", @@ -3490,7 +3485,6 @@ "integrity": "sha512-RITGBfijLkBddZvnn8jdqoTypxvqbOLYQkGGxXzeFjVHvudaPw0HNFD9x928/eUwYWd2dPCugVqspGALTZZQKw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "assertion-error": "^1.1.0", "check-error": "^1.0.3", @@ -3864,7 +3858,6 @@ "integrity": "sha512-kcZ6+W5QzcJ3P1Mt+83OUv/oHFqZHIx8DuxG6eZ5RGMERoLqp4BuGjhHLYGK+Kf5XVkQvqBSmAy/nGWN3qDgEA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "import-fresh": "^3.3.0", "js-yaml": "^4.1.0", @@ -6318,6 +6311,36 @@ "node": ">=12" } }, + "node_modules/lzma-native": { + "version": "8.0.6", + "resolved": "https://registry.npmjs.org/lzma-native/-/lzma-native-8.0.6.tgz", + "integrity": "sha512-09xfg67mkL2Lz20PrrDeNYZxzeW7ADtpYFbwSQh9U8+76RIzx5QsJBMy8qikv3hbUPfpy6hqwxt6FcGK81g9AA==", + "hasInstallScript": true, + "dependencies": { + "node-addon-api": "^3.1.0", + "node-gyp-build": "^4.2.1", + "readable-stream": "^3.6.0" + }, + "bin": { + "lzmajs": "bin/lzmajs" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/lzma-native/node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/magic-string": { "version": "0.16.0", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.16.0.tgz", @@ -6941,6 +6964,21 @@ "tslib": "^2.0.3" } }, + "node_modules/node-addon-api": { + "version": "3.2.1", + "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-3.2.1.tgz", + "integrity": "sha512-mmcei9JghVNDYydghQmeDX8KoAm0FAiYyIcUt/N4nhyAipB17pllZQDOJD2fotxABnt4Mdz+dKTO7eftLg4d0A==" + }, + "node_modules/node-gyp-build": { + "version": "4.8.4", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.8.4.tgz", + "integrity": "sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==", + "bin": { + "node-gyp-build": "bin.js", + "node-gyp-build-optional": "optional.js", + "node-gyp-build-test": "build-test.js" + } + }, "node_modules/node-preload": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz", @@ -7723,7 +7761,6 @@ "resolved": "https://registry.npmjs.org/pg/-/pg-8.9.0.tgz", "integrity": "sha512-ZJM+qkEbtOHRuXjmvBtOgNOXOtLSbxiMiUVMgE4rV6Zwocy03RicCVvDXgx8l4Biwo8/qORUnEqn2fdQzV7KCg==", "license": "MIT", - "peer": true, "dependencies": { "buffer-writer": "2.0.0", "packet-reader": "1.0.0", @@ -8465,7 +8502,8 @@ "resolved": "https://registry.npmjs.org/reflect-metadata/-/reflect-metadata-0.2.2.tgz", "integrity": "sha512-urBwgfrvVP/eAyXx4hluJivBKzuEbSQs9rKWCrCkbSxNv8mxPcUZKeuoF3Uy4mJl3Lwprp6yy5/39VWigZ4K6Q==", "dev": true, - "license": "Apache-2.0" + "license": "Apache-2.0", + "peer": true }, "node_modules/regexp-match-indices": { "version": "1.0.2", @@ -9264,7 +9302,6 @@ "version": "1.1.1", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", - "dev": true, "license": "MIT", "dependencies": { "safe-buffer": "~5.1.0" @@ -9274,7 +9311,6 @@ "version": "5.1.2", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", - "dev": true, "license": "MIT" }, "node_modules/string-argv": { @@ -9745,7 +9781,6 @@ "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@cspotcode/source-map-support": "^0.8.0", "@tsconfig/node10": "^1.0.7", @@ -10018,7 +10053,6 @@ "integrity": "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -10128,7 +10162,6 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", - "dev": true, "license": "MIT" }, "node_modules/utils-merge": { @@ -10557,7 +10590,6 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-3.22.4.tgz", "integrity": "sha512-iC+8Io04lddc+mVqQ9AZ7OQ2MrUKGN+oIQyq1vemgt46jwCwLfhq7/pwnBnNXXXZb8VTVLKwp9EDkx+ryxIWmg==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } diff --git a/package.json b/package.json index 33e61c2f..f940432c 100644 --- a/package.json +++ b/package.json @@ -131,6 +131,7 @@ "express": "4.22.1", "js-yaml": "4.1.1", "knex": "2.4.2", + "lzma-native": "^8.0.6", "pg": "8.9.0", "pg-query-stream": "4.3.0", "ramda": "0.28.0", diff --git a/src/utils/compression.ts b/src/utils/compression.ts new file mode 100644 index 00000000..e3f65459 --- /dev/null +++ b/src/utils/compression.ts @@ -0,0 +1,188 @@ +import { createGunzip, createGzip } from 'zlib' +import { PassThrough, Transform } from 'stream' +import { cpus } from 'os' +import { extname } from 'path' +import { open } from 'fs/promises' + +export enum CompressionFormat { + GZIP = 'gzip', + XZ = 'xz', +} + +const GZIP_MAGIC = Buffer.from([0x1f, 0x8b]) +const XZ_MAGIC = Buffer.from([0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00]) + +const DEFAULT_XZ_PRESET = 6 +const DEFAULT_MAX_XZ_THREADS = 4 +const MIN_XZ_PRESET = 0 +const MAX_XZ_PRESET = 9 + +type LzmaNative = { + createCompressor: (options?: Record) => Transform + createDecompressor: (options?: Record) => Transform +} + +type Environment = Record + +const getLzmaNative = (): LzmaNative => { + try { + return require('lzma-native') as LzmaNative + } catch (error) { + const reason = error instanceof Error ? error.message : String(error) + + throw new Error(`XZ support requires the "lzma-native" package. Install dependencies and try again. (${reason})`) + } +} + +const parseIntegerEnv = ( + key: string, + env: Environment, +): number | undefined => { + const rawValue = env[key] + if (!rawValue || rawValue.trim() === '') { + return undefined + } + + if (!/^-?\d+$/.test(rawValue.trim())) { + throw new Error(`Invalid ${key}: ${rawValue}. Expected an integer.`) + } + + return Number(rawValue) +} + +export const getXzCompressionOptions = ( + cpuCount: number, + env: Environment = process.env, +): { preset: number; threads: number } => { + const parsedPreset = parseIntegerEnv('NOSTREAM_XZ_PRESET', env) + const preset = parsedPreset ?? DEFAULT_XZ_PRESET + + if (preset < MIN_XZ_PRESET || preset > MAX_XZ_PRESET) { + throw new Error( + `Invalid NOSTREAM_XZ_PRESET: ${preset}. Expected an integer between ${MIN_XZ_PRESET} and ${MAX_XZ_PRESET}.`, + ) + } + + const parsedThreadCap = parseIntegerEnv('NOSTREAM_XZ_THREADS', env) + if (parsedThreadCap !== undefined && parsedThreadCap <= 0) { + throw new Error('Invalid NOSTREAM_XZ_THREADS: expected a positive integer.') + } + + // Keep one core available by default to reduce contention with the running relay. + const availableThreads = Math.max(1, Math.max(1, Math.trunc(cpuCount)) - 1) + const maxThreads = parsedThreadCap ?? DEFAULT_MAX_XZ_THREADS + + return { + preset, + threads: Math.max(1, Math.min(availableThreads, maxThreads)), + } +} + +export const parseCompressionFormat = (input: string): CompressionFormat => { + switch (input.trim().toLowerCase()) { + case 'gzip': + case 'gz': + return CompressionFormat.GZIP + case 'xz': + return CompressionFormat.XZ + default: + throw new Error(`Unsupported compression format: ${input}. Use gzip|gz|xz.`) + } +} + +export const getCompressionFormatFromExtension = ( + filePath: string, +): CompressionFormat | undefined => { + switch (extname(filePath).toLowerCase()) { + case '.gz': + return CompressionFormat.GZIP + case '.xz': + return CompressionFormat.XZ + default: + return undefined + } +} + +export const getCompressionFormatFromHeader = ( + header: Buffer, +): CompressionFormat | undefined => { + if (header.length >= GZIP_MAGIC.length && header.subarray(0, GZIP_MAGIC.length).equals(GZIP_MAGIC)) { + return CompressionFormat.GZIP + } + + if (header.length >= XZ_MAGIC.length && header.subarray(0, XZ_MAGIC.length).equals(XZ_MAGIC)) { + return CompressionFormat.XZ + } + + return undefined +} + +const readFileHeader = async (filePath: string, bytes = XZ_MAGIC.length): Promise => { + const fileHandle = await open(filePath, 'r') + + try { + const header = Buffer.alloc(bytes) + const { bytesRead } = await fileHandle.read(header, 0, bytes, 0) + + return header.subarray(0, bytesRead) + } finally { + await fileHandle.close() + } +} + +export const detectCompressionFormat = async ( + filePath: string, +): Promise => { + const extensionFormat = getCompressionFormatFromExtension(filePath) + const header = await readFileHeader(filePath) + const headerFormat = getCompressionFormatFromHeader(header) + + if (extensionFormat && headerFormat && extensionFormat !== headerFormat) { + throw new Error( + `Compression mismatch for ${filePath}: extension suggests ${extensionFormat} but header is ${headerFormat}.`, + ) + } + + return headerFormat ?? extensionFormat +} + +export const createCompressionStream = ( + format?: CompressionFormat, +): Transform => { + if (!format) { + return new PassThrough() + } + + switch (format) { + case CompressionFormat.GZIP: + return createGzip() + case CompressionFormat.XZ: { + const lzmaNative = getLzmaNative() + const { preset, threads } = getXzCompressionOptions(cpus().length) + + return lzmaNative.createCompressor({ + preset, + threads, + }) + } + default: + throw new Error(`Unsupported compression format: ${String(format)}`) + } +} + +export const createDecompressionStream = ( + format?: CompressionFormat, +): Transform => { + if (!format) { + return new PassThrough() + } + + switch (format) { + case CompressionFormat.GZIP: + return createGunzip() + case CompressionFormat.XZ: + return getLzmaNative().createDecompressor() + default: + throw new Error(`Unsupported compression format: ${String(format)}`) + } +} diff --git a/test/unit/utils/compression.spec.ts b/test/unit/utils/compression.spec.ts new file mode 100644 index 00000000..a2ff4ec7 --- /dev/null +++ b/test/unit/utils/compression.spec.ts @@ -0,0 +1,278 @@ +import { join } from 'path' + +import fs from 'fs' +import os from 'os' +import { Readable } from 'stream' + +import { expect } from 'chai' + +import { + CompressionFormat, + createCompressionStream, + createDecompressionStream, + detectCompressionFormat, + getXzCompressionOptions, + getCompressionFormatFromExtension, + getCompressionFormatFromHeader, + parseCompressionFormat, +} from '../../../src/utils/compression' + +const toBuffer = async (stream: NodeJS.ReadableStream): Promise => { + const chunks: Buffer[] = [] + + for await (const chunk of stream as AsyncIterable) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)) + } + + return Buffer.concat(chunks) +} + +const expectStreamToFail = async (stream: NodeJS.ReadableStream): Promise => { + try { + await toBuffer(stream) + expect.fail('Expected stream to fail') + } catch (error) { + expect(error).to.be.instanceOf(Error) + } +} + +describe('compression utils', () => { + const tempDirs: string[] = [] + + const createTempFile = (name: string, data: Buffer): string => { + const tmpDir = fs.mkdtempSync(join(os.tmpdir(), 'nostream-compression-')) + tempDirs.push(tmpDir) + + const filePath = join(tmpDir, name) + fs.writeFileSync(filePath, data) + + return filePath + } + + afterEach(() => { + for (const tmpDir of tempDirs.splice(0)) { + fs.rmSync(tmpDir, { + force: true, + recursive: true, + }) + } + }) + + it('parses format aliases', () => { + expect(parseCompressionFormat('gzip')).to.equal(CompressionFormat.GZIP) + expect(parseCompressionFormat('gz')).to.equal(CompressionFormat.GZIP) + expect(parseCompressionFormat('xz')).to.equal(CompressionFormat.XZ) + expect(parseCompressionFormat(' GZip ')).to.equal(CompressionFormat.GZIP) + }) + + it('throws when parsing an unsupported format', () => { + expect(() => parseCompressionFormat('brotli')).to.throw('Unsupported compression format') + }) + + it('detects compression from extension', () => { + expect(getCompressionFormatFromExtension('events.jsonl.gz')).to.equal(CompressionFormat.GZIP) + expect(getCompressionFormatFromExtension('events.jsonl.xz')).to.equal(CompressionFormat.XZ) + expect(getCompressionFormatFromExtension('events.jsonl.GZ')).to.equal(CompressionFormat.GZIP) + expect(getCompressionFormatFromExtension('events.jsonl')).to.equal(undefined) + }) + + it('detects compression from magic header bytes', () => { + expect(getCompressionFormatFromHeader(Buffer.from([0x1f, 0x8b, 0x08]))).to.equal(CompressionFormat.GZIP) + expect( + getCompressionFormatFromHeader(Buffer.from([0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00, 0x00])), + ).to.equal(CompressionFormat.XZ) + expect(getCompressionFormatFromHeader(Buffer.from('nostream'))).to.equal(undefined) + }) + + it('computes default xz options and reserves one CPU core', () => { + expect(getXzCompressionOptions(16, {})).to.deep.equal({ + preset: 6, + threads: 4, + }) + + expect(getXzCompressionOptions(2, {})).to.deep.equal({ + preset: 6, + threads: 1, + }) + + expect(getXzCompressionOptions(1, {})).to.deep.equal({ + preset: 6, + threads: 1, + }) + }) + + it('uses NOSTREAM_XZ_THREADS when provided', () => { + expect( + getXzCompressionOptions(8, { + NOSTREAM_XZ_THREADS: '2', + }), + ).to.deep.equal({ + preset: 6, + threads: 2, + }) + }) + + it('clamps configured xz threads to available CPU cores', () => { + const options = getXzCompressionOptions(4, { + NOSTREAM_XZ_THREADS: '99', + }) + + expect(options.threads).to.equal(3) + }) + + it('uses NOSTREAM_XZ_PRESET when provided', () => { + expect( + getXzCompressionOptions(8, { + NOSTREAM_XZ_PRESET: '9', + }), + ).to.deep.equal({ + preset: 9, + threads: 4, + }) + }) + + it('throws on invalid NOSTREAM_XZ_THREADS values', () => { + expect(() => getXzCompressionOptions(8, { + NOSTREAM_XZ_THREADS: '0', + })).to.throw('Invalid NOSTREAM_XZ_THREADS: expected a positive integer.') + + expect(() => getXzCompressionOptions(8, { + NOSTREAM_XZ_THREADS: 'abc', + })).to.throw('Invalid NOSTREAM_XZ_THREADS: abc. Expected an integer.') + }) + + it('throws on invalid NOSTREAM_XZ_PRESET values', () => { + expect(() => getXzCompressionOptions(8, { + NOSTREAM_XZ_PRESET: '-1', + })).to.throw('Invalid NOSTREAM_XZ_PRESET: -1. Expected an integer between 0 and 9.') + + expect(() => getXzCompressionOptions(8, { + NOSTREAM_XZ_PRESET: '10', + })).to.throw('Invalid NOSTREAM_XZ_PRESET: 10. Expected an integer between 0 and 9.') + + expect(() => getXzCompressionOptions(8, { + NOSTREAM_XZ_PRESET: 'abc', + })).to.throw('Invalid NOSTREAM_XZ_PRESET: abc. Expected an integer.') + }) + + it('round-trips data with gzip streams', async () => { + const input = Buffer.from('nostream-gzip-test\n'.repeat(1024), 'utf-8') + + const compressed = await toBuffer( + Readable.from([input]).pipe(createCompressionStream(CompressionFormat.GZIP)), + ) + + const decompressed = await toBuffer( + Readable.from([compressed]).pipe(createDecompressionStream(CompressionFormat.GZIP)), + ) + + expect(decompressed.equals(input)).to.equal(true) + }) + + it('round-trips data with xz streams', async () => { + const input = Buffer.from('nostream-xz-test\n'.repeat(2048), 'utf-8') + + const compressed = await toBuffer( + Readable.from([input]).pipe(createCompressionStream(CompressionFormat.XZ)), + ) + + const decompressed = await toBuffer( + Readable.from([compressed]).pipe(createDecompressionStream(CompressionFormat.XZ)), + ) + + expect(decompressed.equals(input)).to.equal(true) + }) + + it('passes data through unchanged when compression is disabled', async () => { + const input = Buffer.from('nostream passthrough', 'utf-8') + + const compressed = await toBuffer( + Readable.from([input]).pipe(createCompressionStream()), + ) + + const decompressed = await toBuffer( + Readable.from([compressed]).pipe(createDecompressionStream()), + ) + + expect(decompressed.equals(input)).to.equal(true) + }) + + it('detects compressed files by header even without compressed extension', async () => { + const input = Buffer.from('compressed payload\n'.repeat(128), 'utf-8') + + const gzipData = await toBuffer( + Readable.from([input]).pipe(createCompressionStream(CompressionFormat.GZIP)), + ) + + const filePath = createTempFile('events.backup', gzipData) + + const format = await detectCompressionFormat(filePath) + + expect(format).to.equal(CompressionFormat.GZIP) + }) + + it('falls back to extension-based detection when header is unavailable', async () => { + const filePath = createTempFile('empty.jsonl.xz', Buffer.alloc(0)) + + const format = await detectCompressionFormat(filePath) + + expect(format).to.equal(CompressionFormat.XZ) + }) + + it('throws when extension and file header formats conflict', async () => { + const input = Buffer.from('compressed payload\n'.repeat(128), 'utf-8') + + const gzipData = await toBuffer( + Readable.from([input]).pipe(createCompressionStream(CompressionFormat.GZIP)), + ) + + const filePath = createTempFile('events.jsonl.xz', gzipData) + + try { + await detectCompressionFormat(filePath) + expect.fail('Expected detectCompressionFormat to throw on header/extension mismatch') + } catch (error) { + expect((error as Error).message).to.contain('Compression mismatch') + } + }) + + it('fails to decompress invalid gzip payloads', async () => { + const invalidPayload = Buffer.from('not-a-gzip-stream', 'utf-8') + + await expectStreamToFail( + Readable.from([invalidPayload]).pipe(createDecompressionStream(CompressionFormat.GZIP)), + ) + }) + + it('fails to decompress invalid xz payloads', async () => { + const invalidPayload = Buffer.from('not-an-xz-stream', 'utf-8') + + await expectStreamToFail( + Readable.from([invalidPayload]).pipe(createDecompressionStream(CompressionFormat.XZ)), + ) + }) + + it('round-trips binary payloads across boundary sizes', async () => { + const sizes = [0, 1, 2, 31, 32, 33, 1024, 8192] + const formats = [CompressionFormat.GZIP, CompressionFormat.XZ] + + for (const size of sizes) { + const input = Buffer.alloc(size) + for (let index = 0; index < size; index += 1) { + input[index] = (index * 31 + 17) % 256 + } + + for (const format of formats) { + const compressed = await toBuffer( + Readable.from([input]).pipe(createCompressionStream(format)), + ) + + const decompressed = await toBuffer( + Readable.from([compressed]).pipe(createDecompressionStream(format)), + ) + + expect(decompressed.equals(input)).to.equal(true) + } + } + }) +}) From 5fcad278588602637c650223894d2a0389435958 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 03:06:39 +0530 Subject: [PATCH 02/12] feat(import): support compressed event imports --- src/import-events.ts | 56 ++++-- src/services/event-import-service.ts | 159 +++++++++++------- test/unit/import-events.spec.ts | 75 +++++++++ .../services/event-import-service.spec.ts | 21 +++ 4 files changed, 238 insertions(+), 73 deletions(-) create mode 100644 test/unit/import-events.spec.ts diff --git a/src/import-events.ts b/src/import-events.ts index 9b51ca2d..83ef6d5f 100644 --- a/src/import-events.ts +++ b/src/import-events.ts @@ -1,7 +1,12 @@ -import { extname, resolve } from 'path' +import { resolve } from 'path' import fs from 'fs' +import { + CompressionFormat, + createDecompressionStream, + detectCompressionFormat, +} from './utils/compression' import { createEventBatchPersister, EventImportLineError, @@ -27,8 +32,10 @@ const formatProgress = (stats: EventImportStats): string => { } const printUsage = (): void => { - console.log('Usage: npm run import -- [--batch-size ]') + console.log('Usage: npm run import -- [--batch-size ]') console.log('Example: npm run import -- ./events.jsonl --batch-size 1000') + console.log('Example: npm run import -- ./events.jsonl.gz') + console.log('Example: npm run import -- ./events.jsonl.xz') } const parseBatchSize = (value: string): number => { @@ -41,7 +48,7 @@ const parseBatchSize = (value: string): number => { return parsedValue } -const parseCliArgs = (args: string[]): CliOptions => { +export const parseCliArgs = (args: string[]): CliOptions => { let batchSize = DEFAULT_BATCH_SIZE let filePath: string | undefined @@ -72,7 +79,7 @@ const parseCliArgs = (args: string[]): CliOptions => { continue } - if (arg.startsWith('--')) { + if (arg.startsWith('-')) { throw new Error(`Unknown option: ${arg}`) } @@ -84,7 +91,7 @@ const parseCliArgs = (args: string[]): CliOptions => { } if (!filePath) { - throw new Error('Missing path to .jsonl file') + throw new Error('Missing input file path') } return { @@ -97,10 +104,6 @@ const parseCliArgs = (args: string[]): CliOptions => { const ensureValidInputFile = (filePath: string): string => { const absolutePath = resolve(process.cwd(), filePath) - if (extname(absolutePath).toLowerCase() !== '.jsonl') { - throw new Error('Input file must have a .jsonl extension') - } - if (!fs.existsSync(absolutePath)) { throw new Error(`Input file does not exist: ${absolutePath}`) } @@ -113,6 +116,34 @@ const ensureValidInputFile = (filePath: string): string => { return absolutePath } +const getCompressionLabel = (format: CompressionFormat): string => { + switch (format) { + case CompressionFormat.GZIP: + return 'gzip' + case CompressionFormat.XZ: + return 'xz' + default: + return String(format) + } +} + +const createImportStream = async (absoluteFilePath: string) => { + const compressionFormat = await detectCompressionFormat(absoluteFilePath) + const source = fs.createReadStream(absoluteFilePath) + + if (!compressionFormat) { + return { + compressionFormat, + stream: source, + } + } + + return { + compressionFormat, + stream: source.pipe(createDecompressionStream(compressionFormat)), + } +} + const run = async (): Promise => { const options = parseCliArgs(process.argv.slice(2)) @@ -147,7 +178,12 @@ const run = async (): Promise => { const startedAt = Date.now() try { - const stats = await importer.importFromJsonl(absoluteFilePath, { + const { stream, compressionFormat } = await createImportStream(absoluteFilePath) + if (compressionFormat) { + console.log(`Detected ${getCompressionLabel(compressionFormat)} compression. Decompressing on-the-fly...`) + } + + const stats = await importer.importFromReadable(stream, { batchSize: options.batchSize, onLineError, onProgress, diff --git a/src/services/event-import-service.ts b/src/services/event-import-service.ts index 25e6df1c..40d1d606 100644 --- a/src/services/event-import-service.ts +++ b/src/services/event-import-service.ts @@ -17,8 +17,13 @@ import { Event } from '../@types/event' import { eventSchema } from '../schemas/event-schema' import { IEventRepository } from '../@types/repositories' +type EventWithImportMetadata = Event & { + [EventDeduplicationMetadataKey]?: string[] + [EventExpirationTimeMetadataKey]?: number +} + const enrichEventMetadata = (event: Event): Event => { - let enriched: any = event + let enriched: EventWithImportMetadata = event const expiration = getEventExpiration(event) if (expiration) { @@ -26,14 +31,13 @@ const enrichEventMetadata = (event: Event): Event => { } if (isParameterizedReplaceableEvent(event)) { - const [, ...deduplication] = event.tags.find((tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication) ?? [ - null, - '', - ] + const [, ...deduplication] = event.tags.find( + (tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication, + ) ?? [null, ''] enriched = { ...enriched, [EventDeduplicationMetadataKey]: deduplication } } - return enriched as Event + return enriched } const DEFAULT_BATCH_SIZE = 1000 @@ -64,67 +68,85 @@ const getErrorMessage = (error: unknown): string => { return String(error) } +const isDestroyableStream = ( + stream: NodeJS.ReadableStream, +): stream is NodeJS.ReadableStream & { destroy: () => void } => { + const candidate = stream as { destroy?: unknown } + + return typeof candidate.destroy === 'function' +} + export const createEventBatchPersister = (eventRepository: IEventRepository) => - async (events: Event[]): Promise => { - if (!events.length) { - return 0 - } - - let inserted = 0 + async (events: Event[]): Promise => { + if (!events.length) { + return 0 + } - const regularEvents: Event[] = [] - const replaceableEvents: Event[] = [] + let inserted = 0 - for (const event of events) { - if (isEphemeralEvent(event)) { - continue - } + const regularEvents: Event[] = [] + const replaceableEvents: Event[] = [] - if (isDeleteEvent(event)) { - // flush pending batches before applying deletes - inserted += await eventRepository.createMany(regularEvents.splice(0)) - inserted += await eventRepository.upsertMany(replaceableEvents.splice(0)) + for (const event of events) { + if (isEphemeralEvent(event)) { + continue + } - const eventIdsToDelete = event.tags.reduce( - (ids, tag) => - tag.length >= 2 && tag[0] === EventTags.Event && /^[0-9a-f]{64}$/.test(tag[1]) ? [...ids, tag[1]] : ids, - [] as string[], - ) + if (isDeleteEvent(event)) { + // flush pending batches before applying deletes + inserted += await eventRepository.createMany(regularEvents.splice(0)) + inserted += await eventRepository.upsertMany(replaceableEvents.splice(0)) + + const eventIdsToDelete = event.tags.reduce( + (ids, tag) => + tag.length >= 2 + && tag[0] === EventTags.Event + && /^[0-9a-f]{64}$/.test(tag[1]) + ? [...ids, tag[1]] + : ids, + [] as string[] + ) + + if (eventIdsToDelete.length) { + await eventRepository.deleteByPubkeyAndIds(event.pubkey, eventIdsToDelete) + } - if (eventIdsToDelete.length) { - await eventRepository.deleteByPubkeyAndIds(event.pubkey, eventIdsToDelete) + inserted += await eventRepository.create(enrichEventMetadata(event)) + continue } - inserted += await eventRepository.create(enrichEventMetadata(event)) - continue - } + const enrichedEvent = enrichEventMetadata(event) - const enrichedEvent = enrichEventMetadata(event) + if (isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)) { + replaceableEvents.push(enrichedEvent) + continue + } - if (isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)) { - replaceableEvents.push(enrichedEvent) - continue + regularEvents.push(enrichedEvent) } - regularEvents.push(enrichedEvent) - } - - // flush remaining - inserted += await eventRepository.createMany(regularEvents) - inserted += await eventRepository.upsertMany(replaceableEvents) + // flush remaining + inserted += await eventRepository.createMany(regularEvents) + inserted += await eventRepository.upsertMany(replaceableEvents) - return inserted - } + return inserted + } export class EventImportService { - public constructor(private readonly persistBatch: (events: Event[]) => Promise) {} - - public async importFromJsonl(filePath: string, options: EventImportOptions = {}): Promise { - const batchSize = - typeof options.batchSize === 'number' && Number.isInteger(options.batchSize) && options.batchSize > 0 - ? options.batchSize - : DEFAULT_BATCH_SIZE + public constructor( + private readonly persistBatch: (events: Event[]) => Promise, + ) {} + + public async importFromReadable( + input: NodeJS.ReadableStream, + options: EventImportOptions = {}, + ): Promise { + const batchSize = ( + typeof options.batchSize === 'number' + && Number.isInteger(options.batchSize) + && options.batchSize > 0 + ) ? options.batchSize : DEFAULT_BATCH_SIZE const onLineError = options.onLineError ?? (() => undefined) const onProgress = options.onProgress ?? (() => undefined) @@ -146,27 +168,25 @@ export class EventImportService { return } - const batchSize = batch.length + const currentBatchSize = batch.length const inserted = await this.persistBatch(batch) - if (!Number.isInteger(inserted) || inserted < 0 || inserted > batchSize) { - throw new Error(`Invalid insert count (${inserted}) for batch size ${batchSize}`) + if (!Number.isInteger(inserted) || inserted < 0 || inserted > currentBatchSize) { + throw new Error( + `Invalid insert count (${inserted}) for batch size ${currentBatchSize}`, + ) } stats.inserted += inserted - stats.skipped += batchSize - inserted + stats.skipped += currentBatchSize - inserted batch.length = 0 onProgress({ ...stats }) } - const stream = fs.createReadStream(filePath, { - encoding: 'utf-8', - }) - const lineReader = readline.createInterface({ crlfDelay: Infinity, - input: stream, + input, }) try { @@ -184,11 +204,11 @@ export class EventImportService { try { event = validateEventSchema(JSON.parse(trimmedLine)) as Event - if (!(await isEventIdValid(event))) { + if (!await isEventIdValid(event)) { throw new Error('invalid: event id does not match') } - if (!(await isEventSignatureValid(event))) { + if (!await isEventSignatureValid(event)) { throw new Error('invalid: event signature verification failed') } } catch (error) { @@ -213,7 +233,20 @@ export class EventImportService { return stats } finally { lineReader.close() - stream.destroy() + if (isDestroyableStream(input)) { + input.destroy() + } } } + + public async importFromJsonl( + filePath: string, + options: EventImportOptions = {}, + ): Promise { + const stream = fs.createReadStream(filePath, { + encoding: 'utf-8', + }) + + return this.importFromReadable(stream, options) + } } diff --git a/test/unit/import-events.spec.ts b/test/unit/import-events.spec.ts new file mode 100644 index 00000000..00cf5779 --- /dev/null +++ b/test/unit/import-events.spec.ts @@ -0,0 +1,75 @@ +import { expect } from 'chai' + +import { parseCliArgs } from '../../src/import-events' + +describe('parseCliArgs (import-events)', () => { + it('parses a basic file argument with default batch size', () => { + const result = parseCliArgs(['./events.jsonl']) + + expect(result).to.deep.equal({ + batchSize: 1000, + filePath: './events.jsonl', + showHelp: false, + }) + }) + + it('parses --batch-size with spaced value', () => { + const result = parseCliArgs(['./events.jsonl', '--batch-size', '500']) + + expect(result).to.deep.equal({ + batchSize: 500, + filePath: './events.jsonl', + showHelp: false, + }) + }) + + it('parses --batch-size with inline value', () => { + const result = parseCliArgs(['./events.jsonl', '--batch-size=250']) + + expect(result).to.deep.equal({ + batchSize: 250, + filePath: './events.jsonl', + showHelp: false, + }) + }) + + it('returns help mode when --help is present', () => { + const result = parseCliArgs(['--help']) + + expect(result).to.deep.equal({ + batchSize: 1000, + filePath: '', + showHelp: true, + }) + }) + + it('throws when input file path is missing', () => { + expect(() => parseCliArgs([])).to.throw('Missing input file path') + }) + + it('throws on unknown options including short options', () => { + expect(() => parseCliArgs(['./events.jsonl', '--unknown'])) + .to.throw('Unknown option: --unknown') + + expect(() => parseCliArgs(['./events.jsonl', '-z'])) + .to.throw('Unknown option: -z') + }) + + it('throws when --batch-size value is missing', () => { + expect(() => parseCliArgs(['./events.jsonl', '--batch-size'])) + .to.throw('Missing value for --batch-size') + }) + + it('throws when --batch-size is invalid', () => { + expect(() => parseCliArgs(['./events.jsonl', '--batch-size=0'])) + .to.throw('Invalid --batch-size value: 0') + + expect(() => parseCliArgs(['./events.jsonl', '--batch-size=abc'])) + .to.throw('Invalid --batch-size value: abc') + }) + + it('throws when extra positional arguments are provided', () => { + expect(() => parseCliArgs(['./events.jsonl', './more.jsonl'])) + .to.throw('Unexpected extra argument: ./more.jsonl') + }) +}) diff --git a/test/unit/services/event-import-service.spec.ts b/test/unit/services/event-import-service.spec.ts index 2f67eded..75e2d89a 100644 --- a/test/unit/services/event-import-service.spec.ts +++ b/test/unit/services/event-import-service.spec.ts @@ -2,6 +2,7 @@ import { join } from 'path' import fs from 'fs' import os from 'os' +import { Readable } from 'stream' import { EventImportLineError, EventImportService, EventImportStats } from '../../../src/services/event-import-service' import { Event } from '../../../src/@types/event' @@ -80,6 +81,26 @@ describe('EventImportService', () => { expect(finalProgress).to.deep.equal(stats) }) + it('imports valid events from a readable stream', async () => { + const [event] = getEvents() + + const persistBatch = async (events: Event[]): Promise => { + return events.length + } + + const importer = new EventImportService(persistBatch) + const input = Readable.from([`${JSON.stringify(event)}\n`]) + + const stats = await importer.importFromReadable(input) + + expect(stats).to.deep.equal({ + errors: 0, + inserted: 1, + processed: 1, + skipped: 0, + }) + }) + it('counts malformed and invalid events as errors and keeps importing', async () => { const [event] = getEvents() From e5435a84fc90f51594f6c2e13c83858ac9a08976 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 03:06:39 +0530 Subject: [PATCH 03/12] feat(export): add compression flags and stream metrics --- src/scripts/export-events.ts | 220 +++++++++++++++++++++++- test/unit/scripts/export-events.spec.ts | 136 +++++++++++++++ 2 files changed, 347 insertions(+), 9 deletions(-) create mode 100644 test/unit/scripts/export-events.spec.ts diff --git a/src/scripts/export-events.ts b/src/scripts/export-events.ts index af119cb5..56d4752d 100644 --- a/src/scripts/export-events.ts +++ b/src/scripts/export-events.ts @@ -5,8 +5,178 @@ import path from 'path' import { pipeline } from 'stream/promises' import { Transform } from 'stream' +import { + CompressionFormat, + createCompressionStream, + getCompressionFormatFromExtension, + parseCompressionFormat, +} from '../utils/compression' import { getMasterDbClient } from '../database/client' +type ExportCliOptions = { + compress: boolean + format?: CompressionFormat + outputFilePath: string + showHelp: boolean +} + +const DEFAULT_OUTPUT_FILE_PATH = 'events.jsonl' +const MIN_ELAPSED_SECONDS = 0.001 + +export const formatBytes = (bytes: number): string => { + const units = ['B', 'KiB', 'MiB', 'GiB', 'TiB'] + + if (!Number.isFinite(bytes) || bytes <= 0) { + return '0 B' + } + + let unitIndex = 0 + let value = bytes + + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024 + unitIndex += 1 + } + + const rounded = Math.round(value * 100) / 100 + const formatted = String(rounded) + + return `${formatted} ${units[unitIndex]}` +} + +export const formatCompressionDelta = (rawBytes: number, outputBytes: number): string | undefined => { + if (rawBytes <= 0) { + return undefined + } + + const deltaPercent = ((rawBytes - outputBytes) / rawBytes) * 100 + const rounded = Math.round(Math.abs(deltaPercent) * 100) / 100 + const formattedPercent = String(rounded) + + if (deltaPercent >= 0) { + return `${formattedPercent}% smaller` + } + + return `${formattedPercent}% larger` +} + +export const getRatePerSecond = (value: number, elapsedMs: number): number => { + if (!Number.isFinite(value) || value <= 0) { + return 0 + } + + const elapsedSeconds = Math.max(elapsedMs / 1000, MIN_ELAPSED_SECONDS) + + return value / elapsedSeconds +} + +const formatCount = (value: number): string => { + const rounded = Math.round(value * 100) / 100 + + return Number.isInteger(rounded) + ? rounded.toLocaleString('en-US') + : rounded.toLocaleString('en-US', { + maximumFractionDigits: 2, + minimumFractionDigits: 2, + }) +} + +const getOptionValue = (option: string, args: string[], index: number): [string, number] => { + const inlineSeparator = `${option}=` + if (args[index].startsWith(inlineSeparator)) { + const value = args[index].slice(inlineSeparator.length) + if (!value) { + throw new Error(`Missing value for ${option}`) + } + + return [value, index] + } + + const nextIndex = index + 1 + const nextArg = args[nextIndex] + if (typeof nextArg !== 'string' || nextArg.startsWith('-')) { + throw new Error(`Missing value for ${option}`) + } + + return [nextArg, nextIndex] +} + +const printUsage = (): void => { + console.log('Usage: npm run export -- [output-file] [--compress|-z] [--format gzip|gz|xz]') + console.log('Example: npm run export -- ./events.jsonl') + console.log('Example: npm run export -- ./events.jsonl.gz --compress --format gzip') + console.log('Example: npm run export -- ./events.jsonl.xz -z --format xz') +} + +const getCompressionLabel = (format: CompressionFormat): string => { + switch (format) { + case CompressionFormat.GZIP: + return 'gzip' + case CompressionFormat.XZ: + return 'xz' + default: + return String(format) + } +} + +export const parseCliArgs = (args: string[]): ExportCliOptions => { + let compress = false + let format: CompressionFormat | undefined + let outputFilePath: string | undefined + + if (args.includes('--help') || args.includes('-h')) { + return { + compress, + format, + outputFilePath: DEFAULT_OUTPUT_FILE_PATH, + showHelp: true, + } + } + + for (let index = 0; index < args.length; index++) { + const arg = args[index] + + if (arg === '--compress' || arg === '-z') { + compress = true + continue + } + + if (arg === '--format' || arg.startsWith('--format=')) { + const [rawFormat, nextIndex] = getOptionValue('--format', args, index) + format = parseCompressionFormat(rawFormat) + index = nextIndex + continue + } + + if (arg.startsWith('-')) { + throw new Error(`Unknown option: ${arg}`) + } + + if (outputFilePath) { + throw new Error(`Unexpected extra argument: ${arg}`) + } + + outputFilePath = arg + } + + if (!compress && format) { + throw new Error('--format requires --compress') + } + + outputFilePath = outputFilePath ?? DEFAULT_OUTPUT_FILE_PATH + + if (compress && !format) { + format = getCompressionFormatFromExtension(outputFilePath) ?? CompressionFormat.GZIP + } + + return { + compress, + format, + outputFilePath, + showHelp: false, + } +} + type EventRow = { event_id: Buffer event_pubkey: Buffer @@ -18,8 +188,13 @@ type EventRow = { } async function exportEvents(): Promise { - const filename = process.argv[2] || 'events.jsonl' - const outputPath = path.resolve(filename) + const options = parseCliArgs(process.argv.slice(2)) + if (options.showHelp) { + printUsage() + return + } + + const outputPath = path.resolve(options.outputFilePath) const db = getMasterDbClient() const abortController = new AbortController() let interruptedBySignal: NodeJS.Signals | undefined @@ -49,10 +224,17 @@ async function exportEvents(): Promise { return } - console.log(`Exporting events to ${outputPath}`) + if (options.format) { + console.log(`Exporting events to ${outputPath} using ${getCompressionLabel(options.format)} compression`) + } else { + console.log(`Exporting events to ${outputPath}`) + } + const startedAt = Date.now() const output = fs.createWriteStream(outputPath) + const compressionStream = createCompressionStream(options.format) let exported = 0 + let rawBytes = 0 const dbStream = db('events') .select( @@ -87,15 +269,33 @@ async function exportEvents(): Promise { console.log(`Exported ${exported} events...`) } - callback(null, JSON.stringify(event) + '\n') + const line = JSON.stringify(event) + '\n' + rawBytes += Buffer.byteLength(line) + callback(null, line) }, }) - await pipeline(dbStream, toJsonLine, output, { + await pipeline(dbStream, toJsonLine, compressionStream, output, { signal: abortController.signal, }) + const elapsedMs = Date.now() - startedAt + const outputBytes = output.bytesWritten + const compressionDelta = formatCompressionDelta(rawBytes, outputBytes) + const eventRate = getRatePerSecond(exported, elapsedMs) + const rawRate = getRatePerSecond(rawBytes, elapsedMs) + const outputRate = getRatePerSecond(outputBytes, elapsedMs) + console.log(`Export complete: ${exported} events written to ${outputPath}`) + if (compressionDelta) { + console.log(`Size: ${formatBytes(rawBytes)} raw -> ${formatBytes(outputBytes)} on disk (${compressionDelta})`) + } else { + console.log(`Size: ${formatBytes(outputBytes)} on disk`) + } + + console.log( + `Throughput: ${formatCount(eventRate)} events/s | ${formatBytes(rawRate)}/s raw | ${formatBytes(outputRate)}/s output`, + ) } catch (error) { if (abortController.signal.aborted) { console.log(`Export interrupted by ${interruptedBySignal ?? 'signal'}.`) @@ -111,7 +311,9 @@ async function exportEvents(): Promise { } } -exportEvents().catch((error) => { - console.error('Export failed:', error.message) - process.exit(1) -}) +if (require.main === module) { + exportEvents().catch((error) => { + console.error('Export failed:', error.message) + process.exit(1) + }) +} diff --git a/test/unit/scripts/export-events.spec.ts b/test/unit/scripts/export-events.spec.ts new file mode 100644 index 00000000..e14121c1 --- /dev/null +++ b/test/unit/scripts/export-events.spec.ts @@ -0,0 +1,136 @@ +import { expect } from 'chai' + +import { CompressionFormat } from '../../../src/utils/compression' +import { + formatBytes, + formatCompressionDelta, + getRatePerSecond, + parseCliArgs, +} from '../../../src/scripts/export-events' + +describe('parseCliArgs (export-events)', () => { + it('uses defaults when no args are provided', () => { + const result = parseCliArgs([]) + + expect(result).to.deep.equal({ + compress: false, + format: undefined, + outputFilePath: 'events.jsonl', + showHelp: false, + }) + }) + + it('returns help mode when --help is present', () => { + const result = parseCliArgs(['--help']) + + expect(result).to.deep.equal({ + compress: false, + format: undefined, + outputFilePath: 'events.jsonl', + showHelp: true, + }) + }) + + it('parses output file without compression', () => { + const result = parseCliArgs(['backup.jsonl']) + + expect(result).to.deep.equal({ + compress: false, + format: undefined, + outputFilePath: 'backup.jsonl', + showHelp: false, + }) + }) + + it('parses compression flags and explicit format', () => { + const result = parseCliArgs(['backup.jsonl.gz', '--compress', '--format', 'gz']) + + expect(result).to.deep.equal({ + compress: true, + format: CompressionFormat.GZIP, + outputFilePath: 'backup.jsonl.gz', + showHelp: false, + }) + }) + + it('parses inline --format value', () => { + const result = parseCliArgs(['backup.jsonl.xz', '-z', '--format=xz']) + + expect(result).to.deep.equal({ + compress: true, + format: CompressionFormat.XZ, + outputFilePath: 'backup.jsonl.xz', + showHelp: false, + }) + }) + + it('infers format from extension when compression is enabled', () => { + const gzipResult = parseCliArgs(['backup.jsonl.gz', '--compress']) + const xzResult = parseCliArgs(['backup.jsonl.xz', '-z']) + + expect(gzipResult.format).to.equal(CompressionFormat.GZIP) + expect(xzResult.format).to.equal(CompressionFormat.XZ) + }) + + it('defaults to gzip when compression is enabled and extension is unknown', () => { + const result = parseCliArgs(['backup.data', '--compress']) + + expect(result).to.deep.equal({ + compress: true, + format: CompressionFormat.GZIP, + outputFilePath: 'backup.data', + showHelp: false, + }) + }) + + it('throws when --format is provided without --compress', () => { + expect(() => parseCliArgs(['backup.jsonl', '--format=gzip'])) + .to.throw('--format requires --compress') + }) + + it('throws when --format is missing a value', () => { + expect(() => parseCliArgs(['--compress', '--format'])) + .to.throw('Missing value for --format') + + expect(() => parseCliArgs(['--compress', '--format='])) + .to.throw('Missing value for --format') + }) + + it('throws on unknown options', () => { + expect(() => parseCliArgs(['--unknown'])) + .to.throw('Unknown option: --unknown') + }) + + it('throws on unexpected extra positional arguments', () => { + expect(() => parseCliArgs(['backup.jsonl', 'extra.jsonl'])) + .to.throw('Unexpected extra argument: extra.jsonl') + }) +}) + +describe('export metrics helpers', () => { + it('formats bytes using binary units', () => { + expect(formatBytes(0)).to.equal('0 B') + expect(formatBytes(1023)).to.equal('1023 B') + expect(formatBytes(1024)).to.equal('1 KiB') + expect(formatBytes(1536)).to.equal('1.5 KiB') + expect(formatBytes(1048576)).to.equal('1 MiB') + }) + + it('formats compression delta for smaller output', () => { + expect(formatCompressionDelta(1000, 250)).to.equal('75% smaller') + }) + + it('formats compression delta for larger output', () => { + expect(formatCompressionDelta(1000, 1200)).to.equal('20% larger') + }) + + it('returns undefined compression delta when raw bytes are zero', () => { + expect(formatCompressionDelta(0, 100)).to.equal(undefined) + }) + + it('calculates per-second rates safely', () => { + expect(getRatePerSecond(400, 2000)).to.equal(200) + expect(getRatePerSecond(0, 2000)).to.equal(0) + expect(getRatePerSecond(1000, 0)).to.equal(1000000) + }) +}) From 311bbe3156e25a70616db155ae84613c3e363540 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 03:06:39 +0530 Subject: [PATCH 04/12] docs: document compressed import/export usage --- README.md | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5ddb1b6d..2406b645 100644 --- a/README.md +++ b/README.md @@ -246,13 +246,23 @@ Print the I2P hostname: ### Importing events from JSON Lines -You can import NIP-01 events from a `.jsonl` file directly into the relay database. +You can import NIP-01 events from `.jsonl` files directly into the relay database. +Compressed files are also supported and decompressed on-the-fly: + +- `.jsonl.gz` (Gzip) +- `.jsonl.xz` (XZ) Basic import: ``` npm run import -- ./events.jsonl ``` +Import a compressed backup: + ``` + npm run import -- ./events.jsonl.gz + npm run import -- ./events.jsonl.xz + ``` + Set a custom batch size (default: `1000`): ``` npm run import -- ./events.jsonl --batch-size 500 @@ -636,16 +646,42 @@ To observe client and subscription counts in real-time during a test, you can in ```bash docker compose logs -f nostream ``` - ## Export Events Export all stored events to a [JSON Lines](https://jsonlines.org/) (`.jsonl`) file. Each line is a valid NIP-01 Nostr event JSON object. The export streams rows from the database using cursors, so it works safely on relays with millions of events without loading them into memory. +Optional compression is supported for lower storage and transfer costs: + +- Gzip via Node's native `zlib` +- XZ via `lzma-native` + ``` npm run export # writes to events.jsonl npm run export -- backup-2024-01-01.jsonl # custom filename +npm run export -- backup.jsonl.gz --compress --format=gzip +npm run export -- backup.jsonl.xz --compress --format=xz ``` +Flags: + +- `--compress` / `-z`: enable compression. +- `--format `: compression format. If omitted while compression is enabled, + format is inferred from file extension (`.gz` / `.xz`) and defaults to `gzip`. + +After completion, the exporter prints a summary with: + +- Raw bytes generated from JSONL lines +- Output bytes written to disk +- Compression delta (smaller/larger) +- Throughput in events/sec and bytes/sec + +Optional XZ tuning (environment variables): + +- `NOSTREAM_XZ_THREADS`: max worker threads for XZ compression. + Defaults to `4` and is automatically capped to available CPU cores minus one. +- `NOSTREAM_XZ_PRESET`: compression preset from `0` (fastest, larger output) + to `9` (slowest, smallest output). Default is `6`. + The script reads the same `DB_*` environment variables used by the relay (see [CONFIGURATION.md](CONFIGURATION.md)). ## Relay Maintenance From e7767a70d47b89b1b6e9328207dc0116a8b4f3a2 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 03:06:39 +0530 Subject: [PATCH 05/12] test(integration): add compression roundtrip scenario --- .../compression/compression-roundtrip.feature | 13 + .../compression-roundtrip.feature.ts | 229 ++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 test/integration/features/compression/compression-roundtrip.feature create mode 100644 test/integration/features/compression/compression-roundtrip.feature.ts diff --git a/test/integration/features/compression/compression-roundtrip.feature b/test/integration/features/compression/compression-roundtrip.feature new file mode 100644 index 00000000..fe4eadb0 --- /dev/null +++ b/test/integration/features/compression/compression-roundtrip.feature @@ -0,0 +1,13 @@ +@compression-roundtrip +Feature: Compressed import/export roundtrip + Scenario Outline: roundtrip events with compression + Given a seeded compression roundtrip dataset + When I export events using "" compression + And I remove the seeded roundtrip events from the database + And I import the compressed roundtrip file + Then the seeded roundtrip events are restored + + Examples: + | format | + | gzip | + | xz | diff --git a/test/integration/features/compression/compression-roundtrip.feature.ts b/test/integration/features/compression/compression-roundtrip.feature.ts new file mode 100644 index 00000000..ffbd2d89 --- /dev/null +++ b/test/integration/features/compression/compression-roundtrip.feature.ts @@ -0,0 +1,229 @@ +import { After, Given, Then, When, World } from '@cucumber/cucumber' +import { randomUUID } from 'crypto' +import { spawn } from 'child_process' +import { expect } from 'chai' +import fs from 'fs' +import os from 'os' +import { join } from 'path' + +import { getMasterDbClient } from '../../../../src/database/client' +import { EventRepository } from '../../../../src/repositories/event-repository' +import { createEvent, createIdentity } from '../helpers' + +type ScriptResult = { + exitCode: number + stderr: string + stdout: string +} + +type CompressionRoundtripState = { + expectedContents: string[] + expectedIds: string[] + identityName: string + outputFilePath: string + pubkey: string + tempDir: string +} + +const SCRIPT_TIMEOUT_MS = 60_000 +const ROUNDTRIP_KEY = 'compressionRoundtrip' + +const runCliScript = async ( + scriptPath: string, + args: string[], +): Promise => { + return new Promise((resolve, reject) => { + const commandArgs = ['--env-file-if-exists=.env', '-r', 'ts-node/register', scriptPath, ...args] + const child = spawn(process.execPath, commandArgs, { + cwd: process.cwd(), + env: process.env, + stdio: ['ignore', 'pipe', 'pipe'], + }) + + let stdout = '' + let stderr = '' + + const timeout = setTimeout(() => { + child.kill('SIGKILL') + reject(new Error(`Timed out while running ${scriptPath}`)) + }, SCRIPT_TIMEOUT_MS) + + child.stdout.on('data', (chunk: Buffer | string) => { + stdout += chunk.toString() + }) + + child.stderr.on('data', (chunk: Buffer | string) => { + stderr += chunk.toString() + }) + + child.on('error', (error) => { + clearTimeout(timeout) + reject(error) + }) + + child.on('close', (code) => { + clearTimeout(timeout) + resolve({ + exitCode: code ?? 1, + stderr, + stdout, + }) + }) + }) +} + +const assertCommandSuccess = ( + result: ScriptResult, + label: string, +): void => { + if (result.exitCode === 0) { + return + } + + throw new Error( + `${label} failed with exit code ${result.exitCode}\nstdout:\n${result.stdout}\nstderr:\n${result.stderr}`, + ) +} + +const getRoundtripState = ( + world: World>, +): CompressionRoundtripState => { + const state = world.parameters[ROUNDTRIP_KEY] + + if (!state) { + throw new Error('Compression roundtrip state is not initialized') + } + + return state as CompressionRoundtripState +} + +Given('a seeded compression roundtrip dataset', async function (this: World>) { + const dbClient = getMasterDbClient() + const repository = new EventRepository(dbClient, dbClient) + + const identityName = `RoundtripUser${Date.now()}` + const identity = createIdentity(identityName) + + const identities = this.parameters.identities as Record + identities[identityName] = identity + + const token = randomUUID() + const firstContent = `compression-roundtrip-${token}-1` + const secondContent = `compression-roundtrip-${token}-2` + + const firstEvent = await createEvent( + { + content: firstContent, + kind: 1, + pubkey: identity.pubkey, + tags: [['t', 'compression-roundtrip']], + }, + identity.privkey, + ) + + const secondEvent = await createEvent( + { + content: secondContent, + kind: 1, + pubkey: identity.pubkey, + tags: [['t', 'compression-roundtrip']], + }, + identity.privkey, + ) + + const inserted = await repository.createMany([firstEvent, secondEvent]) + expect(inserted).to.equal(2) + + const tempDir = fs.mkdtempSync(join(os.tmpdir(), 'nostream-compression-roundtrip-')) + + this.parameters[ROUNDTRIP_KEY] = { + expectedContents: [firstContent, secondContent].sort(), + expectedIds: [firstEvent.id, secondEvent.id].sort(), + identityName, + outputFilePath: '', + pubkey: identity.pubkey, + tempDir, + } as CompressionRoundtripState +}) + +When( + 'I export events using {string} compression', + async function (this: World>, format: string) { + if (format !== 'gzip' && format !== 'xz') { + throw new Error(`Unsupported test format: ${format}`) + } + + const state = getRoundtripState(this) + const extension = format === 'gzip' ? '.jsonl.gz' : '.jsonl.xz' + const outputFilePath = join(state.tempDir, `events${extension}`) + + const result = await runCliScript('src/scripts/export-events.ts', [ + outputFilePath, + '--compress', + '--format', + format, + ]) + + assertCommandSuccess(result, 'export script') + + expect(fs.existsSync(outputFilePath)).to.equal(true) + expect(fs.statSync(outputFilePath).size).to.be.greaterThan(0) + + state.outputFilePath = outputFilePath + this.parameters[ROUNDTRIP_KEY] = state + }, +) + +When('I remove the seeded roundtrip events from the database', async function (this: World>) { + const state = getRoundtripState(this) + const dbClient = getMasterDbClient() + + await dbClient('events') + .where('event_pubkey', Buffer.from(state.pubkey, 'hex')) + .delete() +}) + +When('I import the compressed roundtrip file', async function (this: World>) { + const state = getRoundtripState(this) + + const result = await runCliScript('src/import-events.ts', [ + state.outputFilePath, + '--batch-size', + '2', + ]) + + assertCommandSuccess(result, 'import script') +}) + +Then('the seeded roundtrip events are restored', async function (this: World>) { + const state = getRoundtripState(this) + const dbClient = getMasterDbClient() + + const rows = await dbClient('events') + .select('event_id', 'event_content') + .where('event_pubkey', Buffer.from(state.pubkey, 'hex')) + + const actualIds = rows + .map((row: { event_id: Buffer }) => row.event_id.toString('hex')) + .sort() + + const actualContents = rows + .map((row: { event_content: string }) => row.event_content) + .sort() + + expect(actualIds).to.deep.equal(state.expectedIds) + expect(actualContents).to.deep.equal(state.expectedContents) +}) + +After({ tags: '@compression-roundtrip' }, async function (this: World>) { + const state = this.parameters[ROUNDTRIP_KEY] as CompressionRoundtripState | undefined + + if (state?.tempDir) { + fs.rmSync(state.tempDir, { + force: true, + recursive: true, + }) + } + + this.parameters[ROUNDTRIP_KEY] = undefined +}) From f048f9f9a495c159ac69f0ec92e29cc19fec79bc Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 03:06:39 +0530 Subject: [PATCH 06/12] ci: add changeset and knip ignore for lzma-native --- .changeset/quick-maps-marry.md | 9 +++++++++ .knip.json | 3 +++ 2 files changed, 12 insertions(+) create mode 100644 .changeset/quick-maps-marry.md diff --git a/.changeset/quick-maps-marry.md b/.changeset/quick-maps-marry.md new file mode 100644 index 00000000..75e563f5 --- /dev/null +++ b/.changeset/quick-maps-marry.md @@ -0,0 +1,9 @@ +--- +"nostream": minor +--- + +Add gzip and xz compression support to event import/export flows. + +- Export supports `--compress`/`-z` with `--format gzip|gz|xz`. +- Import auto-detects compressed input by extension and magic bytes and decompresses in a stream pipeline. +- Includes docs updates and unit/integration test coverage for compression paths. \ No newline at end of file diff --git a/.knip.json b/.knip.json index 36d25b8c..193d47bc 100644 --- a/.knip.json +++ b/.knip.json @@ -8,6 +8,9 @@ "project": [ "src/**/*.ts" ], + "ignoreDependencies": [ + "lzma-native" + ], "ignoreFiles": [], "commitlint": false, "eslint": false, From 38f03c17ba42010adef440e5d155ec4444ea74f3 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 16:33:17 +0530 Subject: [PATCH 07/12] fix(import): propagate decompression stream errors --- src/import-events.ts | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/import-events.ts b/src/import-events.ts index 83ef6d5f..ea98703d 100644 --- a/src/import-events.ts +++ b/src/import-events.ts @@ -138,9 +138,26 @@ const createImportStream = async (absoluteFilePath: string) => { } } + const decompressor = createDecompressionStream(compressionFormat) + + source.on('error', (error) => { + if (!decompressor.destroyed) { + decompressor.destroy(error) + } + }) + + const closeSource = () => { + if (!source.destroyed) { + source.destroy() + } + } + + decompressor.on('close', closeSource) + decompressor.on('error', closeSource) + return { compressionFormat, - stream: source.pipe(createDecompressionStream(compressionFormat)), + stream: source.pipe(decompressor), } } From f9e38f125402a33df1dabac3622c333345cb7b5a Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 16:38:52 +0530 Subject: [PATCH 08/12] fix(deps): make lzma-native optional --- package-lock.json | 13 +++++++++++-- package.json | 4 +++- test/unit/utils/compression.spec.ts | 21 ++++++++++++++++++--- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/package-lock.json b/package-lock.json index 1da670dd..fbcdcdc4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,6 @@ "express": "4.22.1", "js-yaml": "4.1.1", "knex": "2.4.2", - "lzma-native": "^8.0.6", "pg": "8.9.0", "pg-query-stream": "4.3.0", "ramda": "0.28.0", @@ -62,6 +61,9 @@ }, "engines": { "node": ">=24.14.1" + }, + "optionalDependencies": { + "lzma-native": "^8.0.6" } }, "node_modules/@babel/code-frame": { @@ -6316,6 +6318,7 @@ "resolved": "https://registry.npmjs.org/lzma-native/-/lzma-native-8.0.6.tgz", "integrity": "sha512-09xfg67mkL2Lz20PrrDeNYZxzeW7ADtpYFbwSQh9U8+76RIzx5QsJBMy8qikv3hbUPfpy6hqwxt6FcGK81g9AA==", "hasInstallScript": true, + "optional": true, "dependencies": { "node-addon-api": "^3.1.0", "node-gyp-build": "^4.2.1", @@ -6332,6 +6335,7 @@ "version": "3.6.2", "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "optional": true, "dependencies": { "inherits": "^2.0.3", "string_decoder": "^1.1.1", @@ -6967,12 +6971,14 @@ "node_modules/node-addon-api": { "version": "3.2.1", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-3.2.1.tgz", - "integrity": "sha512-mmcei9JghVNDYydghQmeDX8KoAm0FAiYyIcUt/N4nhyAipB17pllZQDOJD2fotxABnt4Mdz+dKTO7eftLg4d0A==" + "integrity": "sha512-mmcei9JghVNDYydghQmeDX8KoAm0FAiYyIcUt/N4nhyAipB17pllZQDOJD2fotxABnt4Mdz+dKTO7eftLg4d0A==", + "optional": true }, "node_modules/node-gyp-build": { "version": "4.8.4", "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.8.4.tgz", "integrity": "sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==", + "optional": true, "bin": { "node-gyp-build": "bin.js", "node-gyp-build-optional": "optional.js", @@ -9302,6 +9308,7 @@ "version": "1.1.1", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", + "devOptional": true, "license": "MIT", "dependencies": { "safe-buffer": "~5.1.0" @@ -9311,6 +9318,7 @@ "version": "5.1.2", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", + "devOptional": true, "license": "MIT" }, "node_modules/string-argv": { @@ -10162,6 +10170,7 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", + "devOptional": true, "license": "MIT" }, "node_modules/utils-merge": { diff --git a/package.json b/package.json index f940432c..dc536f1b 100644 --- a/package.json +++ b/package.json @@ -131,7 +131,6 @@ "express": "4.22.1", "js-yaml": "4.1.1", "knex": "2.4.2", - "lzma-native": "^8.0.6", "pg": "8.9.0", "pg-query-stream": "4.3.0", "ramda": "0.28.0", @@ -139,6 +138,9 @@ "ws": "^8.18.0", "zod": "^3.22.4" }, + "optionalDependencies": { + "lzma-native": "^8.0.6" + }, "overrides": { "axios@<0.31.0": ">=0.31.0" } diff --git a/test/unit/utils/compression.spec.ts b/test/unit/utils/compression.spec.ts index a2ff4ec7..e2cc0b00 100644 --- a/test/unit/utils/compression.spec.ts +++ b/test/unit/utils/compression.spec.ts @@ -17,6 +17,16 @@ import { parseCompressionFormat, } from '../../../src/utils/compression' +const hasLzmaNative = (): boolean => { + try { + require.resolve('lzma-native') + + return true + } catch { + return false + } +} + const toBuffer = async (stream: NodeJS.ReadableStream): Promise => { const chunks: Buffer[] = [] @@ -37,6 +47,9 @@ const expectStreamToFail = async (stream: NodeJS.ReadableStream): Promise } describe('compression utils', () => { + const xzAvailable = hasLzmaNative() + const itIfXzAvailable = xzAvailable ? it : it.skip + const tempDirs: string[] = [] const createTempFile = (name: string, data: Buffer): string => { @@ -169,7 +182,7 @@ describe('compression utils', () => { expect(decompressed.equals(input)).to.equal(true) }) - it('round-trips data with xz streams', async () => { + itIfXzAvailable('round-trips data with xz streams', async () => { const input = Buffer.from('nostream-xz-test\n'.repeat(2048), 'utf-8') const compressed = await toBuffer( @@ -244,7 +257,7 @@ describe('compression utils', () => { ) }) - it('fails to decompress invalid xz payloads', async () => { + itIfXzAvailable('fails to decompress invalid xz payloads', async () => { const invalidPayload = Buffer.from('not-an-xz-stream', 'utf-8') await expectStreamToFail( @@ -254,7 +267,9 @@ describe('compression utils', () => { it('round-trips binary payloads across boundary sizes', async () => { const sizes = [0, 1, 2, 31, 32, 33, 1024, 8192] - const formats = [CompressionFormat.GZIP, CompressionFormat.XZ] + const formats = xzAvailable + ? [CompressionFormat.GZIP, CompressionFormat.XZ] + : [CompressionFormat.GZIP] for (const size of sizes) { const input = Buffer.alloc(size) From a053948d7f5f8387720e1c8822c3e3f725378227 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 16:42:27 +0530 Subject: [PATCH 09/12] test: remove compression integration roundtrip scenario --- .../compression/compression-roundtrip.feature | 13 - .../compression-roundtrip.feature.ts | 229 ------------------ 2 files changed, 242 deletions(-) delete mode 100644 test/integration/features/compression/compression-roundtrip.feature delete mode 100644 test/integration/features/compression/compression-roundtrip.feature.ts diff --git a/test/integration/features/compression/compression-roundtrip.feature b/test/integration/features/compression/compression-roundtrip.feature deleted file mode 100644 index fe4eadb0..00000000 --- a/test/integration/features/compression/compression-roundtrip.feature +++ /dev/null @@ -1,13 +0,0 @@ -@compression-roundtrip -Feature: Compressed import/export roundtrip - Scenario Outline: roundtrip events with compression - Given a seeded compression roundtrip dataset - When I export events using "" compression - And I remove the seeded roundtrip events from the database - And I import the compressed roundtrip file - Then the seeded roundtrip events are restored - - Examples: - | format | - | gzip | - | xz | diff --git a/test/integration/features/compression/compression-roundtrip.feature.ts b/test/integration/features/compression/compression-roundtrip.feature.ts deleted file mode 100644 index ffbd2d89..00000000 --- a/test/integration/features/compression/compression-roundtrip.feature.ts +++ /dev/null @@ -1,229 +0,0 @@ -import { After, Given, Then, When, World } from '@cucumber/cucumber' -import { randomUUID } from 'crypto' -import { spawn } from 'child_process' -import { expect } from 'chai' -import fs from 'fs' -import os from 'os' -import { join } from 'path' - -import { getMasterDbClient } from '../../../../src/database/client' -import { EventRepository } from '../../../../src/repositories/event-repository' -import { createEvent, createIdentity } from '../helpers' - -type ScriptResult = { - exitCode: number - stderr: string - stdout: string -} - -type CompressionRoundtripState = { - expectedContents: string[] - expectedIds: string[] - identityName: string - outputFilePath: string - pubkey: string - tempDir: string -} - -const SCRIPT_TIMEOUT_MS = 60_000 -const ROUNDTRIP_KEY = 'compressionRoundtrip' - -const runCliScript = async ( - scriptPath: string, - args: string[], -): Promise => { - return new Promise((resolve, reject) => { - const commandArgs = ['--env-file-if-exists=.env', '-r', 'ts-node/register', scriptPath, ...args] - const child = spawn(process.execPath, commandArgs, { - cwd: process.cwd(), - env: process.env, - stdio: ['ignore', 'pipe', 'pipe'], - }) - - let stdout = '' - let stderr = '' - - const timeout = setTimeout(() => { - child.kill('SIGKILL') - reject(new Error(`Timed out while running ${scriptPath}`)) - }, SCRIPT_TIMEOUT_MS) - - child.stdout.on('data', (chunk: Buffer | string) => { - stdout += chunk.toString() - }) - - child.stderr.on('data', (chunk: Buffer | string) => { - stderr += chunk.toString() - }) - - child.on('error', (error) => { - clearTimeout(timeout) - reject(error) - }) - - child.on('close', (code) => { - clearTimeout(timeout) - resolve({ - exitCode: code ?? 1, - stderr, - stdout, - }) - }) - }) -} - -const assertCommandSuccess = ( - result: ScriptResult, - label: string, -): void => { - if (result.exitCode === 0) { - return - } - - throw new Error( - `${label} failed with exit code ${result.exitCode}\nstdout:\n${result.stdout}\nstderr:\n${result.stderr}`, - ) -} - -const getRoundtripState = ( - world: World>, -): CompressionRoundtripState => { - const state = world.parameters[ROUNDTRIP_KEY] - - if (!state) { - throw new Error('Compression roundtrip state is not initialized') - } - - return state as CompressionRoundtripState -} - -Given('a seeded compression roundtrip dataset', async function (this: World>) { - const dbClient = getMasterDbClient() - const repository = new EventRepository(dbClient, dbClient) - - const identityName = `RoundtripUser${Date.now()}` - const identity = createIdentity(identityName) - - const identities = this.parameters.identities as Record - identities[identityName] = identity - - const token = randomUUID() - const firstContent = `compression-roundtrip-${token}-1` - const secondContent = `compression-roundtrip-${token}-2` - - const firstEvent = await createEvent( - { - content: firstContent, - kind: 1, - pubkey: identity.pubkey, - tags: [['t', 'compression-roundtrip']], - }, - identity.privkey, - ) - - const secondEvent = await createEvent( - { - content: secondContent, - kind: 1, - pubkey: identity.pubkey, - tags: [['t', 'compression-roundtrip']], - }, - identity.privkey, - ) - - const inserted = await repository.createMany([firstEvent, secondEvent]) - expect(inserted).to.equal(2) - - const tempDir = fs.mkdtempSync(join(os.tmpdir(), 'nostream-compression-roundtrip-')) - - this.parameters[ROUNDTRIP_KEY] = { - expectedContents: [firstContent, secondContent].sort(), - expectedIds: [firstEvent.id, secondEvent.id].sort(), - identityName, - outputFilePath: '', - pubkey: identity.pubkey, - tempDir, - } as CompressionRoundtripState -}) - -When( - 'I export events using {string} compression', - async function (this: World>, format: string) { - if (format !== 'gzip' && format !== 'xz') { - throw new Error(`Unsupported test format: ${format}`) - } - - const state = getRoundtripState(this) - const extension = format === 'gzip' ? '.jsonl.gz' : '.jsonl.xz' - const outputFilePath = join(state.tempDir, `events${extension}`) - - const result = await runCliScript('src/scripts/export-events.ts', [ - outputFilePath, - '--compress', - '--format', - format, - ]) - - assertCommandSuccess(result, 'export script') - - expect(fs.existsSync(outputFilePath)).to.equal(true) - expect(fs.statSync(outputFilePath).size).to.be.greaterThan(0) - - state.outputFilePath = outputFilePath - this.parameters[ROUNDTRIP_KEY] = state - }, -) - -When('I remove the seeded roundtrip events from the database', async function (this: World>) { - const state = getRoundtripState(this) - const dbClient = getMasterDbClient() - - await dbClient('events') - .where('event_pubkey', Buffer.from(state.pubkey, 'hex')) - .delete() -}) - -When('I import the compressed roundtrip file', async function (this: World>) { - const state = getRoundtripState(this) - - const result = await runCliScript('src/import-events.ts', [ - state.outputFilePath, - '--batch-size', - '2', - ]) - - assertCommandSuccess(result, 'import script') -}) - -Then('the seeded roundtrip events are restored', async function (this: World>) { - const state = getRoundtripState(this) - const dbClient = getMasterDbClient() - - const rows = await dbClient('events') - .select('event_id', 'event_content') - .where('event_pubkey', Buffer.from(state.pubkey, 'hex')) - - const actualIds = rows - .map((row: { event_id: Buffer }) => row.event_id.toString('hex')) - .sort() - - const actualContents = rows - .map((row: { event_content: string }) => row.event_content) - .sort() - - expect(actualIds).to.deep.equal(state.expectedIds) - expect(actualContents).to.deep.equal(state.expectedContents) -}) - -After({ tags: '@compression-roundtrip' }, async function (this: World>) { - const state = this.parameters[ROUNDTRIP_KEY] as CompressionRoundtripState | undefined - - if (state?.tempDir) { - fs.rmSync(state.tempDir, { - force: true, - recursive: true, - }) - } - - this.parameters[ROUNDTRIP_KEY] = undefined -}) From 497b6169f7412cf2704f49ef06e9cff0bb295e12 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 23:08:10 +0530 Subject: [PATCH 10/12] fix(import): resolve deduplication merge conflict handling --- src/services/event-import-service.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/services/event-import-service.ts b/src/services/event-import-service.ts index 40d1d606..c7071ce9 100644 --- a/src/services/event-import-service.ts +++ b/src/services/event-import-service.ts @@ -31,10 +31,11 @@ const enrichEventMetadata = (event: Event): Event => { } if (isParameterizedReplaceableEvent(event)) { - const [, ...deduplication] = event.tags.find( - (tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication, - ) ?? [null, ''] - enriched = { ...enriched, [EventDeduplicationMetadataKey]: deduplication } + const [, deduplication] = event.tags.find((tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication) ?? [ + null, + '', + ] + enriched = { ...enriched, [EventDeduplicationMetadataKey]: deduplication ? [deduplication] : [''] } } return enriched From 866bc2e6fdbc88cbcd977c8b77e2ee7b76fdb498 Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Sun, 19 Apr 2026 23:21:06 +0530 Subject: [PATCH 11/12] test(integration): restore compression roundtrip scenario --- .../compression/compression-roundtrip.feature | 13 + .../compression-roundtrip.feature.ts | 229 ++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 test/integration/features/compression/compression-roundtrip.feature create mode 100644 test/integration/features/compression/compression-roundtrip.feature.ts diff --git a/test/integration/features/compression/compression-roundtrip.feature b/test/integration/features/compression/compression-roundtrip.feature new file mode 100644 index 00000000..8b01e2a0 --- /dev/null +++ b/test/integration/features/compression/compression-roundtrip.feature @@ -0,0 +1,13 @@ +@compression-roundtrip +Feature: Compressed import/export roundtrip + Scenario Outline: roundtrip events with compression + Given a seeded compression roundtrip dataset + When I export events using "" compression + And I remove the seeded roundtrip events from the database + And I import the compressed roundtrip file + Then the seeded roundtrip events are restored + + Examples: + | format | + | gzip | + | xz | \ No newline at end of file diff --git a/test/integration/features/compression/compression-roundtrip.feature.ts b/test/integration/features/compression/compression-roundtrip.feature.ts new file mode 100644 index 00000000..55d52856 --- /dev/null +++ b/test/integration/features/compression/compression-roundtrip.feature.ts @@ -0,0 +1,229 @@ +import { After, Given, Then, When, World } from '@cucumber/cucumber' +import { randomUUID } from 'crypto' +import { spawn } from 'child_process' +import { expect } from 'chai' +import fs from 'fs' +import os from 'os' +import { join } from 'path' + +import { getMasterDbClient } from '../../../../src/database/client' +import { EventRepository } from '../../../../src/repositories/event-repository' +import { createEvent, createIdentity } from '../helpers' + +type ScriptResult = { + exitCode: number + stderr: string + stdout: string +} + +type CompressionRoundtripState = { + expectedContents: string[] + expectedIds: string[] + identityName: string + outputFilePath: string + pubkey: string + tempDir: string +} + +const SCRIPT_TIMEOUT_MS = 60_000 +const ROUNDTRIP_KEY = 'compressionRoundtrip' + +const runCliScript = async ( + scriptPath: string, + args: string[], +): Promise => { + return new Promise((resolve, reject) => { + const commandArgs = ['--env-file-if-exists=.env', '-r', 'ts-node/register', scriptPath, ...args] + const child = spawn(process.execPath, commandArgs, { + cwd: process.cwd(), + env: process.env, + stdio: ['ignore', 'pipe', 'pipe'], + }) + + let stdout = '' + let stderr = '' + + const timeout = setTimeout(() => { + child.kill('SIGKILL') + reject(new Error(`Timed out while running ${scriptPath}`)) + }, SCRIPT_TIMEOUT_MS) + + child.stdout.on('data', (chunk: Buffer | string) => { + stdout += chunk.toString() + }) + + child.stderr.on('data', (chunk: Buffer | string) => { + stderr += chunk.toString() + }) + + child.on('error', (error) => { + clearTimeout(timeout) + reject(error) + }) + + child.on('close', (code) => { + clearTimeout(timeout) + resolve({ + exitCode: code ?? 1, + stderr, + stdout, + }) + }) + }) +} + +const assertCommandSuccess = ( + result: ScriptResult, + label: string, +): void => { + if (result.exitCode === 0) { + return + } + + throw new Error( + `${label} failed with exit code ${result.exitCode}\nstdout:\n${result.stdout}\nstderr:\n${result.stderr}`, + ) +} + +const getRoundtripState = ( + world: World>, +): CompressionRoundtripState => { + const state = world.parameters[ROUNDTRIP_KEY] + + if (!state) { + throw new Error('Compression roundtrip state is not initialized') + } + + return state as CompressionRoundtripState +} + +Given('a seeded compression roundtrip dataset', async function (this: World>) { + const dbClient = getMasterDbClient() + const repository = new EventRepository(dbClient, dbClient) + + const identityName = `RoundtripUser${Date.now()}` + const identity = createIdentity(identityName) + + const identities = this.parameters.identities as Record + identities[identityName] = identity + + const token = randomUUID() + const firstContent = `compression-roundtrip-${token}-1` + const secondContent = `compression-roundtrip-${token}-2` + + const firstEvent = await createEvent( + { + content: firstContent, + kind: 1, + pubkey: identity.pubkey, + tags: [['t', 'compression-roundtrip']], + }, + identity.privkey, + ) + + const secondEvent = await createEvent( + { + content: secondContent, + kind: 1, + pubkey: identity.pubkey, + tags: [['t', 'compression-roundtrip']], + }, + identity.privkey, + ) + + const inserted = await repository.createMany([firstEvent, secondEvent]) + expect(inserted).to.equal(2) + + const tempDir = fs.mkdtempSync(join(os.tmpdir(), 'nostream-compression-roundtrip-')) + + this.parameters[ROUNDTRIP_KEY] = { + expectedContents: [firstContent, secondContent].sort(), + expectedIds: [firstEvent.id, secondEvent.id].sort(), + identityName, + outputFilePath: '', + pubkey: identity.pubkey, + tempDir, + } as CompressionRoundtripState +}) + +When( + 'I export events using {string} compression', + async function (this: World>, format: string) { + if (format !== 'gzip' && format !== 'xz') { + throw new Error(`Unsupported test format: ${format}`) + } + + const state = getRoundtripState(this) + const extension = format === 'gzip' ? '.jsonl.gz' : '.jsonl.xz' + const outputFilePath = join(state.tempDir, `events${extension}`) + + const result = await runCliScript('src/scripts/export-events.ts', [ + outputFilePath, + '--compress', + '--format', + format, + ]) + + assertCommandSuccess(result, 'export script') + + expect(fs.existsSync(outputFilePath)).to.equal(true) + expect(fs.statSync(outputFilePath).size).to.be.greaterThan(0) + + state.outputFilePath = outputFilePath + this.parameters[ROUNDTRIP_KEY] = state + }, +) + +When('I remove the seeded roundtrip events from the database', async function (this: World>) { + const state = getRoundtripState(this) + const dbClient = getMasterDbClient() + + await dbClient('events') + .where('event_pubkey', Buffer.from(state.pubkey, 'hex')) + .delete() +}) + +When('I import the compressed roundtrip file', async function (this: World>) { + const state = getRoundtripState(this) + + const result = await runCliScript('src/import-events.ts', [ + state.outputFilePath, + '--batch-size', + '2', + ]) + + assertCommandSuccess(result, 'import script') +}) + +Then('the seeded roundtrip events are restored', async function (this: World>) { + const state = getRoundtripState(this) + const dbClient = getMasterDbClient() + + const rows = await dbClient('events') + .select('event_id', 'event_content') + .where('event_pubkey', Buffer.from(state.pubkey, 'hex')) + + const actualIds = rows + .map((row: { event_id: Buffer }) => row.event_id.toString('hex')) + .sort() + + const actualContents = rows + .map((row: { event_content: string }) => row.event_content) + .sort() + + expect(actualIds).to.deep.equal(state.expectedIds) + expect(actualContents).to.deep.equal(state.expectedContents) +}) + +After({ tags: '@compression-roundtrip' }, async function (this: World>) { + const state = this.parameters[ROUNDTRIP_KEY] as CompressionRoundtripState | undefined + + if (state?.tempDir) { + fs.rmSync(state.tempDir, { + force: true, + recursive: true, + }) + } + + this.parameters[ROUNDTRIP_KEY] = undefined +}) \ No newline at end of file From 11ececcf19e5528aa830466fcfccbce1f3d3ba3f Mon Sep 17 00:00:00 2001 From: anshumancanrock Date: Mon, 20 Apr 2026 01:28:40 +0530 Subject: [PATCH 12/12] fix(redis): use logger in hash key methods --- src/adapters/redis-adapter.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/adapters/redis-adapter.ts b/src/adapters/redis-adapter.ts index cc1f722f..3b8e062f 100644 --- a/src/adapters/redis-adapter.ts +++ b/src/adapters/redis-adapter.ts @@ -98,19 +98,19 @@ export class RedisAdapter implements ICacheAdapter { public async deleteKey(key: string): Promise { await this.connection - debug('delete %s key', key) + logger('delete %s key', key) return this.client.del(key) } public async getHKey(key: string, field: string): Promise { await this.connection - debug('get %s field for key %s', field, key) + logger('get %s field for key %s', field, key) return await this.client.hGet(key, field) ?? '' } public async setHKey(key: string, fields: Record): Promise { await this.connection - debug('set %s key', key) + logger('set %s key', key) return await this.client.hSet(key, fields) >= 0 }