diff --git a/src/core/cache/spool.js b/src/core/cache/spool.js index ab706c1..79daabc 100644 --- a/src/core/cache/spool.js +++ b/src/core/cache/spool.js @@ -4,6 +4,8 @@ import fs from 'node:fs/promises' import fsSync from 'node:fs' import path from 'node:path' +import { readProgress, removeProgress, streamFlushFile, writeProgress } from './streaming-reader.js' + /** * @import { ColumnSpec } from '../../../collectivus-plugin-kernel-types.d.ts' * @import { CacheSpool, FlushResult, PendingInfo, SpoolAppendResult } from './types.d.ts' @@ -28,7 +30,6 @@ const LAST_FLUSH_FILE = 'last-flush.json' * @returns {CacheSpool} */ export function createCacheSpool(args) { - const rowChunkSize = args.rowChunkSize ?? DEFAULT_FLUSH_ROW_CHUNK_SIZE /** @type {Map, flushLock: Promise }>} */ const states = new Map() /** @type {Set} */ @@ -111,13 +112,19 @@ export function createCacheSpool(args) { let chunkCount = 0 let bytesWritten = 0 for (const filePath of files) { - const chunks = await readFlushChunks(filePath, rowChunkSize) - for (const chunk of chunks) { - const written = await args.appendChunk(tablePath, chunk.columns, chunk.rows) - rowCount += chunk.rows.length + const progress = await readProgress(filePath) + const startOffset = progress?.byteOffset ?? 0 + const batchId = `flush-${Date.now()}-${process.pid}` + + for await (const batch of streamFlushFile({ filePath, batchId, startOffset })) { + const written = await args.appendChunk(tablePath, batch.chunk.columns, batch.chunk.rows) + rowCount += batch.chunk.rows.length chunkCount += 1 bytesWritten += written.bytesWritten + await writeProgress(filePath, batch.resumeOffset) } + + await removeProgress(filePath) await fs.rm(filePath, { force: true }) } if (chunkCount > 0) { @@ -206,51 +213,6 @@ function listFlushFiles(tablePath) { } } -/** - * @param {string} filePath - * @param {number} rowChunkSize - * @returns {Promise[] }>>} - */ -async function readFlushChunks(filePath, rowChunkSize) { - const text = await fs.readFile(filePath, 'utf8') - /** @type {Array<{ columns: readonly ColumnSpec[], rows: Record[] }>} */ - const chunks = [] - /** @type {readonly ColumnSpec[] | null} */ - let currentColumns = null - let currentSignature = '' - /** @type {Record[]} */ - let currentRows = [] - - function flushCurrent() { - if (!currentColumns || currentRows.length === 0) return - chunks.push({ columns: currentColumns, rows: currentRows }) - currentColumns = null - currentSignature = '' - currentRows = [] - } - - for (const line of text.split('\n')) { - if (line.length === 0) continue - const envelope = /** @type {{ version?: number, columns?: readonly ColumnSpec[], rows?: Record[] }} */ (JSON.parse(line)) - if (envelope.version !== 1 || !Array.isArray(envelope.columns) || !Array.isArray(envelope.rows)) { - throw new Error(`invalid cache spool envelope in ${filePath}`) - } - const signature = JSON.stringify(envelope.columns) - if (currentColumns && signature !== currentSignature) flushCurrent() - currentColumns = envelope.columns - currentSignature = signature - for (const row of envelope.rows) { - currentRows.push(row) - if (currentRows.length >= rowChunkSize) flushCurrent() - if (!currentColumns) { - currentColumns = envelope.columns - currentSignature = signature - } - } - } - flushCurrent() - return chunks -} /** * @param {string} tablePath diff --git a/src/core/cache/streaming-reader.js b/src/core/cache/streaming-reader.js new file mode 100644 index 0000000..d4dd9da --- /dev/null +++ b/src/core/cache/streaming-reader.js @@ -0,0 +1,254 @@ +// @ts-check + +import { createHash } from 'node:crypto' +import { createReadStream } from 'node:fs' +import fs from 'node:fs/promises' + +/** + * @import { ColumnSpec } from '../../../collectivus-plugin-kernel-types.d.ts' + */ + +/** + * @typedef {{ + * columns: readonly ColumnSpec[], + * rows: Record[], + * }} FlushChunk + */ + +/** + * @typedef {{ + * byteOffset: number, + * updatedAt: string, + * }} ProgressState + */ + +export const BATCH_BYTE_LIMIT = 128 * 1024 * 1024 +export const BATCH_ROW_LIMIT = 100_000 + +/** + * Read a rotated spool file as a stream, yielding batches of rows that + * respect both a byte-size ceiling and a row-count ceiling. Partial + * trailing lines are left in the buffer — the caller should treat them + * as data for the next read cycle (in practice the spool writer always + * ends lines with `\n`, so a partial line means the file was truncated + * or is still being written). + * + * Each emitted row is decorated with: + * - `_hyp_cache_row_id` — SHA-256 of the serialized row (stable dedup key) + * - `_hyp_cache_batch_id` — caller-supplied batch identifier + * + * Resume support: if `startOffset` > 0 the reader seeks past already- + * flushed bytes and continues from there. After each yielded batch the + * caller should persist `batch.resumeOffset` so a crash-restart picks + * up where we left off. + * + * @param {{ + * filePath: string, + * batchId: string, + * startOffset?: number, + * batchByteLimit?: number, + * batchRowLimit?: number, + * }} opts + * @returns {AsyncGenerator<{ + * chunk: FlushChunk, + * resumeOffset: number, + * malformedCount: number, + * }>} + */ +export async function* streamFlushFile(opts) { + const { + filePath, + batchId, + startOffset = 0, + batchByteLimit = BATCH_BYTE_LIMIT, + batchRowLimit = BATCH_ROW_LIMIT, + } = opts + + const stream = createReadStream(filePath, { + start: startOffset, + encoding: 'utf8', + highWaterMark: 64 * 1024, + }) + + let tail = '' + let absoluteOffset = startOffset + /** @type {readonly ColumnSpec[] | null} */ + let currentColumns = null + let currentSignature = '' + /** @type {Record[]} */ + let currentRows = [] + let currentBatchBytes = 0 + let malformedCount = 0 + + /** + * @returns {FlushChunk | null} + */ + function sealBatch() { + if (!currentColumns || currentRows.length === 0) return null + const chunk = { columns: currentColumns, rows: currentRows } + currentColumns = null + currentSignature = '' + currentRows = [] + currentBatchBytes = 0 + return chunk + } + + for await (const data of stream) { + const text = /** @type {string} */ (data) + tail += text + + let newlineIdx + while ((newlineIdx = tail.indexOf('\n')) !== -1) { + const line = tail.slice(0, newlineIdx) + const lineByteLen = Buffer.byteLength(line, 'utf8') + 1 + tail = tail.slice(newlineIdx + 1) + absoluteOffset += lineByteLen + + if (line.length === 0) continue + + /** @type {{ version?: number, columns?: readonly ColumnSpec[], rows?: Record[] } | null} */ + let envelope = null + try { + envelope = JSON.parse(line) + } catch { + malformedCount++ + continue + } + + if ( + !envelope || + envelope.version !== 1 || + !Array.isArray(envelope.columns) || + !Array.isArray(envelope.rows) + ) { + malformedCount++ + continue + } + + const signature = JSON.stringify(envelope.columns) + if (currentColumns && signature !== currentSignature) { + const sealed = sealBatch() + if (sealed) { + yield { chunk: sealed, resumeOffset: absoluteOffset - lineByteLen, malformedCount } + malformedCount = 0 + } + } + + currentColumns = envelope.columns + currentSignature = signature + + for (const row of envelope.rows) { + const decorated = decorateRow(row, batchId) + const rowBytes = Buffer.byteLength(JSON.stringify(row), 'utf8') + currentRows.push(decorated) + currentBatchBytes += rowBytes + + if (currentRows.length >= batchRowLimit || currentBatchBytes >= batchByteLimit) { + const sealed = sealBatch() + if (sealed) { + yield { chunk: sealed, resumeOffset: absoluteOffset, malformedCount } + malformedCount = 0 + } + if (!currentColumns) { + currentColumns = envelope.columns + currentSignature = signature + } + } + } + } + } + + const sealed = sealBatch() + if (sealed) { + yield { chunk: sealed, resumeOffset: absoluteOffset, malformedCount } + } +} + +/** + * @param {Record} row + * @param {string} batchId + * @returns {Record} + */ +function decorateRow(row, batchId) { + const serialized = JSON.stringify(row, stableReplacer) + const hash = createHash('sha256').update(serialized).digest('hex') + return { + ...row, + _hyp_cache_row_id: hash, + _hyp_cache_batch_id: batchId, + } +} + +/** + * Stable JSON key ordering for deterministic hashes. + * + * @param {string} _key + * @param {unknown} value + * @returns {unknown} + */ +function stableReplacer(_key, value) { + if (value && typeof value === 'object' && !Array.isArray(value)) { + /** @type {Record} */ + const sorted = {} + for (const k of Object.keys(value).sort()) { + sorted[k] = /** @type {Record} */ (value)[k] + } + return sorted + } + if (typeof value === 'bigint') return value.toString() + return value +} + +/** + * Read persisted progress for a spool file. + * + * @param {string} spoolFilePath + * @returns {Promise} + */ +export async function readProgress(spoolFilePath) { + try { + const raw = await fs.readFile(progressPath(spoolFilePath), 'utf8') + const parsed = /** @type {ProgressState} */ (JSON.parse(raw)) + if (typeof parsed.byteOffset !== 'number' || !Number.isFinite(parsed.byteOffset)) return null + return parsed + } catch { + return null + } +} + +/** + * Persist flush progress for a spool file. Uses atomic write-rename. + * + * @param {string} spoolFilePath + * @param {number} byteOffset + */ +export async function writeProgress(spoolFilePath, byteOffset) { + const dest = progressPath(spoolFilePath) + const tmp = `${dest}.tmp.${process.pid}.${Date.now()}` + /** @type {ProgressState} */ + const state = { byteOffset, updatedAt: new Date().toISOString() } + await fs.writeFile(tmp, JSON.stringify(state, null, 2), 'utf8') + await fs.rename(tmp, dest) +} + +/** + * Remove the progress file for a spool file. + * + * @param {string} spoolFilePath + */ +export async function removeProgress(spoolFilePath) { + await fs.rm(progressPath(spoolFilePath), { force: true }) +} + +/** + * @param {string} spoolFilePath + * @returns {string} + */ +function progressPath(spoolFilePath) { + return `${spoolFilePath}.progress.json` +} + +/** + * Internal-field names that should be hidden from query output. + */ +export const INTERNAL_FIELDS = ['_hyp_cache_row_id', '_hyp_cache_batch_id'] diff --git a/test/core/streaming-reader.test.js b/test/core/streaming-reader.test.js new file mode 100644 index 0000000..f359121 --- /dev/null +++ b/test/core/streaming-reader.test.js @@ -0,0 +1,310 @@ +// @ts-check + +import test from 'node:test' +import assert from 'node:assert/strict' +import fs from 'node:fs/promises' +import path from 'node:path' +import os from 'node:os' + +import { + BATCH_BYTE_LIMIT, + BATCH_ROW_LIMIT, + INTERNAL_FIELDS, + readProgress, + removeProgress, + streamFlushFile, + writeProgress, +} from '../../src/core/cache/streaming-reader.js' + +/** + * @import { ColumnSpec } from '../../collectivus-plugin-kernel-types.d.ts' + */ + +/** @type {ColumnSpec[]} */ +const COLUMNS = [ + { name: 'id', type: 'INT64', nullable: false }, + { name: 'msg', type: 'STRING', nullable: false }, +] + +/** + * @param {Record[]} rows + * @returns {string} + */ +function envelope(rows) { + return JSON.stringify({ version: 1, columns: COLUMNS, rows }) + '\n' +} + +/** + * @returns {Promise} + */ +async function makeTmpDir() { + return fs.mkdtemp(path.join(os.tmpdir(), 'hyp-stream-test-')) +} + +test('streaming reader handles a large file without loading it into memory', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'big.jsonl') + + const rowsPerLine = 50 + const lineCount = 2000 + const handle = await fs.open(filePath, 'w') + for (let i = 0; i < lineCount; i++) { + const rows = [] + for (let j = 0; j < rowsPerLine; j++) { + rows.push({ id: i * rowsPerLine + j, msg: 'x'.repeat(200) }) + } + await handle.writeFile(envelope(rows)) + } + await handle.close() + + const stat = await fs.stat(filePath) + assert.ok(stat.size > 500 * 1024 * 1024 / 1024, 'test file should be non-trivial') + + let totalRows = 0 + let batchCount = 0 + for await (const batch of streamFlushFile({ filePath, batchId: 'test-big' })) { + totalRows += batch.chunk.rows.length + batchCount++ + assert.ok(batch.chunk.rows.length <= BATCH_ROW_LIMIT) + } + + assert.equal(totalRows, lineCount * rowsPerLine) + assert.ok(batchCount >= 1) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('partial trailing line is preserved and correctly handled', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'partial.jsonl') + + const completeLine = envelope([{ id: 1, msg: 'complete' }]) + const partialLine = '{"version":1,"columns":[{"name":"id","type":"INT64","nullable":false}],"rows":[{"id":2}]' + await fs.writeFile(filePath, completeLine + partialLine) + + let totalRows = 0 + for await (const batch of streamFlushFile({ filePath, batchId: 'test-partial' })) { + totalRows += batch.chunk.rows.length + } + + assert.equal(totalRows, 1, 'only complete lines should be yielded') + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('malformed JSON lines are counted, logged, and skipped without aborting', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'malformed.jsonl') + + const good1 = envelope([{ id: 1, msg: 'ok' }]) + const bad1 = '{not valid json}\n' + const bad2 = JSON.stringify({ version: 2, columns: [], rows: [] }) + '\n' + const good2 = envelope([{ id: 2, msg: 'also ok' }]) + await fs.writeFile(filePath, good1 + bad1 + bad2 + good2) + + let totalRows = 0 + let totalMalformed = 0 + for await (const batch of streamFlushFile({ filePath, batchId: 'test-malformed' })) { + totalRows += batch.chunk.rows.length + totalMalformed += batch.malformedCount + } + + assert.equal(totalRows, 2) + assert.equal(totalMalformed, 2) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('resume cursor updates correctly and restart from cursor produces identical results', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'resume.jsonl') + + const lines = [] + for (let i = 0; i < 5; i++) { + lines.push(envelope([{ id: i, msg: `row-${i}` }])) + } + await fs.writeFile(filePath, lines.join('')) + + const firstRunRows = [] + let resumeAfterTwo = 0 + let batchesSeen = 0 + for await (const batch of streamFlushFile({ + filePath, + batchId: 'run1', + batchRowLimit: 2, + })) { + batchesSeen++ + for (const row of batch.chunk.rows) { + firstRunRows.push(row) + } + if (batchesSeen === 1) { + resumeAfterTwo = batch.resumeOffset + await writeProgress(filePath, resumeAfterTwo) + break + } + } + + assert.equal(firstRunRows.length, 2) + assert.ok(resumeAfterTwo > 0) + + const progress = await readProgress(filePath) + assert.ok(progress) + assert.equal(progress.byteOffset, resumeAfterTwo) + + const secondRunRows = [] + for await (const batch of streamFlushFile({ + filePath, + batchId: 'run2', + startOffset: progress.byteOffset, + })) { + for (const row of batch.chunk.rows) { + secondRunRows.push(row) + } + } + + assert.equal(secondRunRows.length, 3) + + const allIds = [...firstRunRows, ...secondRunRows] + .map((r) => /** @type {number} */ (r.id)) + .sort((a, b) => a - b) + assert.deepEqual(allIds, [0, 1, 2, 3, 4]) + + await removeProgress(filePath) + const gone = await readProgress(filePath) + assert.equal(gone, null) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('batch boundaries respect row-count threshold', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'row-limit.jsonl') + + const rows = [] + for (let i = 0; i < 10; i++) { + rows.push({ id: i, msg: `r${i}` }) + } + await fs.writeFile(filePath, envelope(rows)) + + const batches = [] + for await (const batch of streamFlushFile({ + filePath, + batchId: 'test-row-limit', + batchRowLimit: 3, + })) { + batches.push(batch.chunk.rows.length) + } + + assert.ok(batches.length >= 3, `expected at least 3 batches, got ${batches.length}`) + for (const size of batches) { + assert.ok(size <= 3, `batch size ${size} exceeds limit of 3`) + } + const total = batches.reduce((a, b) => a + b, 0) + assert.equal(total, 10) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('batch boundaries respect byte-size threshold', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'byte-limit.jsonl') + + const rows = [] + for (let i = 0; i < 20; i++) { + rows.push({ id: i, msg: 'x'.repeat(500) }) + } + await fs.writeFile(filePath, envelope(rows)) + + const singleRowBytes = Buffer.byteLength(JSON.stringify(rows[0]), 'utf8') + const tinyByteLimit = singleRowBytes * 3 + + const batches = [] + for await (const batch of streamFlushFile({ + filePath, + batchId: 'test-byte-limit', + batchByteLimit: tinyByteLimit, + batchRowLimit: BATCH_ROW_LIMIT, + })) { + batches.push(batch.chunk.rows.length) + } + + assert.ok(batches.length > 1, `expected multiple batches, got ${batches.length}`) + const total = batches.reduce((a, b) => a + b, 0) + assert.equal(total, 20) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('rows are decorated with internal fields', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'fields.jsonl') + + await fs.writeFile(filePath, envelope([{ id: 1, msg: 'hello' }])) + + for await (const batch of streamFlushFile({ filePath, batchId: 'batch-42' })) { + for (const row of batch.chunk.rows) { + assert.ok(typeof row._hyp_cache_row_id === 'string') + assert.ok(/** @type {string} */ (row._hyp_cache_row_id).length === 64) + assert.equal(row._hyp_cache_batch_id, 'batch-42') + } + } + + assert.ok(INTERNAL_FIELDS.includes('_hyp_cache_row_id')) + assert.ok(INTERNAL_FIELDS.includes('_hyp_cache_batch_id')) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('identical rows produce the same _hyp_cache_row_id', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'dedup.jsonl') + + await fs.writeFile( + filePath, + envelope([{ id: 1, msg: 'same' }]) + envelope([{ id: 1, msg: 'same' }]) + ) + + /** @type {string[]} */ + const ids = [] + for await (const batch of streamFlushFile({ filePath, batchId: 'dedup' })) { + for (const row of batch.chunk.rows) { + ids.push(/** @type {string} */ (row._hyp_cache_row_id)) + } + } + + assert.equal(ids.length, 2) + assert.equal(ids[0], ids[1]) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('progress file write and read roundtrip', async () => { + const dir = await makeTmpDir() + const fakeSpool = path.join(dir, 'flush-123.jsonl') + await fs.writeFile(fakeSpool, '') + + await writeProgress(fakeSpool, 4096) + const state = await readProgress(fakeSpool) + assert.ok(state) + assert.equal(state.byteOffset, 4096) + assert.ok(typeof state.updatedAt === 'string') + + await removeProgress(fakeSpool) + assert.equal(await readProgress(fakeSpool), null) + + await fs.rm(dir, { recursive: true, force: true }) +}) + +test('empty file yields no batches', async () => { + const dir = await makeTmpDir() + const filePath = path.join(dir, 'empty.jsonl') + await fs.writeFile(filePath, '') + + let count = 0 + for await (const _batch of streamFlushFile({ filePath, batchId: 'empty' })) { + count++ + } + assert.equal(count, 0) + + await fs.rm(dir, { recursive: true, force: true }) +})