From bdff87a274dc035e34922dd06eeaeb9add58638c Mon Sep 17 00:00:00 2001
From: Florian Bernd
Date: Mon, 13 Apr 2026 11:28:25 +0200
Subject: [PATCH] feat: add helper commands foundation (errors, shared utils,
 registration)

Extract error utilities into shared `src/es/errors.ts` so both the existing
ES handler and new helper commands can reuse them. Add shared helper
utilities (NDJSON parsing, bulk body builder, retry with backoff,
concurrency runner, progress reporter, glob file discovery) and register
an empty `helpers` group under the `es` command tree.
---
 src/es/errors.ts                 |  29 +++++
 src/es/handler.ts                |  23 +---
 src/es/helpers/register.ts       |  23 ++++
 src/es/helpers/shared.ts         | 166 ++++++++++++++++++++++++++
 src/es/register.ts               |   5 +-
 test/es/helpers/register.test.ts |  22 ++++
 test/es/helpers/shared.test.ts   | 192 +++++++++++++++++++++++++++++++
 test/es/register.test.ts         |  16 +--
 8 files changed, 446 insertions(+), 30 deletions(-)
 create mode 100644 src/es/errors.ts
 create mode 100644 src/es/helpers/register.ts
 create mode 100644 src/es/helpers/shared.ts
 create mode 100644 test/es/helpers/register.test.ts
 create mode 100644 test/es/helpers/shared.test.ts

diff --git a/src/es/errors.ts b/src/es/errors.ts
new file mode 100644
index 00000000..a0f90c79
--- /dev/null
+++ b/src/es/errors.ts
@@ -0,0 +1,29 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { errors } from '@elastic/transport'
+import type { JsonValue } from '../factory.ts'
+
+/** Wraps the message of a thrown value in a `missing_config` error payload. */
+export function missingConfigError (err: unknown): JsonValue {
+  const code = 'missing_config'
+  return { error: { code, message: err instanceof Error ? err.message : String(err) } }
+}
+
+/** Builds a `transport_error` payload from a thrown transport error.
*/
+export function transportError (err: unknown): JsonValue {
+  const code = 'transport_error'
+  if (!(err instanceof errors.ResponseError)) {
+    // anything that is not a ResponseError is reduced to its message
+    return { error: { code, message: err instanceof Error ? err.message : String(err) } }
+  }
+
+  // a ResponseError is reported through its status code and body; missing values become null
+  const statusCode = err.statusCode ?? null
+  const body = err.body as JsonValue ?? null
+  return {
+    error: { code, status_code: statusCode, body }
+  }
+}
diff --git a/src/es/handler.ts b/src/es/handler.ts
index d6972466..d20e41c5 100644
--- a/src/es/handler.ts
+++ b/src/es/handler.ts
@@ -3,12 +3,12 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-import { errors } from '@elastic/transport'
 import type { Transport } from '@elastic/transport'
 import type { EsApiDefinition } from './types.ts'
 import type { SchemaArgDefinition } from '../lib/schema-args.ts'
 import { buildRequestParams } from './request-builder.ts'
 import { getTransport } from '../lib/transport.ts'
