From bdff87a274dc035e34922dd06eeaeb9add58638c Mon Sep 17 00:00:00 2001 From: Florian Bernd Date: Mon, 13 Apr 2026 11:28:25 +0200 Subject: [PATCH 1/3] feat: add helper commands foundation (errors, shared utils, registration) Extract error utilities into shared `src/es/errors.ts` so both the existing ES handler and new helper commands can reuse them. Add shared helper utilities (NDJSON parsing, bulk body builder, retry with backoff, concurrency runner, progress reporter, glob file discovery) and register an empty `helpers` group under the `es` command tree. --- src/es/errors.ts | 29 +++++ src/es/handler.ts | 23 +--- src/es/helpers/register.ts | 23 ++++ src/es/helpers/shared.ts | 166 ++++++++++++++++++++++++++ src/es/register.ts | 5 +- test/es/helpers/register.test.ts | 22 ++++ test/es/helpers/shared.test.ts | 192 +++++++++++++++++++++++++++++++ test/es/register.test.ts | 16 +-- 8 files changed, 446 insertions(+), 30 deletions(-) create mode 100644 src/es/errors.ts create mode 100644 src/es/helpers/register.ts create mode 100644 src/es/helpers/shared.ts create mode 100644 test/es/helpers/register.test.ts create mode 100644 test/es/helpers/shared.test.ts diff --git a/src/es/errors.ts b/src/es/errors.ts new file mode 100644 index 00000000..a0f90c79 --- /dev/null +++ b/src/es/errors.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { errors } from '@elastic/transport' +import type { JsonValue } from '../factory.ts' + +/** Builds a `missing_config` error payload from a thrown error. */ +export function missingConfigError (err: unknown): JsonValue { + const message = err instanceof Error ? err.message : String(err) + return { error: { code: 'missing_config', message } } +} + +/** Builds a `transport_error` payload from a thrown transport error. */ +export function transportError (err: unknown): JsonValue { + if (err instanceof errors.ResponseError) { + return { + error: { + code: 'transport_error', + status_code: err.statusCode ?? null, + body: err.body as JsonValue ?? null + } + } + } + + const message = err instanceof Error ? err.message : String(err) + return { error: { code: 'transport_error', message } } +} diff --git a/src/es/handler.ts b/src/es/handler.ts index d6972466..d20e41c5 100644 --- a/src/es/handler.ts +++ b/src/es/handler.ts @@ -3,12 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { errors } from '@elastic/transport' import type { Transport } from '@elastic/transport' import type { EsApiDefinition } from './types.ts' import type { SchemaArgDefinition } from '../lib/schema-args.ts' import { buildRequestParams } from './request-builder.ts' import { getTransport } from '../lib/transport.ts' +import { missingConfigError, transportError } from './errors.ts' import type { JsonValue, ParsedResult } from '../factory.ts' /** @@ -76,24 +76,3 @@ export function createEsHandler ( } } -/** builds a `missing_config` error payload from a thrown error */ -function missingConfigError (err: unknown): JsonValue { - const message = err instanceof Error ? err.message : String(err) - return { error: { code: 'missing_config', message } } -} - -/** builds a `transport_error` payload from a thrown transport error */ -function transportError (err: unknown): JsonValue { - if (err instanceof errors.ResponseError) { - return { - error: { - code: 'transport_error', - status_code: err.statusCode ?? null, - body: err.body as JsonValue ?? null - } - } - } - - const message = err instanceof Error ? 
err.message : String(err) - return { error: { code: 'transport_error', message } } -} diff --git a/src/es/helpers/register.ts b/src/es/helpers/register.ts new file mode 100644 index 00000000..097c31a2 --- /dev/null +++ b/src/es/helpers/register.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { defineGroup } from '../../factory.ts' +import type { OpaqueCommandHandle } from '../../factory.ts' + +/** + * Registers all high-level helper commands under a `helpers` group. + * Helper commands provide convenience abstractions over common Elasticsearch + * workflows (bulk ingestion, scroll search, multi-search batching). + * + * @returns an `OpaqueCommandHandle` for the `helpers` group + */ +export function registerHelperCommands (): OpaqueCommandHandle { + const commands: OpaqueCommandHandle[] = [] + + return defineGroup( + { name: 'helpers', description: 'High-level helper commands for common Elasticsearch workflows' }, + ...commands + ) +} diff --git a/src/es/helpers/shared.ts b/src/es/helpers/shared.ts new file mode 100644 index 00000000..3a7504d5 --- /dev/null +++ b/src/es/helpers/shared.ts @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { readFileSync, globSync } from 'node:fs' +import { resolve } from 'node:path' +import type { JsonValue } from '../../factory.ts' + +/** + * Parses raw text input as either a JSON array or NDJSON (newline-delimited JSON). + * Auto-detects the format: if the trimmed input starts with `[`, it's parsed as JSON array; + * otherwise each non-empty line is parsed as a separate JSON object. + */ +export function parseInput (raw: string): unknown[] { + const trimmed = raw.trim() + if (trimmed.length === 0) return [] + + if (trimmed.startsWith('[')) { + const parsed = JSON.parse(trimmed) + if (!Array.isArray(parsed)) { + throw new Error('Expected a JSON array, got: ' + typeof parsed) + } + return parsed + } + + const lines = trimmed.split('\n') + const docs: unknown[] = [] + for (let i = 0; i < lines.length; i++) { + const line = lines[i]!.trim() + if (line.length === 0) continue + try { + docs.push(JSON.parse(line)) + } catch { + throw new Error(`Failed to parse NDJSON at line ${i + 1}: ${line.slice(0, 80)}`) + } + } + return docs +} + +/** + * Reads raw text content from a file path or stdin. + * Returns `undefined` when no input is available (interactive TTY with no file). + */ +export function readRawInput (filePath?: string): string | undefined { + if (filePath != null) { + return readFileSync(filePath, 'utf-8') + } + if (!process.stdin.isTTY) { + const content = readFileSync(0, 'utf-8') + return content.length > 0 ? content : undefined + } + return undefined +} + +/** + * Resolves a glob pattern against a directory and returns a sorted list of absolute file paths. + * Uses Node.js native `fs.globSync` (available since Node 22). + */ +export function globFiles (dir: string, pattern: string): string[] { + const absDir = resolve(dir) + const matches = globSync(pattern, { cwd: absDir }) + return matches.map((f) => resolve(absDir, f)).sort() +} + +/** + * Builds an NDJSON body for the Elasticsearch `_bulk` API. + * Each document is wrapped in an `{"index": {...}}` action line followed by the document line. 
+ */
+export function buildBulkNdjsonBody (
+  docs: unknown[],
+  opts: { index?: string, pipeline?: string, routing?: string }
+): string {
+  const lines: string[] = []
+  for (const doc of docs) {
+    const action: Record<string, string> = {}
+    if (opts.index != null) action._index = opts.index
+    if (opts.pipeline != null) action.pipeline = opts.pipeline
+    if (opts.routing != null) action.routing = opts.routing
+    lines.push(JSON.stringify({ index: action }))
+    lines.push(JSON.stringify(doc))
+  }
+  // bulk API requires a trailing newline
+  return lines.join('\n') + '\n'
+}
+
+/**
+ * Retries an async function with exponential backoff.
+ * On each failure the delay doubles. Rethrows the last error after exhausting retries.
+ */
+export async function retryWithBackoff<T> (
+  fn: () => Promise<T>,
+  opts: { retries: number, delay: number }
+): Promise<T> {
+  let lastError: unknown
+  let currentDelay = opts.delay
+  for (let attempt = 0; attempt <= opts.retries; attempt++) {
+    try {
+      return await fn()
+    } catch (err) {
+      lastError = err
+      if (attempt < opts.retries) {
+        await new Promise((resolve) => setTimeout(resolve, currentDelay))
+        currentDelay *= 2
+      }
+    }
+  }
+  throw lastError
+}
+
+/**
+ * Runs async tasks with a concurrency limit.
+ * Returns results in the same order as the input items.
+ */
+export async function runWithConcurrency<T, R> (
+  items: T[],
+  concurrency: number,
+  fn: (item: T, index: number) => Promise<R>
+): Promise<R[]> {
+  const results: R[] = new Array(items.length)
+  let nextIndex = 0
+
+  async function worker (): Promise<void> {
+    while (nextIndex < items.length) {
+      const i = nextIndex++
+      results[i] = await fn(items[i]!, i)
+    }
+  }
+
+  const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker())
+  await Promise.all(workers)
+  return results
+}
+
+/** Tracks bulk ingestion progress and writes updates to stderr. */
+export class ProgressReporter {
+  total = 0
+  succeeded = 0
+  failed = 0
+  retries = 0
+  filesProcessed = 0
+  private readonly startTime = Date.now()
+
+  report (batchSize: number, batchErrors: number, fileName?: string): void {
+    this.total += batchSize
+    this.succeeded += batchSize - batchErrors
+    this.failed += batchErrors
+    const prefix = fileName != null ? `[${fileName}] ` : ''
+    process.stderr.write(
+      `\r${prefix}Progress: ${this.succeeded} succeeded, ${this.failed} failed, ${this.total} total`
+    )
+  }
+
+  summary (): JsonValue {
+    const elapsed_ms = Date.now() - this.startTime
+    process.stderr.write('\n')
+    return {
+      total: this.total,
+      succeeded: this.succeeded,
+      failed: this.failed,
+      retries: this.retries,
+      elapsed_ms,
+      ...(this.filesProcessed > 0 && { files_processed: this.filesProcessed })
+    }
+  }
+}
diff --git a/src/es/register.ts b/src/es/register.ts
index 614fa091..00ff55bb 100644
--- a/src/es/register.ts
+++ b/src/es/register.ts
@@ -11,6 +11,7 @@ import { validateApiDefinition, resolveInput } from './types.ts'
 import type { SchemaArgDefinition } from '../lib/schema-args.ts'
 import { allApis } from './apis.ts'
 import { createEsHandler } from './handler.ts'
+import { registerHelperCommands } from './helpers/register.ts'
 
 /** Builds a leaf command handle from a definition and its pre-computed schema args. 
*/ function buildLeafHandle ( @@ -108,5 +109,7 @@ export function registerEsCommands ( rootHandles.push(buildLeafHandle(def, defSchemaArgs)) } - return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles) + const helpersGroup = registerHelperCommands() + + return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles, helpersGroup) } diff --git a/test/es/helpers/register.test.ts b/test/es/helpers/register.test.ts new file mode 100644 index 00000000..7ff5b2b9 --- /dev/null +++ b/test/es/helpers/register.test.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { Command } from 'commander' +import { registerHelperCommands } from '../../../src/es/helpers/register.ts' + +describe('registerHelperCommands', () => { + it('returns a command group named "helpers"', () => { + const group = registerHelperCommands() + assert.ok(group instanceof Command) + assert.equal(group.name(), 'helpers') + }) + + it('has a description', () => { + const group = registerHelperCommands() + assert.ok(group.description().length > 0) + }) +}) diff --git a/test/es/helpers/shared.test.ts b/test/es/helpers/shared.test.ts new file mode 100644 index 00000000..09b6e31b --- /dev/null +++ b/test/es/helpers/shared.test.ts @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { + parseInput, + buildBulkNdjsonBody, + retryWithBackoff, + runWithConcurrency, + ProgressReporter +} from '../../../src/es/helpers/shared.ts' + +describe('parseInput', () => { + it('parses a JSON array', () => { + const result = parseInput('[{"a":1},{"b":2}]') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('parses NDJSON', () => { + const result = parseInput('{"a":1}\n{"b":2}\n') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('skips empty lines in NDJSON', () => { + const result = parseInput('{"a":1}\n\n{"b":2}\n\n') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('returns empty array for empty input', () => { + assert.deepStrictEqual(parseInput(''), []) + assert.deepStrictEqual(parseInput(' \n '), []) + }) + + it('parses a single JSON object as NDJSON', () => { + const result = parseInput('{"a":1}') + assert.deepStrictEqual(result, [{ a: 1 }]) + }) + + it('throws on malformed NDJSON line', () => { + assert.throws(() => parseInput('{"a":1}\nnot json\n'), /Failed to parse NDJSON at line 2/) + }) + + it('handles JSON array with whitespace', () => { + const result = parseInput(' \n [{"a":1}] \n ') + assert.deepStrictEqual(result, [{ a: 1 }]) + }) +}) + +describe('buildBulkNdjsonBody', () => { + it('wraps documents in index actions', () => { + const body = buildBulkNdjsonBody([{ title: 'doc1' }, { title: 'doc2' }], { index: 'my-index' }) + const lines = body.split('\n') + assert.equal(lines.length, 5) // 4 content lines + trailing empty line + assert.deepStrictEqual(JSON.parse(lines[0]), { index: { _index: 'my-index' } }) + assert.deepStrictEqual(JSON.parse(lines[1]), { title: 'doc1' }) + assert.deepStrictEqual(JSON.parse(lines[2]), { index: { _index: 'my-index' } }) + assert.deepStrictEqual(JSON.parse(lines[3]), { title: 'doc2' }) + assert.equal(lines[4], '') // 
trailing newline
+  })
+
+  it('includes pipeline and routing in action metadata', () => {
+    const body = buildBulkNdjsonBody([{ a: 1 }], {
+      index: 'idx',
+      pipeline: 'my-pipe',
+      routing: 'shard-1'
+    })
+    const action = JSON.parse(body.split('\n')[0])
+    assert.deepStrictEqual(action, {
+      index: { _index: 'idx', pipeline: 'my-pipe', routing: 'shard-1' }
+    })
+  })
+
+  it('returns trailing newline for empty docs', () => {
+    const body = buildBulkNdjsonBody([], { index: 'idx' })
+    assert.equal(body, '\n')
+  })
+
+  it('omits undefined metadata fields', () => {
+    const body = buildBulkNdjsonBody([{ a: 1 }], {})
+    const action = JSON.parse(body.split('\n')[0])
+    assert.deepStrictEqual(action, { index: {} })
+  })
+})
+
+describe('retryWithBackoff', () => {
+  it('returns result on first success', async () => {
+    const result = await retryWithBackoff(() => Promise.resolve(42), { retries: 3, delay: 1 })
+    assert.equal(result, 42)
+  })
+
+  it('retries and succeeds', async () => {
+    let attempt = 0
+    const result = await retryWithBackoff(() => {
+      attempt++
+      if (attempt < 3) throw new Error('fail')
+      return Promise.resolve('ok')
+    }, { retries: 3, delay: 1 })
+    assert.equal(result, 'ok')
+    assert.equal(attempt, 3)
+  })
+
+  it('throws after exhausting retries', async () => {
+    await assert.rejects(
+      () => retryWithBackoff(() => Promise.reject(new Error('always fail')), { retries: 2, delay: 1 }),
+      { message: 'always fail' }
+    )
+  })
+
+  it('retries zero times when retries is 0', async () => {
+    let attempt = 0
+    await assert.rejects(() => retryWithBackoff(() => {
+      attempt++
+      return Promise.reject(new Error('fail'))
+    }, { retries: 0, delay: 1 }))
+    assert.equal(attempt, 1)
+  })
+})
+
+describe('runWithConcurrency', () => {
+  it('processes all items and preserves order', async () => {
+    const results = await runWithConcurrency(
+      [1, 2, 3, 4, 5],
+      2,
+      async (item) => item * 10
+    )
+    assert.deepStrictEqual(results, [10, 20, 30, 40, 50])
+  })
+
+  it('limits concurrency', async () => {
+    let active = 0
+    let maxActive = 0
+    const results = await runWithConcurrency(
+      [1, 2, 3, 4],
+      2,
+      async (item) => {
+        active++
+        maxActive = Math.max(maxActive, active)
+        await new Promise((r) => setTimeout(r, 10))
+        active--
+        return item
+      }
+    )
+    assert.deepStrictEqual(results, [1, 2, 3, 4])
+    assert.ok(maxActive <= 2, `max concurrency was ${maxActive}, expected <= 2`)
+  })
+
+  it('handles empty input', async () => {
+    const results = await runWithConcurrency([], 5, async () => 'x')
+    assert.deepStrictEqual(results, [])
+  })
+})
+
+describe('ProgressReporter', () => {
+  it('tracks counts correctly', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(100, 5)
+    reporter.report(50, 0)
+    assert.equal(reporter.total, 150)
+    assert.equal(reporter.succeeded, 145)
+    assert.equal(reporter.failed, 5)
+  })
+
+  it('summary includes elapsed_ms', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(10, 1)
+    reporter.retries = 2
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.total, 10)
+    assert.equal(summary.succeeded, 9)
+    assert.equal(summary.failed, 1)
+    assert.equal(summary.retries, 2)
+    assert.equal(typeof summary.elapsed_ms, 'number')
+  })
+
+  it('summary includes files_processed when non-zero', () => {
+    const reporter = new ProgressReporter()
+    reporter.filesProcessed = 3
+    reporter.report(10, 0)
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.files_processed, 3)
+  })
+
+  it('summary omits files_processed when zero', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(10, 0)
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.files_processed, undefined)
+  })
+})
diff --git a/test/es/register.test.ts b/test/es/register.test.ts
index 90eb9b00..5e61528b 100644
--- a/test/es/register.test.ts
+++ b/test/es/register.test.ts
@@ -34,7 +34,7 @@ describe('registerEsCommands', () => {
   it('creates one child group per unique namespace', () => {
     const handle = registerEsCommands(testDefs)
     const groupNames = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(groupNames, ['cat', 'indices'])
+    assert.deepEqual(groupNames, ['cat', 'helpers', 'indices'])
   })
 
   it('each namespace group has leaf commands matching definition names', () => {
@@ -62,9 +62,11 @@
   it('works with a single namespace', () => {
     const defs: EsApiDefinition[] = [makeDef('health', 'cat'), makeDef('nodes', 'cat')]
     const handle = registerEsCommands(defs)
-    assert.equal(handle.commands.length, 1)
-    assert.equal(handle.commands[0]?.name(), 'cat')
-    assert.equal(handle.commands[0]?.commands.length, 2)
+    // 1 namespace group + 1 helpers group
+    assert.equal(handle.commands.length, 2)
+    const cat = handle.commands.find((c) => c.name() === 'cat')
+    assert.ok(cat != null)
+    assert.equal(cat.commands.length, 2)
   })
 
   it('throws on duplicate command names within a namespace', () => {
@@ -149,7 +151,7 @@ describe('registerEsCommands - namespace-less (root) definitions', () => {
     const defs: EsApiDefinition[] = [makeRootDef('bulk'), makeRootDef('search')]
     const handle = registerEsCommands(defs)
     const names = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(names, ['bulk', 'search'])
+    assert.deepEqual(names, ['bulk', 'helpers', 'search'])
   })
 
   it('namespace-less and namespaced definitions coexist under `es`', () => {
@@ -161,7 +163,7 @@
     const handle = registerEsCommands(defs)
     // `cat` group and `search` leaf are both direct children of `es`
     const topNames = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(topNames, ['cat', 'search'])
+    assert.deepEqual(topNames, ['cat', 'helpers', 'search'])
     const cat = handle.commands.find((c) => c.name() === 'cat')
     assert.ok(cat != null)
     assert.deepEqual(cat.commands.map((c) => c.name()).sort(), ['health', 'indices'])
@@ -211,7 +213,7 @@ describe('registerEsCommands - extensibility', () => {
     ]
     const handle = registerEsCommands(defs)
     const groupNames = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(groupNames, ['cat', 'cluster'])
+    assert.deepEqual(groupNames, ['cat', 'cluster', 'helpers'])
     const cluster = handle.commands.find((c) => c.name() === 'cluster')
     assert.ok(cluster != null)
     assert.equal(cluster.commands.length, 2)

From f6ec130b18cee2f28967a103726c4c8c346212f7 Mon Sep 17 00:00:00 2001
From: Florian Bernd
Date: Mon, 13 Apr 2026 13:14:03 +0200
Subject: [PATCH 2/3] feat: add scroll-search helper command

Add `elastic es helpers scroll-search` for scrolling through all search
results with automatic scroll context lifecycle management. Streams
documents to stdout as NDJSON and supports --query, --input-file, a
configurable --scroll duration, --size per batch, and a --max-docs
limit. The scroll context is always cleared on completion or error.
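
Example invocation (the index name, query, and output file are
illustrative; assumes a configured Elasticsearch connection):

    elastic es helpers scroll-search --index my-logs \
      --query '{"query":{"match_all":{}}}' \
      --scroll 5m --size 500 --max-docs 10000 > docs.ndjson

The document stream goes to stdout while the progress summary is
written to stderr, so redirecting stdout captures only the documents.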
---
 src/es/helpers/register.ts            |   5 +-
 src/es/helpers/scroll-search.ts       | 163 +++++++++++++++
 test/es/helpers/scroll-search.test.ts | 286 ++++++++++++++++++++++++++
 3 files changed, 451 insertions(+), 3 deletions(-)
 create mode 100644 src/es/helpers/scroll-search.ts
 create mode 100644 test/es/helpers/scroll-search.test.ts

diff --git a/src/es/helpers/register.ts b/src/es/helpers/register.ts
index 097c31a2..11e749ab 100644
--- a/src/es/helpers/register.ts
+++ b/src/es/helpers/register.ts
@@ -5,6 +5,7 @@
 
 import { defineGroup } from '../../factory.ts'
 import type { OpaqueCommandHandle } from '../../factory.ts'
+import { createScrollSearchCommand } from './scroll-search.ts'
 
 /**
  * Registers all high-level helper commands under a `helpers` group.
@@ -14,10 +15,8 @@ import type { OpaqueCommandHandle } from '../../factory.ts'
  * @returns an `OpaqueCommandHandle` for the `helpers` group
  */
 export function registerHelperCommands (): OpaqueCommandHandle {
-  const commands: OpaqueCommandHandle[] = []
-
   return defineGroup(
     { name: 'helpers', description: 'High-level helper commands for common Elasticsearch workflows' },
-    ...commands
+    createScrollSearchCommand()
   )
 }
diff --git a/src/es/helpers/scroll-search.ts b/src/es/helpers/scroll-search.ts
new file mode 100644
index 00000000..ab4ff3ed
--- /dev/null
+++ b/src/es/helpers/scroll-search.ts
@@ -0,0 +1,163 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import type { Transport } from '@elastic/transport'
+import { defineCommand } from '../../factory.ts'
+import type { OpaqueCommandHandle, JsonValue, ParsedResult } from '../../factory.ts'
+import { getTransport } from '../../lib/transport.ts'
+import { missingConfigError, transportError } from '../errors.ts'
+import { readRawInput } from './shared.ts'
+
+interface ScrollSearchOptions {
+  index: string
+  query?: string | undefined
+  'input-file'?: string | undefined
+  scroll: string
+  size: number
+  'max-docs': number
+}
+
+interface SearchHit {
+  _source?: unknown
+  _id?: string
+}
+
+interface SearchResponse {
+  _scroll_id?: string
+  hits?: {
+    hits?: SearchHit[]
+    total?: { value?: number } | number
+  }
+}
+
+/** Dependencies injectable for testing. */
+export interface ScrollSearchDeps {
+  getTransport: () => Transport
+  stdout: { write: (chunk: string) => boolean }
+  stderr: { write: (chunk: string) => boolean }
+}
+
+const defaultDeps: ScrollSearchDeps = {
+  getTransport,
+  stdout: process.stdout,
+  stderr: process.stderr
+}
+
+function createScrollSearchHandler (deps: ScrollSearchDeps = defaultDeps) {
+  return async (parsed: ParsedResult): Promise<JsonValue> => {
+    const opts = parsed.options as unknown as ScrollSearchOptions
+
+    let transport: Transport
+    try {
+      transport = deps.getTransport()
+    } catch (err) {
+      return missingConfigError(err)
+    }
+
+    // Parse query body from --query flag, --input-file, or stdin (in that priority order)
+    let queryBody: Record<string, unknown> = {}
+    try {
+      if (opts.query != null) {
+        queryBody = JSON.parse(opts.query) as Record<string, unknown>
+      } else if (opts['input-file'] != null) {
+        const raw = readRawInput(opts['input-file'])
+        if (raw != null && raw.trim().length > 0) {
+          queryBody = JSON.parse(raw) as Record<string, unknown>
+        }
+      } else if (!process.stdin.isTTY) {
+        const raw = readRawInput()
+        if (raw != null && raw.trim().length > 0) {
+          queryBody = JSON.parse(raw) as Record<string, unknown>
+        }
+      }
+    } catch (err) {
+      return {
+        error: {
+          code: 'input_error',
+          message: `Failed to parse query: ${err instanceof Error ? err.message : String(err)}`
+        }
+      }
+    }
+
+    const startTime = Date.now()
+    let scrollId: string | undefined
+    let totalDocs = 0
+    const maxDocs = opts['max-docs']
+
+    try {
+      // Initial search with scroll
+      const index = encodeURIComponent(opts.index)
+      const initialResult = await transport.request<SearchResponse>(
+        {
+          method: 'POST',
+          path: `/${index}/_search`,
+          querystring: { scroll: opts.scroll, size: opts.size },
+          body: queryBody
+        }
+      )
+
+      scrollId = initialResult._scroll_id
+      let hits = initialResult.hits?.hits ?? []
+
+      // Process pages
+      while (hits.length > 0 && totalDocs < maxDocs) {
+        for (const hit of hits) {
+          if (totalDocs >= maxDocs) break
+          deps.stdout.write(JSON.stringify(hit._source) + '\n')
+          totalDocs++
+        }
+
+        if (totalDocs >= maxDocs || scrollId == null) break
+
+        // Fetch next page
+        const scrollResult = await transport.request<SearchResponse>({
+          method: 'POST',
+          path: '/_search/scroll',
+          body: { scroll: opts.scroll, scroll_id: scrollId }
+        })
+
+        scrollId = scrollResult._scroll_id
+        hits = scrollResult.hits?.hits ?? []
+      }
+    } catch (err) {
+      return transportError(err)
+    } finally {
+      // Always clean up the scroll context
+      if (scrollId != null) {
+        try {
+          await transport.request({
+            method: 'DELETE',
+            path: '/_search/scroll',
+            body: { scroll_id: scrollId }
+          })
+        } catch {
+          // Best-effort cleanup — scroll will expire naturally
+        }
+      }
+    }
+
+    const elapsed_ms = Date.now() - startTime
+    deps.stderr.write(`Fetched ${totalDocs} documents in ${elapsed_ms}ms\n`)
+
+    return { total_docs: totalDocs, elapsed_ms }
+  }
+}
+
+export function createScrollSearchCommand (deps?: ScrollSearchDeps): OpaqueCommandHandle {
+  return defineCommand({
+    name: 'scroll-search',
+    description: 'Scroll through all search results, streaming documents as NDJSON to stdout.',
+    options: [
+      { long: 'index', short: 'i', description: 'Target index', type: 'string', required: true },
+      { long: 'query', short: 'q', description: 'Search query body as JSON string', type: 'string' },
+      { long: 'input-file', description: 'Path to file containing search body JSON', type: 'string' },
+      { long: 'scroll', description: 'Scroll keep-alive duration', type: 'string', defaultValue: '1m' },
+      { long: 'size', description: 'Documents per scroll batch', type: 'number', defaultValue: 1000 },
+      { long: 'max-docs', description: 'Maximum total documents to fetch (default: unlimited)', type: 'number', defaultValue: Infinity }
+    ],
+    handler: createScrollSearchHandler(deps),
+    formatOutput: () => ''
  })
}
diff --git a/test/es/helpers/scroll-search.test.ts b/test/es/helpers/scroll-search.test.ts
new file mode 100644
index 00000000..6fcfad7a
--- /dev/null
+++ b/test/es/helpers/scroll-search.test.ts
@@ -0,0 +1,286 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { describe, it } from 'node:test'
+import assert from 'node:assert/strict'
+import { mkdtempSync, writeFileSync } from 'node:fs'
+import { join } from 'node:path'
+import { tmpdir } from 'node:os'
+import type { Transport, TransportRequestParams } from '@elastic/transport'
+import { createScrollSearchCommand } from '../../../src/es/helpers/scroll-search.ts'
+import type { ScrollSearchDeps } from '../../../src/es/helpers/scroll-search.ts'
+import { Command } from 'commander'
+
+interface MockResponse {
+  _scroll_id?: string
+  hits?: { hits?: Array<{ _source: unknown }> }
+}
+
+function mockTransport (responses: MockResponse[]): {
+  transport: Transport
+  requests: Array<{ params: TransportRequestParams }>
+} {
+  const requests: Array<{ params: TransportRequestParams }> = []
+  let callIndex = 0
+  const transport = {
+    request: async (params: TransportRequestParams) => {
+      requests.push({ params })
+      const response = responses[callIndex] ?? { hits: { hits: [] } }
+      callIndex++
+      return response
+    }
+  } as unknown as Transport
+  return { transport, requests }
+}
+
+function captureOutput (): { stdout: { write: (s: string) => boolean, chunks: string[] }, stderr: { write: (s: string) => boolean, chunks: string[] } } {
+  const stdoutChunks: string[] = []
+  const stderrChunks: string[] = []
+  return {
+    stdout: {
+      chunks: stdoutChunks,
+      write: (s: string) => { stdoutChunks.push(s); return true }
+    },
+    stderr: {
+      chunks: stderrChunks,
+      write: (s: string) => { stderrChunks.push(s); return true }
+    }
+  }
+}
+
+function makeDeps (transport: Transport, output?: ReturnType<typeof captureOutput>): ScrollSearchDeps {
+  const io = output ?? captureOutput()
+  return { getTransport: () => transport, stdout: io.stdout, stderr: io.stderr }
+}
+
+async function runCommand (args: string[], deps: ScrollSearchDeps): Promise<{ result: unknown, stdout: string[], stderr: string[] }> {
+  const cmd = createScrollSearchCommand(deps)
+  const program = new Command()
+  program.exitOverride()
+  program.option('--json', 'output as JSON')
+  program.addCommand(cmd)
+
+  const progChunks: string[] = []
+  const origWrite = process.stdout.write.bind(process.stdout)
+  process.stdout.write = ((chunk: string) => {
+    progChunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
+    return true
+  }) as typeof process.stdout.write
+
+  try {
+    await program.parseAsync(['node', 'test', 'scroll-search', ...args])
+  } finally {
+    process.stdout.write = origWrite
+  }
+
+  let result: unknown
+  const output = progChunks.join('')
+  if (output.trim().length > 0) {
+    try { result = JSON.parse(output.trim()) } catch { result = output.trim() }
+  }
+
+  return {
+    result,
+    stdout: (deps.stdout as { chunks: string[] }).chunks ?? [],
+    stderr: (deps.stderr as { chunks: string[] }).chunks ?? []
+  }
+}
+
+describe('scroll-search command', () => {
+  it('creates a command named scroll-search', () => {
+    const { transport } = mockTransport([])
+    const cmd = createScrollSearchCommand(makeDeps(transport))
+    assert.equal(cmd.name(), 'scroll-search')
+  })
+
+  it('fetches all documents across scroll pages', async () => {
+    const output = captureOutput()
+    const { transport, requests } = mockTransport([
+      // Initial search response
+      {
+        _scroll_id: 'scroll-1',
+        hits: { hits: [{ _source: { id: 1 } }, { _source: { id: 2 } }] }
+      },
+      // Second page
+      {
+        _scroll_id: 'scroll-1',
+        hits: { hits: [{ _source: { id: 3 } }] }
+      },
+      // Empty page — signals end
+      {
+        _scroll_id: 'scroll-1',
+        hits: { hits: [] }
+      },
+      // DELETE scroll response
+      {}
+    ])
+
+    await runCommand(
+      ['--index', 'test-idx', '--query', '{}', '--json'],
+      makeDeps(transport, output)
+    )
+
+    // 3 docs streamed as NDJSON
+    assert.equal(output.stdout.chunks.length, 3)
+    assert.deepStrictEqual(JSON.parse(output.stdout.chunks[0]!), { id: 1 })
+    assert.deepStrictEqual(JSON.parse(output.stdout.chunks[1]!), { id: 2 })
+    assert.deepStrictEqual(JSON.parse(output.stdout.chunks[2]!), { id: 3 })
+
+    // Requests: initial search + 2 scroll + 1 delete
+    assert.equal(requests.length, 4)
+    assert.equal(requests[0]!.params.method, 'POST')
+    assert.ok(requests[0]!.params.path!.includes('_search'))
+    assert.equal(requests[3]!.params.method, 'DELETE')
+  })
+
+  it('respects --max-docs limit', async () => {
+    const output = captureOutput()
+    const { transport } = mockTransport([
+      {
+        _scroll_id: 'scroll-1',
+        hits: { hits: [{ _source: { a: 1 } }, { _source: { a: 2 } }, { _source: { a: 3 } }] }
+      },
+      // DELETE scroll
+      {}
+    ])
+
+    await runCommand(
+      ['--index', 'test-idx', '--query', '{}', '--max-docs', '2', '--json'],
+      makeDeps(transport, output)
+    )
+
+    assert.equal(output.stdout.chunks.length, 2)
+  })
+
+  it('passes scroll and size parameters to initial search', async () => {
+    const output = captureOutput()
+    const { transport, requests } = mockTransport([
+      { _scroll_id: 'scroll-1', hits: { hits: [] } },
+      {}
+    ])
+
+    await runCommand(
+      ['--index', 'test-idx', '--query', '{}', '--scroll', '5m', '--size', '500', '--json'],
+      makeDeps(transport, output)
+    )
+
+    const qs = requests[0]!.params.querystring as Record<string, unknown>
+    assert.equal(qs.scroll, '5m')
+    assert.equal(qs.size, 500)
+  })
+
+  it('passes query body from --query flag', async () => {
+    const output = captureOutput()
+    const { transport, requests } = mockTransport([
+      { _scroll_id: 'scroll-1', hits: { hits: [] } },
+      {}
+    ])
+
+    await runCommand(
+      ['--index', 'test-idx', '--query', '{"match":{"title":"test"}}', '--json'],
+      makeDeps(transport, output)
+    )
+
+    const body = requests[0]!.params.body as Record<string, unknown>
+    assert.deepStrictEqual(body, { match: { title: 'test' } })
+  })
+
+  it('reads query from --input-file', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'scroll-test-'))
+    const filePath = join(tmpDir, 'query.json')
+    writeFileSync(filePath, '{"match_all":{}}')
+
+    const output = captureOutput()
+    const { transport, requests } = mockTransport([
+      { _scroll_id: 'scroll-1', hits: { hits: [] } },
+      {}
+    ])
+
+    await runCommand(
+      ['--index', 'test-idx', '--input-file', filePath, '--json'],
+      makeDeps(transport, output)
+    )
+
+    const body = requests[0]!.params.body as Record<string, unknown>
+    assert.deepStrictEqual(body, { match_all: {} })
+  })
+
+  it('clears scroll context on completion', async () => {
+    const output = captureOutput()
+    const { transport, requests } = mockTransport([
+      { _scroll_id: 'scroll-abc', hits: { hits: [{ _source: { a: 1 } }] } },
+      { _scroll_id: 'scroll-abc', hits: { hits: [] } },
+      {} // DELETE response
+    ])
+
+    await runCommand(
+      ['--index', 'test-idx', '--query', '{}', '--json'],
+      makeDeps(transport, output)
+    )
+
+    const deleteReq = requests.find((r) => r.params.method === 'DELETE')
+    assert.ok(deleteReq != null, 'Expected a DELETE request to clear scroll')
+    assert.deepStrictEqual((deleteReq.params.body as Record<string, unknown>).scroll_id, 'scroll-abc')
+  })
+
+  it('clears scroll context on transport error', async () => {
+    const output = captureOutput()
+    let deleteCallCount = 0
+    const transport = {
+      request: async (params: TransportRequestParams) => {
+        if (params.method === 'DELETE') {
+          deleteCallCount++
+          return {}
+        }
+        if (params.path!.includes('_search/scroll')) {
+          throw new Error('scroll failed')
+        }
+        return { _scroll_id: 'scroll-err', hits: { hits: [{ _source: { a: 1 } }] } }
+      }
+    } as unknown as Transport
+
+    await runCommand(
+      ['--index', 'test-idx', '--query', '{}', '--json'],
+      makeDeps(transport, output)
+    )
+
+    assert.equal(deleteCallCount, 1, 'Scroll should be cleared even after error')
+  })
+
+  it('returns missing_config error when transport is not configured', async () => {
+    const output = captureOutput()
+    const deps: ScrollSearchDeps = {
+      getTransport: () => { throw new Error('missing_config: no ES configured') },
+      stdout: output.stdout,
+      stderr: output.stderr
+    }
+
+    const { result } = await runCommand(
+      ['--index', 'test-idx', '--query', '{}', '--json'],
+      deps
+    )
+
+    const r = result as Record<string, unknown>
+    const error = r.error as Record<string, unknown>
+    assert.equal(error.code, 'missing_config')
+  })
+
+  it('writes summary to stderr', async () => {
+    const output = captureOutput()
+    const { transport } = mockTransport([
+      { _scroll_id: 'scroll-1', hits: { hits: [{ _source: { a: 1 } }] } },
+      { _scroll_id: 'scroll-1', hits: { hits: [] } },
+      {}
+    ])
+
+    await runCommand(
+      ['--index', 'test-idx', '--query', '{}', '--json'],
+      makeDeps(transport, output)
+    )
+
+    const stderrText = output.stderr.chunks.join('')
+    assert.ok(stderrText.includes('1 documents'), `Expected summary in stderr, got: ${stderrText}`)
  })
})

From 47e4f5138faaff2b0f3c05e2ef563af52994eeb0 Mon Sep 17 00:00:00 2001
From: Josh Mock
Date: Mon, 13 Apr 2026 15:16:48 -0400
Subject: [PATCH 3/3] Update scroll-search helper tests to use stderr

An upstream commit changed JSON-formatted error output to be written
to stderr, which broke these tests.
---
 test/es/helpers/scroll-search.test.ts | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/test/es/helpers/scroll-search.test.ts b/test/es/helpers/scroll-search.test.ts
index 6fcfad7a..4fd8361b 100644
--- a/test/es/helpers/scroll-search.test.ts
+++ b/test/es/helpers/scroll-search.test.ts
@@ -63,20 +63,31 @@ async function runCommand (args: string[], deps: ScrollSearchDeps): Promise<{ re
   program.addCommand(cmd)
 
   const progChunks: string[] = []
-  const origWrite = process.stdout.write.bind(process.stdout)
+  const errChunks: string[] = []
+  const origStdoutWrite = process.stdout.write.bind(process.stdout)
+  const origStderrWrite = process.stderr.write.bind(process.stderr)
   process.stdout.write = ((chunk: string) => {
     progChunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
     return true
   }) as typeof process.stdout.write
+  process.stderr.write = ((chunk: string) => {
+    errChunks.push(typeof chunk === 'string' ? 
chunk : chunk.toString()) + return true + }) as typeof process.stderr.write try { await program.parseAsync(['node', 'test', 'scroll-search', ...args]) } finally { - process.stdout.write = origWrite + process.stdout.write = origStdoutWrite + process.stderr.write = origStderrWrite + process.exitCode = 0 } let result: unknown - const output = progChunks.join('') + // Prefer stderr (error results) over stdout; parse whichever has content + const errOutput = errChunks.join('') + const stdOutput = progChunks.join('') + const output = errOutput.trim().length > 0 ? errOutput : stdOutput if (output.trim().length > 0) { try { result = JSON.parse(output.trim()) } catch { result = output.trim() } }