+import { missingConfigError, transportError } from './errors.ts'
 import type { JsonValue, ParsedResult } from '../factory.ts'
 
 /**
@@ -76,24 +76,3 @@ export function createEsHandler (
   }
 }
 
-/** builds a `missing_config` error payload from a thrown error */
-function missingConfigError (err: unknown): JsonValue {
-  const message = err instanceof Error ? err.message : String(err)
-  return { error: { code: 'missing_config', message } }
-}
-
-/** builds a `transport_error` payload from a thrown transport error */
-function transportError (err: unknown): JsonValue {
-  if (err instanceof errors.ResponseError) {
-    return {
-      error: {
-        code: 'transport_error',
-        status_code: err.statusCode ?? null,
-        body: err.body as JsonValue ?? null
-      }
-    }
-  }
-
-  const message = err instanceof Error ? err.message : String(err)
-  return { error: { code: 'transport_error', message } }
-}
diff --git a/src/es/helpers/register.ts b/src/es/helpers/register.ts
new file mode 100644
index 00000000..097c31a2
--- /dev/null
+++ b/src/es/helpers/register.ts
@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V.
and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { defineGroup } from '../../factory.ts'
+import type { OpaqueCommandHandle } from '../../factory.ts'
+
+/**
+ * Creates the `helpers` command group, the home of the high-level helper
+ * commands: convenience abstractions over common Elasticsearch workflows
+ * (bulk ingestion, scroll search, multi-search batching).
+ *
+ * @returns an `OpaqueCommandHandle` for the `helpers` group
+ */
+export function registerHelperCommands (): OpaqueCommandHandle {
+  const description = 'High-level helper commands for common Elasticsearch workflows'
+  // no helper command exists yet, so the group is created without children
+  const children: OpaqueCommandHandle[] = []
+
+  const group = defineGroup({ name: 'helpers', description }, ...children)
+  return group
+}
diff --git a/src/es/helpers/shared.ts b/src/es/helpers/shared.ts
new file mode 100644
index 00000000..3a7504d5
--- /dev/null
+++ b/src/es/helpers/shared.ts
@@ -0,0 +1,166 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { readFileSync, globSync } from 'node:fs'
+import { resolve } from 'node:path'
+import type { JsonValue } from '../../factory.ts'
+
+/**
+ * Parses raw text input as either a JSON array or NDJSON (newline-delimited JSON).
+ * Auto-detects the format: if the trimmed input starts with `[`, it's parsed as JSON array;
+ * otherwise each non-empty line is parsed as a separate JSON object.
+ */
+export function parseInput (raw: string): unknown[] {
+  const text = raw.trim()
+  if (text === '') return []
+
+  // a leading `[` means the whole input is a single JSON array
+  if (text[0] === '[') {
+    const parsed = JSON.parse(text)
+    if (Array.isArray(parsed)) return parsed
+    throw new Error('Expected a JSON array, got: ' + typeof parsed)
+  }
+
+  // anything else is treated as NDJSON: one JSON value per non-blank line
+  const docs: unknown[] = []
+  text.split('\n').forEach((rawLine, idx) => {
+    const line = rawLine.trim()
+    if (line === '') return
+    try {
+      docs.push(JSON.parse(line))
+    } catch {
+      throw new Error(`Failed to parse NDJSON at line ${idx + 1}: ${line.slice(0, 80)}`)
+    }
+  })
+  return docs
+}
+
+/**
+ * Returns the raw text to process: the contents of `filePath` when one is given,
+ * otherwise whatever is piped into stdin. Yields `undefined` when stdin is an
+ * interactive TTY or the piped input is empty.
+ */
+export function readRawInput (filePath?: string): string | undefined {
+  if (filePath != null) return readFileSync(filePath, 'utf-8')
+  // a terminal on stdin means nothing was piped in
+  if (process.stdin.isTTY) return undefined
+
+  // file descriptor 0 is stdin
+  const piped = readFileSync(0, 'utf-8')
+  return piped.length === 0 ? undefined : piped
+}
+
+/**
+ * Expands `pattern` relative to `dir` and returns the matches as sorted absolute paths.
+ * Relies on the built-in `fs.globSync` of Node.js (available since Node 22).
+ */
+export function globFiles (dir: string, pattern: string): string[] {
+  const root = resolve(dir)
+  const absolute: string[] = []
+  for (const match of globSync(pattern, { cwd: root })) absolute.push(resolve(root, match))
+  return absolute.sort()
+}
+
+/**
+ * Builds an NDJSON body for the Elasticsearch `_bulk` API.
+ * Each document is wrapped in an `{"index": {...}}` action line followed by the document line.
+ */
+export function buildBulkNdjsonBody (
+  docs: unknown[],
+  opts: { index?: string, pipeline?: string, routing?: string }
+): string {
+  // the action metadata does not depend on the document: build and serialize it once
+  const action: Record<string, string> = {}
+  if (opts.index != null) action._index = opts.index
+  if (opts.pipeline != null) action.pipeline = opts.pipeline
+  if (opts.routing != null) action.routing = opts.routing
+  const actionLine = JSON.stringify({ index: action })
+
+  const lines: string[] = []
+  for (const doc of docs) lines.push(actionLine, JSON.stringify(doc))
+  // bulk API requires a trailing newline
+  return lines.join('\n') + '\n'
+}
+
+/**
+ * Retries an async function with exponential backoff.
+ * `fn` always runs at least once. After each failed attempt the wait doubles,
+ * starting at `opts.delay` milliseconds. A negative or non-numeric
+ * `opts.retries` is treated as 0, a fractional one is rounded down.
+ *
+ * @returns the result of the first successful call of `fn`
+ * @throws the error of the last failed attempt once all retries are used up
+ */
+export async function retryWithBackoff<T> (
+  fn: () => Promise<T>,
+  opts: { retries: number, delay: number }
+): Promise<T> {
+  // `|| 0` maps NaN to 0, so `fn` is never skipped and `undefined` is never thrown
+  const maxRetries = Math.max(0, Math.floor(opts.retries) || 0)
+  let currentDelay = opts.delay
+  for (let attempt = 0; ; attempt++) {
+    try {
+      return await fn()
+    } catch (err) {
+      // out of retries: surface the most recent failure unchanged
+      if (attempt >= maxRetries) throw err
+    }
+    await new Promise((resolve) => setTimeout(resolve, currentDelay))
+    currentDelay *= 2
+  }
+}
+
+/**
+ * Runs `fn` over `items` with at most `concurrency` calls in flight.
+ * Results are returned in the same order as the input items. Once a call
+ * fails, calls already in flight are left to finish but no further items are
+ * started, and the returned promise rejects with that failure.
+ *
+ * @throws RangeError if `concurrency` is NaN or less than 1
+ */
+export async function runWithConcurrency<T, R> (
+  items: T[],
+  concurrency: number,
+  fn: (item: T, index: number) => Promise<R>
+): Promise<R[]> {
+  // with fewer than one worker nothing would run and the result would silently stay empty
+  if (!(concurrency >= 1)) {
+    throw new RangeError(`concurrency must be a number >= 1, got: ${concurrency}`)
+  }
+
+  const results: R[] = new Array(items.length)
+  let nextIndex = 0
+  let failed = false
+
+  // each worker pulls the next unclaimed index until the input is drained or a call has failed
+  async function worker (): Promise<void> {
+    while (!failed && nextIndex < items.length) {
+      const i = nextIndex++
+      try {
+        results[i] = await fn(items[i]!, i)
+      } catch (err) {
+        failed = true
+        throw err
+      }
+    }
+  }
+
+  const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker())
+  await Promise.all(workers)
+  return results
+}
+
+/** Tracks bulk ingestion progress and writes updates to stderr.
*/
+export class ProgressReporter {
+  /** Documents reported so far (sum of all batch sizes). */
+  total = 0
+  /** Documents reported without an error. */
+  succeeded = 0
+  /** Documents reported with an error. */
+  failed = 0
+  /** Not changed by this class; set by the caller and echoed in `summary()`. */
+  retries = 0
+  /** Not changed by this class; set by the caller and echoed in `summary()` when greater than zero. */
+  filesProcessed = 0
+  private readonly startTime = Date.now()
+  /** Length of the last progress line written, used to blank out leftovers of a longer line. */
+  private lastLineLength = 0
+
+  /**
+   * Adds one batch to the counters and redraws the single-line progress indicator on stderr.
+   *
+   * @param batchSize number of documents in the batch
+   * @param batchErrors how many of those documents failed
+   * @param fileName optional label, shown as a `[fileName] ` prefix
+   */
+  report (batchSize: number, batchErrors: number, fileName?: string): void {
+    this.total += batchSize
+    this.succeeded += batchSize - batchErrors
+    this.failed += batchErrors
+    const prefix = fileName != null ? `[${fileName}] ` : ''
+    const line = `${prefix}Progress: ${this.succeeded} succeeded, ${this.failed} failed, ${this.total} total`
+    // `\r` only moves the cursor back: pad with spaces so a shorter line
+    // (e.g. a shorter file name) fully overwrites the previous one
+    const padding = ' '.repeat(Math.max(0, this.lastLineLength - line.length))
+    this.lastLineLength = line.length
+    process.stderr.write(`\r${line}${padding}`)
+  }
+
+  /**
+   * Ends the progress line with a newline on stderr and returns the final counters
+   * together with the milliseconds elapsed since this reporter was created.
+   * `files_processed` is only included when `filesProcessed` is greater than zero.
+   */
+  summary (): JsonValue {
+    const elapsed_ms = Date.now() - this.startTime
+    process.stderr.write('\n')
+    return {
+      total: this.total,
+      succeeded: this.succeeded,
+      failed: this.failed,
+      retries: this.retries,
+      elapsed_ms,
+      ...(this.filesProcessed > 0 && { files_processed: this.filesProcessed })
+    }
+  }
+}
diff --git a/src/es/register.ts b/src/es/register.ts
index 614fa091..00ff55bb 100644
--- a/src/es/register.ts
+++ b/src/es/register.ts
@@ -11,6 +11,7 @@ import { validateApiDefinition, resolveInput } from './types.ts'
 import type { SchemaArgDefinition } from '../lib/schema-args.ts'
 import { allApis } from './apis.ts'
 import { createEsHandler } from './handler.ts'
+import { registerHelperCommands } from './helpers/register.ts'
 
 /** Builds a leaf command handle from a definition and its pre-computed schema args.
*/ function buildLeafHandle ( @@ -108,5 +109,7 @@ export function registerEsCommands ( rootHandles.push(buildLeafHandle(def, defSchemaArgs)) } - return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles) + const helpersGroup = registerHelperCommands() + + return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles, helpersGroup) } diff --git a/test/es/helpers/register.test.ts b/test/es/helpers/register.test.ts new file mode 100644 index 00000000..7ff5b2b9 --- /dev/null +++ b/test/es/helpers/register.test.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { Command } from 'commander' +import { registerHelperCommands } from '../../../src/es/helpers/register.ts' + +describe('registerHelperCommands', () => { + it('returns a command group named "helpers"', () => { + const group = registerHelperCommands() + assert.ok(group instanceof Command) + assert.equal(group.name(), 'helpers') + }) + + it('has a description', () => { + const group = registerHelperCommands() + assert.ok(group.description().length > 0) + }) +}) diff --git a/test/es/helpers/shared.test.ts b/test/es/helpers/shared.test.ts new file mode 100644 index 00000000..09b6e31b --- /dev/null +++ b/test/es/helpers/shared.test.ts @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. 
and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { + parseInput, + buildBulkNdjsonBody, + retryWithBackoff, + runWithConcurrency, + ProgressReporter +} from '../../../src/es/helpers/shared.ts' + +describe('parseInput', () => { + it('parses a JSON array', () => { + const result = parseInput('[{"a":1},{"b":2}]') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('parses NDJSON', () => { + const result = parseInput('{"a":1}\n{"b":2}\n') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('skips empty lines in NDJSON', () => { + const result = parseInput('{"a":1}\n\n{"b":2}\n\n') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('returns empty array for empty input', () => { + assert.deepStrictEqual(parseInput(''), []) + assert.deepStrictEqual(parseInput(' \n '), []) + }) + + it('parses a single JSON object as NDJSON', () => { + const result = parseInput('{"a":1}') + assert.deepStrictEqual(result, [{ a: 1 }]) + }) + + it('throws on malformed NDJSON line', () => { + assert.throws(() => parseInput('{"a":1}\nnot json\n'), /Failed to parse NDJSON at line 2/) + }) + + it('handles JSON array with whitespace', () => { + const result = parseInput(' \n [{"a":1}] \n ') + assert.deepStrictEqual(result, [{ a: 1 }]) + }) +}) + +describe('buildBulkNdjsonBody', () => { + it('wraps documents in index actions', () => { + const body = buildBulkNdjsonBody([{ title: 'doc1' }, { title: 'doc2' }], { index: 'my-index' }) + const lines = body.split('\n') + assert.equal(lines.length, 5) // 4 content lines + trailing empty line + assert.deepStrictEqual(JSON.parse(lines[0]), { index: { _index: 'my-index' } }) + assert.deepStrictEqual(JSON.parse(lines[1]), { title: 'doc1' }) + assert.deepStrictEqual(JSON.parse(lines[2]), { index: { _index: 'my-index' } }) + assert.deepStrictEqual(JSON.parse(lines[3]), { title: 'doc2' }) + assert.equal(lines[4], 
'') // trailing newline + }) + + it('includes pipeline and routing in action metadata', () => { + const body = buildBulkNdjsonBody([{ a: 1 }], { + index: 'idx', + pipeline: 'my-pipe', + routing: 'shard-1' + }) + const action = JSON.parse(body.split('\n')[0]) + assert.deepStrictEqual(action, { + index: { _index: 'idx', pipeline: 'my-pipe', routing: 'shard-1' } + }) + }) + + it('returns trailing newline for empty docs', () => { + const body = buildBulkNdjsonBody([], { index: 'idx' }) + assert.equal(body, '\n') + }) + + it('omits undefined metadata fields', () => { + const body = buildBulkNdjsonBody([{ a: 1 }], {}) + const action = JSON.parse(body.split('\n')[0]) + assert.deepStrictEqual(action, { index: {} }) + }) +}) + +describe('retryWithBackoff', () => { + it('returns result on first success', async () => { + const result = await retryWithBackoff(() => Promise.resolve(42), { retries: 3, delay: 1 }) + assert.equal(result, 42) + }) + + it('retries and succeeds', async () => { + let attempt = 0 + const result = await retryWithBackoff(() => { + attempt++ + if (attempt < 3) throw new Error('fail') + return Promise.resolve('ok') + }, { retries: 3, delay: 1 }) + assert.equal(result, 'ok') + assert.equal(attempt, 3) + }) + + it('throws after exhausting retries', async () => { + await assert.rejects( + () => retryWithBackoff(() => Promise.reject(new Error('always fail')), { retries: 2, delay: 1 }), + { message: 'always fail' } + ) + }) + + it('retries zero times when retries is 0', async () => { + let attempt = 0 + await assert.rejects(() => retryWithBackoff(() => { + attempt++ + return Promise.reject(new Error('fail')) + }, { retries: 0, delay: 1 })) + assert.equal(attempt, 1) + }) +}) + +describe('runWithConcurrency', () => { + it('processes all items and preserves order', async () => { + const results = await runWithConcurrency( + [1, 2, 3, 4, 5], + 2, + async (item) => item * 10 + ) + assert.deepStrictEqual(results, [10, 20, 30, 40, 50]) + }) + + it('limits 
concurrency', async () => {
+    let active = 0
+    let maxActive = 0
+    const results = await runWithConcurrency(
+      [1, 2, 3, 4],
+      2,
+      async (item) => {
+        active++
+        maxActive = Math.max(maxActive, active)
+        await new Promise((r) => setTimeout(r, 10))
+        active--
+        return item
+      }
+    )
+    assert.deepStrictEqual(results, [1, 2, 3, 4])
+    assert.ok(maxActive <= 2, `max concurrency was ${maxActive}, expected <= 2`)
+  })
+
+  it('handles empty input', async () => {
+    const results = await runWithConcurrency([], 5, async () => 'x')
+    assert.deepStrictEqual(results, [])
+  })
+})
+
+describe('ProgressReporter', () => {
+  it('tracks counts correctly', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(100, 5)
+    reporter.report(50, 0)
+    assert.equal(reporter.total, 150)
+    assert.equal(reporter.succeeded, 145)
+    assert.equal(reporter.failed, 5)
+  })
+
+  it('summary includes elapsed_ms', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(10, 1)
+    reporter.retries = 2
+    // summary() returns the generic JsonValue; view it as a plain object to read its fields
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.total, 10)
+    assert.equal(summary.succeeded, 9)
+    assert.equal(summary.failed, 1)
+    assert.equal(summary.retries, 2)
+    assert.equal(typeof summary.elapsed_ms, 'number')
+  })
+
+  it('summary includes files_processed when non-zero', () => {
+    const reporter = new ProgressReporter()
+    reporter.filesProcessed = 3
+    reporter.report(10, 0)
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.files_processed, 3)
+  })
+
+  it('summary omits files_processed when zero', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(10, 0)
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.files_processed, undefined)
+  })
+})
diff --git a/test/es/register.test.ts b/test/es/register.test.ts
index 90eb9b00..5e61528b 100644
--- a/test/es/register.test.ts
+++ b/test/es/register.test.ts
@@ -34,7 +34,7 @@ describe('registerEsCommands', () => {
   it('creates one child group per unique namespace', ()
=> { const handle = registerEsCommands(testDefs) const groupNames = handle.commands.map((c) => c.name()).sort() - assert.deepEqual(groupNames, ['cat', 'indices']) + assert.deepEqual(groupNames, ['cat', 'helpers', 'indices']) }) it('each namespace group has leaf commands matching definition names', () => { @@ -62,9 +62,11 @@ describe('registerEsCommands', () => { it('works with a single namespace', () => { const defs: EsApiDefinition[] = [makeDef('health', 'cat'), makeDef('nodes', 'cat')] const handle = registerEsCommands(defs) - assert.equal(handle.commands.length, 1) - assert.equal(handle.commands[0]?.name(), 'cat') - assert.equal(handle.commands[0]?.commands.length, 2) + // 1 namespace group + 1 helpers group + assert.equal(handle.commands.length, 2) + const cat = handle.commands.find((c) => c.name() === 'cat') + assert.ok(cat != null) + assert.equal(cat.commands.length, 2) }) it('throws on duplicate command names within a namespace', () => { @@ -149,7 +151,7 @@ describe('registerEsCommands - namespace-less (root) definitions', () => { const defs: EsApiDefinition[] = [makeRootDef('bulk'), makeRootDef('search')] const handle = registerEsCommands(defs) const names = handle.commands.map((c) => c.name()).sort() - assert.deepEqual(names, ['bulk', 'search']) + assert.deepEqual(names, ['bulk', 'helpers', 'search']) }) it('namespace-less and namespaced definitions coexist under `es`', () => { @@ -161,7 +163,7 @@ describe('registerEsCommands - namespace-less (root) definitions', () => { const handle = registerEsCommands(defs) // `cat` group and `search` leaf are both direct children of `es` const topNames = handle.commands.map((c) => c.name()).sort() - assert.deepEqual(topNames, ['cat', 'search']) + assert.deepEqual(topNames, ['cat', 'helpers', 'search']) const cat = handle.commands.find((c) => c.name() === 'cat') assert.ok(cat != null) assert.deepEqual(cat.commands.map((c) => c.name()).sort(), ['health', 'indices']) @@ -211,7 +213,7 @@ describe('registerEsCommands - 
extensibility', () => { ] const handle = registerEsCommands(defs) const groupNames = handle.commands.map((c) => c.name()).sort() - assert.deepEqual(groupNames, ['cat', 'cluster']) + assert.deepEqual(groupNames, ['cat', 'cluster', 'helpers']) const cluster = handle.commands.find((c) => c.name() === 'cluster') assert.ok(cluster != null) assert.equal(cluster.commands.length, 2)