From bdff87a274dc035e34922dd06eeaeb9add58638c Mon Sep 17 00:00:00 2001 From: Florian Bernd Date: Mon, 13 Apr 2026 11:28:25 +0200 Subject: [PATCH 1/3] feat: add helper commands foundation (errors, shared utils, registration) Extract error utilities into shared `src/es/errors.ts` so both the existing ES handler and new helper commands can reuse them. Add shared helper utilities (NDJSON parsing, bulk body builder, retry with backoff, concurrency runner, progress reporter, glob file discovery) and register an empty `helpers` group under the `es` command tree. --- src/es/errors.ts | 29 +++++ src/es/handler.ts | 23 +--- src/es/helpers/register.ts | 23 ++++ src/es/helpers/shared.ts | 166 ++++++++++++++++++++++++++ src/es/register.ts | 5 +- test/es/helpers/register.test.ts | 22 ++++ test/es/helpers/shared.test.ts | 192 +++++++++++++++++++++++++++++++ test/es/register.test.ts | 16 +-- 8 files changed, 446 insertions(+), 30 deletions(-) create mode 100644 src/es/errors.ts create mode 100644 src/es/helpers/register.ts create mode 100644 src/es/helpers/shared.ts create mode 100644 test/es/helpers/register.test.ts create mode 100644 test/es/helpers/shared.test.ts diff --git a/src/es/errors.ts b/src/es/errors.ts new file mode 100644 index 00000000..a0f90c79 --- /dev/null +++ b/src/es/errors.ts @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { errors } from '@elastic/transport' +import type { JsonValue } from '../factory.ts' + +/** Builds a `missing_config` error payload from a thrown error. */ +export function missingConfigError (err: unknown): JsonValue { + const message = err instanceof Error ? err.message : String(err) + return { error: { code: 'missing_config', message } } +} + +/** Builds a `transport_error` payload from a thrown transport error. */ +export function transportError (err: unknown): JsonValue { + if (err instanceof errors.ResponseError) { + return { + error: { + code: 'transport_error', + status_code: err.statusCode ?? null, + body: err.body as JsonValue ?? null + } + } + } + + const message = err instanceof Error ? err.message : String(err) + return { error: { code: 'transport_error', message } } +} diff --git a/src/es/handler.ts b/src/es/handler.ts index d6972466..d20e41c5 100644 --- a/src/es/handler.ts +++ b/src/es/handler.ts @@ -3,12 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { errors } from '@elastic/transport' import type { Transport } from '@elastic/transport' import type { EsApiDefinition } from './types.ts' import type { SchemaArgDefinition } from '../lib/schema-args.ts' import { buildRequestParams } from './request-builder.ts' import { getTransport } from '../lib/transport.ts' +import { missingConfigError, transportError } from './errors.ts' import type { JsonValue, ParsedResult } from '../factory.ts' /** @@ -76,24 +76,3 @@ export function createEsHandler ( } } -/** builds a `missing_config` error payload from a thrown error */ -function missingConfigError (err: unknown): JsonValue { - const message = err instanceof Error ? err.message : String(err) - return { error: { code: 'missing_config', message } } -} - -/** builds a `transport_error` payload from a thrown transport error */ -function transportError (err: unknown): JsonValue { - if (err instanceof errors.ResponseError) { - return { - error: { - code: 'transport_error', - status_code: err.statusCode ?? null, - body: err.body as JsonValue ?? null - } - } - } - - const message = err instanceof Error ? 
err.message : String(err) - return { error: { code: 'transport_error', message } } -} diff --git a/src/es/helpers/register.ts b/src/es/helpers/register.ts new file mode 100644 index 00000000..097c31a2 --- /dev/null +++ b/src/es/helpers/register.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { defineGroup } from '../../factory.ts' +import type { OpaqueCommandHandle } from '../../factory.ts' + +/** + * Registers all high-level helper commands under a `helpers` group. + * Helper commands provide convenience abstractions over common Elasticsearch + * workflows (bulk ingestion, scroll search, multi-search batching). + * + * @returns an `OpaqueCommandHandle` for the `helpers` group + */ +export function registerHelperCommands (): OpaqueCommandHandle { + const commands: OpaqueCommandHandle[] = [] + + return defineGroup( + { name: 'helpers', description: 'High-level helper commands for common Elasticsearch workflows' }, + ...commands + ) +} diff --git a/src/es/helpers/shared.ts b/src/es/helpers/shared.ts new file mode 100644 index 00000000..3a7504d5 --- /dev/null +++ b/src/es/helpers/shared.ts @@ -0,0 +1,166 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { readFileSync, globSync } from 'node:fs' +import { resolve } from 'node:path' +import type { JsonValue } from '../../factory.ts' + +/** + * Parses raw text input as either a JSON array or NDJSON (newline-delimited JSON). + * Auto-detects the format: if the trimmed input starts with `[`, it's parsed as JSON array; + * otherwise each non-empty line is parsed as a separate JSON object. + */ +export function parseInput (raw: string): unknown[] { + const trimmed = raw.trim() + if (trimmed.length === 0) return [] + + if (trimmed.startsWith('[')) { + const parsed = JSON.parse(trimmed) + if (!Array.isArray(parsed)) { + throw new Error('Expected a JSON array, got: ' + typeof parsed) + } + return parsed + } + + const lines = trimmed.split('\n') + const docs: unknown[] = [] + for (let i = 0; i < lines.length; i++) { + const line = lines[i]!.trim() + if (line.length === 0) continue + try { + docs.push(JSON.parse(line)) + } catch { + throw new Error(`Failed to parse NDJSON at line ${i + 1}: ${line.slice(0, 80)}`) + } + } + return docs +} + +/** + * Reads raw text content from a file path or stdin. + * Returns `undefined` when no input is available (interactive TTY with no file). + */ +export function readRawInput (filePath?: string): string | undefined { + if (filePath != null) { + return readFileSync(filePath, 'utf-8') + } + if (!process.stdin.isTTY) { + const content = readFileSync(0, 'utf-8') + return content.length > 0 ? content : undefined + } + return undefined +} + +/** + * Resolves a glob pattern against a directory and returns a sorted list of absolute file paths. + * Uses Node.js native `fs.globSync` (available since Node 22). + */ +export function globFiles (dir: string, pattern: string): string[] { + const absDir = resolve(dir) + const matches = globSync(pattern, { cwd: absDir }) + return matches.map((f) => resolve(absDir, f)).sort() +} + +/** + * Builds an NDJSON body for the Elasticsearch `_bulk` API. + * Each document is wrapped in an `{"index": {...}}` action line followed by the document line. 
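+ *
+ * For example, two documents targeting `my-index` serialize to the shape
+ * asserted by the unit tests below:
+ *
+ *   {"index":{"_index":"my-index"}}
+ *   {"title":"doc1"}
+ *   {"index":{"_index":"my-index"}}
+ *   {"title":"doc2"}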
+ */
+export function buildBulkNdjsonBody (
+  docs: unknown[],
+  opts: { index?: string, pipeline?: string, routing?: string }
+): string {
+  const lines: string[] = []
+  for (const doc of docs) {
+    const action: Record<string, unknown> = {}
+    if (opts.index != null) action._index = opts.index
+    if (opts.pipeline != null) action.pipeline = opts.pipeline
+    if (opts.routing != null) action.routing = opts.routing
+    lines.push(JSON.stringify({ index: action }))
+    lines.push(JSON.stringify(doc))
+  }
+  // bulk API requires a trailing newline
+  return lines.join('\n') + '\n'
+}
+
+/**
+ * Retries an async function with exponential backoff.
+ * On each failure the delay doubles. Rethrows the last error after exhausting retries.
+ */
+export async function retryWithBackoff<T> (
+  fn: () => Promise<T>,
+  opts: { retries: number, delay: number }
+): Promise<T> {
+  let lastError: unknown
+  let currentDelay = opts.delay
+  for (let attempt = 0; attempt <= opts.retries; attempt++) {
+    try {
+      return await fn()
+    } catch (err) {
+      lastError = err
+      if (attempt < opts.retries) {
+        await new Promise((resolve) => setTimeout(resolve, currentDelay))
+        currentDelay *= 2
+      }
+    }
+  }
+  throw lastError
+}
+
+/**
+ * Runs async tasks with a concurrency limit.
+ * Returns results in the same order as the input items.
+ */
+export async function runWithConcurrency<T, R> (
+  items: T[],
+  concurrency: number,
+  fn: (item: T, index: number) => Promise<R>
+): Promise<R[]> {
+  const results: R[] = new Array(items.length)
+  let nextIndex = 0
+
+  async function worker (): Promise<void> {
+    while (nextIndex < items.length) {
+      const i = nextIndex++
+      results[i] = await fn(items[i]!, i)
+    }
+  }
+
+  const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker())
+  await Promise.all(workers)
+  return results
+}
+
+/** Tracks bulk ingestion progress and writes updates to stderr. */
+export class ProgressReporter {
+  total = 0
+  succeeded = 0
+  failed = 0
+  retries = 0
+  filesProcessed = 0
+  private readonly startTime = Date.now()
+
+  report (batchSize: number, batchErrors: number, fileName?: string): void {
+    this.total += batchSize
+    this.succeeded += batchSize - batchErrors
+    this.failed += batchErrors
+    const prefix = fileName != null ? `[${fileName}] ` : ''
+    process.stderr.write(
+      `\r${prefix}Progress: ${this.succeeded} succeeded, ${this.failed} failed, ${this.total} total`
+    )
+  }
+
+  summary (): JsonValue {
+    const elapsed_ms = Date.now() - this.startTime
+    process.stderr.write('\n')
+    return {
+      total: this.total,
+      succeeded: this.succeeded,
+      failed: this.failed,
+      retries: this.retries,
+      elapsed_ms,
+      ...(this.filesProcessed > 0 && { files_processed: this.filesProcessed })
+    }
+  }
+}
diff --git a/src/es/register.ts b/src/es/register.ts
index 614fa091..00ff55bb 100644
--- a/src/es/register.ts
+++ b/src/es/register.ts
@@ -11,6 +11,7 @@ import { validateApiDefinition, resolveInput } from './types.ts'
 import type { SchemaArgDefinition } from '../lib/schema-args.ts'
 import { allApis } from './apis.ts'
 import { createEsHandler } from './handler.ts'
+import { registerHelperCommands } from './helpers/register.ts'
 
 /** Builds a leaf command handle from a definition and its pre-computed schema args.
*/ function buildLeafHandle ( @@ -108,5 +109,7 @@ export function registerEsCommands ( rootHandles.push(buildLeafHandle(def, defSchemaArgs)) } - return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles) + const helpersGroup = registerHelperCommands() + + return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles, helpersGroup) } diff --git a/test/es/helpers/register.test.ts b/test/es/helpers/register.test.ts new file mode 100644 index 00000000..7ff5b2b9 --- /dev/null +++ b/test/es/helpers/register.test.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { Command } from 'commander' +import { registerHelperCommands } from '../../../src/es/helpers/register.ts' + +describe('registerHelperCommands', () => { + it('returns a command group named "helpers"', () => { + const group = registerHelperCommands() + assert.ok(group instanceof Command) + assert.equal(group.name(), 'helpers') + }) + + it('has a description', () => { + const group = registerHelperCommands() + assert.ok(group.description().length > 0) + }) +}) diff --git a/test/es/helpers/shared.test.ts b/test/es/helpers/shared.test.ts new file mode 100644 index 00000000..09b6e31b --- /dev/null +++ b/test/es/helpers/shared.test.ts @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, it } from 'node:test' +import assert from 'node:assert/strict' +import { + parseInput, + buildBulkNdjsonBody, + retryWithBackoff, + runWithConcurrency, + ProgressReporter +} from '../../../src/es/helpers/shared.ts' + +describe('parseInput', () => { + it('parses a JSON array', () => { + const result = parseInput('[{"a":1},{"b":2}]') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('parses NDJSON', () => { + const result = parseInput('{"a":1}\n{"b":2}\n') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('skips empty lines in NDJSON', () => { + const result = parseInput('{"a":1}\n\n{"b":2}\n\n') + assert.deepStrictEqual(result, [{ a: 1 }, { b: 2 }]) + }) + + it('returns empty array for empty input', () => { + assert.deepStrictEqual(parseInput(''), []) + assert.deepStrictEqual(parseInput(' \n '), []) + }) + + it('parses a single JSON object as NDJSON', () => { + const result = parseInput('{"a":1}') + assert.deepStrictEqual(result, [{ a: 1 }]) + }) + + it('throws on malformed NDJSON line', () => { + assert.throws(() => parseInput('{"a":1}\nnot json\n'), /Failed to parse NDJSON at line 2/) + }) + + it('handles JSON array with whitespace', () => { + const result = parseInput(' \n [{"a":1}] \n ') + assert.deepStrictEqual(result, [{ a: 1 }]) + }) +}) + +describe('buildBulkNdjsonBody', () => { + it('wraps documents in index actions', () => { + const body = buildBulkNdjsonBody([{ title: 'doc1' }, { title: 'doc2' }], { index: 'my-index' }) + const lines = body.split('\n') + assert.equal(lines.length, 5) // 4 content lines + trailing empty line + assert.deepStrictEqual(JSON.parse(lines[0]), { index: { _index: 'my-index' } }) + assert.deepStrictEqual(JSON.parse(lines[1]), { title: 'doc1' }) + assert.deepStrictEqual(JSON.parse(lines[2]), { index: { _index: 'my-index' } }) + assert.deepStrictEqual(JSON.parse(lines[3]), { title: 'doc2' }) + assert.equal(lines[4], '') // 
trailing newline
+  })
+
+  it('includes pipeline and routing in action metadata', () => {
+    const body = buildBulkNdjsonBody([{ a: 1 }], {
+      index: 'idx',
+      pipeline: 'my-pipe',
+      routing: 'shard-1'
+    })
+    const action = JSON.parse(body.split('\n')[0])
+    assert.deepStrictEqual(action, {
+      index: { _index: 'idx', pipeline: 'my-pipe', routing: 'shard-1' }
+    })
+  })
+
+  it('returns trailing newline for empty docs', () => {
+    const body = buildBulkNdjsonBody([], { index: 'idx' })
+    assert.equal(body, '\n')
+  })
+
+  it('omits undefined metadata fields', () => {
+    const body = buildBulkNdjsonBody([{ a: 1 }], {})
+    const action = JSON.parse(body.split('\n')[0])
+    assert.deepStrictEqual(action, { index: {} })
+  })
+})
+
+describe('retryWithBackoff', () => {
+  it('returns result on first success', async () => {
+    const result = await retryWithBackoff(() => Promise.resolve(42), { retries: 3, delay: 1 })
+    assert.equal(result, 42)
+  })
+
+  it('retries and succeeds', async () => {
+    let attempt = 0
+    const result = await retryWithBackoff(() => {
+      attempt++
+      if (attempt < 3) throw new Error('fail')
+      return Promise.resolve('ok')
+    }, { retries: 3, delay: 1 })
+    assert.equal(result, 'ok')
+    assert.equal(attempt, 3)
+  })
+
+  it('throws after exhausting retries', async () => {
+    await assert.rejects(
+      () => retryWithBackoff(() => Promise.reject(new Error('always fail')), { retries: 2, delay: 1 }),
+      { message: 'always fail' }
+    )
+  })
+
+  it('retries zero times when retries is 0', async () => {
+    let attempt = 0
+    await assert.rejects(() => retryWithBackoff(() => {
+      attempt++
+      return Promise.reject(new Error('fail'))
+    }, { retries: 0, delay: 1 }))
+    assert.equal(attempt, 1)
+  })
+})
+
+describe('runWithConcurrency', () => {
+  it('processes all items and preserves order', async () => {
+    const results = await runWithConcurrency(
+      [1, 2, 3, 4, 5],
+      2,
+      async (item) => item * 10
+    )
+    assert.deepStrictEqual(results, [10, 20, 30, 40, 50])
+  })
+
+  it('limits concurrency', async () => {
+    let active = 0
+    let maxActive = 0
+    const results = await runWithConcurrency(
+      [1, 2, 3, 4],
+      2,
+      async (item) => {
+        active++
+        maxActive = Math.max(maxActive, active)
+        await new Promise((r) => setTimeout(r, 10))
+        active--
+        return item
+      }
+    )
+    assert.deepStrictEqual(results, [1, 2, 3, 4])
+    assert.ok(maxActive <= 2, `max concurrency was ${maxActive}, expected <= 2`)
+  })
+
+  it('handles empty input', async () => {
+    const results = await runWithConcurrency([], 5, async () => 'x')
+    assert.deepStrictEqual(results, [])
+  })
+})
+
+describe('ProgressReporter', () => {
+  it('tracks counts correctly', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(100, 5)
+    reporter.report(50, 0)
+    assert.equal(reporter.total, 150)
+    assert.equal(reporter.succeeded, 145)
+    assert.equal(reporter.failed, 5)
+  })
+
+  it('summary includes elapsed_ms', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(10, 1)
+    reporter.retries = 2
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.total, 10)
+    assert.equal(summary.succeeded, 9)
+    assert.equal(summary.failed, 1)
+    assert.equal(summary.retries, 2)
+    assert.equal(typeof summary.elapsed_ms, 'number')
+  })
+
+  it('summary includes files_processed when non-zero', () => {
+    const reporter = new ProgressReporter()
+    reporter.filesProcessed = 3
+    reporter.report(10, 0)
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.files_processed, 3)
+  })
+
+  it('summary omits files_processed when zero', () => {
+    const reporter = new ProgressReporter()
+    reporter.report(10, 0)
+    const summary = reporter.summary() as Record<string, unknown>
+    assert.equal(summary.files_processed, undefined)
+  })
+})
diff --git a/test/es/register.test.ts b/test/es/register.test.ts
index 90eb9b00..5e61528b 100644
--- a/test/es/register.test.ts
+++ b/test/es/register.test.ts
@@ -34,7 +34,7 @@ describe('registerEsCommands', () => {
   it('creates one child group per unique namespace', () => {
     const handle = registerEsCommands(testDefs)
     const groupNames = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(groupNames, ['cat', 'indices'])
+    assert.deepEqual(groupNames, ['cat', 'helpers', 'indices'])
   })
 
   it('each namespace group has leaf commands matching definition names', () => {
@@ -62,9 +62,11 @@ describe('registerEsCommands', () => {
   it('works with a single namespace', () => {
     const defs: EsApiDefinition[] = [makeDef('health', 'cat'), makeDef('nodes', 'cat')]
     const handle = registerEsCommands(defs)
-    assert.equal(handle.commands.length, 1)
-    assert.equal(handle.commands[0]?.name(), 'cat')
-    assert.equal(handle.commands[0]?.commands.length, 2)
+    // 1 namespace group + 1 helpers group
+    assert.equal(handle.commands.length, 2)
+    const cat = handle.commands.find((c) => c.name() === 'cat')
+    assert.ok(cat != null)
+    assert.equal(cat.commands.length, 2)
   })
 
   it('throws on duplicate command names within a namespace', () => {
@@ -149,7 +151,7 @@ describe('registerEsCommands - namespace-less (root) definitions', () => {
     const defs: EsApiDefinition[] = [makeRootDef('bulk'), makeRootDef('search')]
     const handle = registerEsCommands(defs)
     const names = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(names, ['bulk', 'search'])
+    assert.deepEqual(names, ['bulk', 'helpers', 'search'])
   })
 
   it('namespace-less and namespaced definitions coexist under `es`', () => {
@@ -161,7 +163,7 @@ describe('registerEsCommands - namespace-less (root) definitions', () => {
     const handle = registerEsCommands(defs)
     // `cat` group and `search` leaf are both direct children of `es`
     const topNames = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(topNames, ['cat', 'search'])
+    assert.deepEqual(topNames, ['cat', 'helpers', 'search'])
     const cat = handle.commands.find((c) => c.name() === 'cat')
     assert.ok(cat != null)
     assert.deepEqual(cat.commands.map((c) => c.name()).sort(), ['health', 'indices'])
@@ -211,7 +213,7 @@ describe('registerEsCommands - extensibility', () => {
     ]
     const handle = registerEsCommands(defs)
     const groupNames = handle.commands.map((c) => c.name()).sort()
-    assert.deepEqual(groupNames, ['cat', 'cluster'])
+    assert.deepEqual(groupNames, ['cat', 'cluster', 'helpers'])
     const cluster = handle.commands.find((c) => c.name() === 'cluster')
     assert.ok(cluster != null)
     assert.equal(cluster.commands.length, 2)

From a797c053532544063476d04f2aea0787e8242f38 Mon Sep 17 00:00:00 2001
From: Florian Bernd
Date: Mon, 13 Apr 2026 11:37:11 +0200
Subject: [PATCH 2/3] feat: add bulk-ingest helper command

Add `elastic es helpers bulk-ingest` for convenient bulk document
ingestion with automatic batching, concurrency control, and retries.

Supports three input modes: single file (--input-file), directory with
glob patterns (--input-dir --glob), or piped stdin. Accepts both JSON
arrays and NDJSON format, auto-detected per file.
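
Illustrative invocations (file and index names below are placeholders;
the flags are the ones this patch defines):

    # single NDJSON or JSON-array file
    elastic es helpers bulk-ingest --index logs --input-file events.ndjson

    # directory of JSON files, 8 parallel bulk requests, 1 MiB batches
    elastic es helpers bulk-ingest --index logs --input-dir ./data \
      --glob '**/*.json' --concurrency 8 --flush-bytes 1048576

    # piped stdin
    cat events.ndjson | elastic es helpers bulk-ingest --index logs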
--- src/es/helpers/bulk-ingest.ts | 240 ++++++++++++++++++++++++++ src/es/helpers/register.ts | 5 +- src/es/helpers/shared.ts | 2 +- test/es/helpers/bulk-ingest.test.ts | 253 ++++++++++++++++++++++++++++ 4 files changed, 496 insertions(+), 4 deletions(-) create mode 100644 src/es/helpers/bulk-ingest.ts create mode 100644 test/es/helpers/bulk-ingest.test.ts diff --git a/src/es/helpers/bulk-ingest.ts b/src/es/helpers/bulk-ingest.ts new file mode 100644 index 00000000..89ce693c --- /dev/null +++ b/src/es/helpers/bulk-ingest.ts @@ -0,0 +1,240 @@ +/* + * Copyright Elasticsearch B.V. and contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { readFileSync } from 'node:fs' +import type { Transport } from '@elastic/transport' +import { defineCommand } from '../../factory.ts' +import type { OpaqueCommandHandle, JsonValue, ParsedResult } from '../../factory.ts' +import { getTransport } from '../../lib/transport.ts' +import { missingConfigError, transportError } from '../errors.ts' +import { + parseInput, + readRawInput, + globFiles, + buildBulkNdjsonBody, + retryWithBackoff, + runWithConcurrency, + ProgressReporter +} from './shared.ts' + +interface BulkIngestOptions { + index: string + 'input-file'?: string + 'input-dir'?: string + glob: string + 'no-recursive'?: boolean + 'flush-bytes': number + concurrency: number + retries: number + 'retry-delay': number + pipeline?: string + routing?: string +} + +/** Dependencies injectable for testing. */ +export interface BulkIngestDeps { + getTransport: () => Transport +} + +const defaultDeps: BulkIngestDeps = { getTransport } + +/** + * Splits an array of documents into batches where each batch's serialized + * size does not exceed the byte threshold. + */ +function splitIntoBatches (docs: unknown[], flushBytes: number): unknown[][] { + const batches: unknown[][] = [] + let currentBatch: unknown[] = [] + let currentSize = 0 + + for (const doc of docs) { + const docSize = JSON.stringify(doc).length + 1 // +1 for newline + if (currentBatch.length > 0 && currentSize + docSize > flushBytes) { + batches.push(currentBatch) + currentBatch = [] + currentSize = 0 + } + currentBatch.push(doc) + currentSize += docSize + } + if (currentBatch.length > 0) { + batches.push(currentBatch) + } + return batches +} + +/** Collects documents from the resolved input source. */ +function collectDocuments (opts: BulkIngestOptions): { docs: unknown[], filesProcessed: number } { + const inputFile = opts['input-file'] + const inputDir = opts['input-dir'] + + if (inputFile != null && inputDir != null) { + throw new Error('Provide only one input source: --input-file or --input-dir (not both)') + } + + if (inputDir != null) { + const recursive = opts['no-recursive'] !== true + const pattern = recursive ? opts.glob : opts.glob.replace(/^\*\*\//, '') + const files = globFiles(inputDir, pattern) + if (files.length === 0) { + throw new Error(`No files matched pattern "${opts.glob}" in ${inputDir}`) + } + const allDocs: unknown[] = [] + for (const file of files) { + const raw = readFileSync(file, 'utf-8') + allDocs.push(...parseInput(raw)) + } + return { docs: allDocs, filesProcessed: files.length } + } + + if (inputFile != null) { + const raw = readRawInput(inputFile) + if (raw == null || raw.trim().length === 0) { + throw new Error('No input data received from file') + } + return { docs: parseInput(raw), filesProcessed: 1 } + } + + // Fall back to stdin + const raw = readRawInput() + if (raw == null || raw.trim().length === 0) { + throw new Error('No input provided. 
Use --input-file, --input-dir, or pipe data to stdin')
+  }
+  return { docs: parseInput(raw), filesProcessed: 0 }
+}
+
+/** Sends a single bulk batch to Elasticsearch. Returns the count of errors. */
+async function sendBatch (
+  transport: Transport,
+  ndjsonBody: string,
+  index: string
+): Promise<{ errors: number, total: number }> {
+  const path = index != null ? `/${encodeURIComponent(index)}/_bulk` : '/_bulk'
+  const result = await transport.request(
+    { method: 'POST', path, body: ndjsonBody, bulkBody: ndjsonBody },
+    { headers: { 'content-type': 'application/x-ndjson' } }
+  ) as { errors?: boolean, items?: Array<Record<string, { status?: number }>> }
+
+  let errorCount = 0
+  if (result.errors === true && result.items != null) {
+    for (const item of result.items) {
+      const action = Object.values(item)[0]
+      if (action != null && action.status != null && action.status >= 400) {
+        errorCount++
+      }
+    }
+  }
+  const total = result.items?.length ?? 0
+  return { errors: errorCount, total }
+}
+
+function createBulkIngestHandler (deps: BulkIngestDeps = defaultDeps) {
+  return async (parsed: ParsedResult): Promise<JsonValue> => {
+    const opts = parsed.options as unknown as BulkIngestOptions
+
+    let transport: Transport
+    try {
+      transport = deps.getTransport()
+    } catch (err) {
+      return missingConfigError(err)
+    }
+
+    let docs: unknown[]
+    let filesProcessed: number
+    try {
+      const result = collectDocuments(opts)
+      docs = result.docs
+      filesProcessed = result.filesProcessed
+    } catch (err) {
+      return {
+        error: {
+          code: 'input_error',
+          message: err instanceof Error ? err.message : String(err)
+        }
+      }
+    }
+
+    if (docs.length === 0) {
+      return { total: 0, succeeded: 0, failed: 0, retries: 0, elapsed_ms: 0 }
+    }
+
+    const flushBytes = opts['flush-bytes']
+    const batches = splitIntoBatches(docs, flushBytes)
+
+    const reporter = new ProgressReporter()
+    reporter.filesProcessed = filesProcessed
+
+    const retries = opts.retries
+    const retryDelay = opts['retry-delay']
+    const index = opts.index
+    const pipeline = opts.pipeline
+    const routing = opts.routing
+
+    try {
+      await runWithConcurrency(batches, opts.concurrency, async (batch) => {
+        const ndjsonBody = buildBulkNdjsonBody(batch, { index, pipeline, routing })
+
+        const result = await retryWithBackoff(
+          async () => {
+            const res = await sendBatch(transport, ndjsonBody, index)
+            if (res.errors > 0) {
+              // Only retry if all items failed (likely a transient cluster issue)
+              // Partial failures are reported as-is
+              if (res.errors === res.total) {
+                throw new Error(`Bulk batch failed: ${res.errors}/${res.total} errors`)
+              }
+            }
+            return res
+          },
+          { retries, delay: retryDelay }
+        )
+
+        reporter.report(result.total, result.errors)
+        return result
+      })
+    } catch (err) {
+      // If retries exhausted, report what we have so far
+      return transportError(err)
+    }
+
+    return reporter.summary()
+  }
+}
+
+export function createBulkIngestCommand (deps?: BulkIngestDeps): OpaqueCommandHandle {
+  return defineCommand({
+    name: 'bulk-ingest',
+    description: 'Bulk-ingest documents from file, directory, or stdin with automatic batching, concurrency, and retries.',
+    options: [
+      { long: 'index', short: 'i', description: 'Target index', type: 'string', required: true },
+      { long: 'input-file', description: 'Path to data file (NDJSON or JSON array)', type: 'string' },
+      { long: 'input-dir', description: 'Path to directory of data files to ingest', type: 'string' },
+      { long: 'glob', description: 'Glob pattern for --input-dir file matching', type: 'string', defaultValue: '**/*.json' },
+      { long: 'no-recursive',
description: 'Do not recurse into subdirectories when using --input-dir', type: 'boolean' },
+      { long: 'flush-bytes', description: 'Batch size threshold in bytes', type: 'number', defaultValue: 5242880 },
+      { long: 'concurrency', description: 'Number of parallel bulk requests', type: 'number', defaultValue: 5 },
+      { long: 'retries', description: 'Max retries per failed batch', type: 'number', defaultValue: 3 },
+      { long: 'retry-delay', description: 'Initial retry delay in ms (doubles each attempt)', type: 'number', defaultValue: 1000 },
+      { long: 'pipeline', description: 'Ingest pipeline name', type: 'string' },
+      { long: 'routing', description: 'Custom routing value', type: 'string' },
+    ],
+    handler: createBulkIngestHandler(deps),
+    formatOutput: (result) => {
+      const r = result as Record<string, unknown>
+      if (r.error != null) return JSON.stringify(result, null, 2) + '\n'
+      const lines = [
+        `Total: ${r.total}`,
+        `Succeeded: ${r.succeeded}`,
+        `Failed: ${r.failed}`,
+        `Retries: ${r.retries}`,
+        `Elapsed: ${r.elapsed_ms}ms`,
+      ]
+      if (r.files_processed != null) {
+        lines.push(`Files: ${r.files_processed}`)
+      }
+      return lines.join('\n') + '\n'
+    }
+  })
+}
diff --git a/src/es/helpers/register.ts b/src/es/helpers/register.ts
index 097c31a2..6326d3ff 100644
--- a/src/es/helpers/register.ts
+++ b/src/es/helpers/register.ts
@@ -5,6 +5,7 @@
 
 import { defineGroup } from '../../factory.ts'
 import type { OpaqueCommandHandle } from '../../factory.ts'
+import { createBulkIngestCommand } from './bulk-ingest.ts'
 
 /**
  * Registers all high-level helper commands under a `helpers` group.
@@ -14,10 +15,8 @@ import type { OpaqueCommandHandle } from '../../factory.ts'
  * @returns an `OpaqueCommandHandle` for the `helpers` group
  */
 export function registerHelperCommands (): OpaqueCommandHandle {
-  const commands: OpaqueCommandHandle[] = []
-
   return defineGroup(
     { name: 'helpers', description: 'High-level helper commands for common Elasticsearch workflows' },
-    ...commands
+    createBulkIngestCommand()
   )
 }
diff --git a/src/es/helpers/shared.ts b/src/es/helpers/shared.ts
index 3a7504d5..4736ca7a 100644
--- a/src/es/helpers/shared.ts
+++ b/src/es/helpers/shared.ts
@@ -69,7 +69,7 @@ export function globFiles (dir: string, pattern: string): string[] {
  */
 export function buildBulkNdjsonBody (
   docs: unknown[],
-  opts: { index?: string, pipeline?: string, routing?: string }
+  opts: { index?: string | undefined, pipeline?: string | undefined, routing?: string | undefined }
 ): string {
   const lines: string[] = []
   for (const doc of docs) {
diff --git a/test/es/helpers/bulk-ingest.test.ts b/test/es/helpers/bulk-ingest.test.ts
new file mode 100644
index 00000000..c225a153
--- /dev/null
+++ b/test/es/helpers/bulk-ingest.test.ts
@@ -0,0 +1,253 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { describe, it, afterEach } from 'node:test'
+import assert from 'node:assert/strict'
+import { mkdtempSync, writeFileSync, mkdirSync } from 'node:fs'
+import { join } from 'node:path'
+import { tmpdir } from 'node:os'
+import type { Transport, TransportRequestParams, TransportRequestOptions } from '@elastic/transport'
+import { createBulkIngestCommand } from '../../../src/es/helpers/bulk-ingest.ts'
+import type { BulkIngestDeps } from '../../../src/es/helpers/bulk-ingest.ts'
+import { _testSetStdinReader } from '../../../src/factory.ts'
+import { Command } from 'commander'
+
+/** Creates a mock transport that records requests and returns configurable responses.
+ */
+function mockTransport (responses: Array<{ errors: boolean, items: Array<Record<string, { status: number }>> }>): {
+  transport: Transport
+  requests: Array<{ params: TransportRequestParams, opts?: TransportRequestOptions }>
+} {
+  const requests: Array<{ params: TransportRequestParams, opts?: TransportRequestOptions }> = []
+  let callIndex = 0
+  const transport = {
+    request: async (params: TransportRequestParams, opts?: TransportRequestOptions) => {
+      requests.push({ params, opts })
+      const response = responses[callIndex] ?? responses[responses.length - 1]
+      callIndex++
+      return response
+    }
+  } as unknown as Transport
+  return { transport, requests }
+}
+
+function makeDeps (transport: Transport): BulkIngestDeps {
+  return { getTransport: () => transport }
+}
+
+function successResponse (count: number): { errors: boolean, items: Array<Record<string, { status: number }>> } {
+  return {
+    errors: false,
+    items: Array.from({ length: count }, () => ({ index: { status: 201 } }))
+  }
+}
+
+/** Runs the bulk-ingest command programmatically and returns handler result. */
+async function runCommand (args: string[], deps: BulkIngestDeps): Promise<unknown> {
+  const cmd = createBulkIngestCommand(deps)
+  const program = new Command()
+  program.exitOverride()
+  program.option('--json', 'output as JSON')
+  program.addCommand(cmd)
+
+  // Capture output
+  const origWrite = process.stdout.write.bind(process.stdout)
+  const chunks: string[] = []
+  process.stdout.write = ((chunk: string) => {
+    chunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
+    return true
+  }) as typeof process.stdout.write
+
+  try {
+    await program.parseAsync(['node', 'test', 'bulk-ingest', ...args])
+  } finally {
+    process.stdout.write = origWrite
+  }
+
+  // Parse the captured JSON output
+  const output = chunks.join('')
+  if (output.trim().length > 0) {
+    try {
+      return JSON.parse(output.trim())
+    } catch {
+      return output.trim()
+    }
+  }
+  return undefined
+}
+
+describe('bulk-ingest command', () => {
+  let restoreStdin: (() => void) | undefined
+
+  afterEach(() => {
+    if (restoreStdin != null) {
+      restoreStdin()
+      restoreStdin = undefined
+    }
+  })
+
+  it('creates a command named bulk-ingest', () => {
+    const { transport } = mockTransport([successResponse(1)])
+    const cmd = createBulkIngestCommand(makeDeps(transport))
+    assert.equal(cmd.name(), 'bulk-ingest')
+  })
+
+  it('ingests documents from --input-file with JSON array', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    const filePath = join(tmpDir, 'data.json')
+    writeFileSync(filePath, JSON.stringify([{ title: 'doc1' }, { title: 'doc2' }]))
+
+    const { transport, requests } = mockTransport([successResponse(2)])
+
+    await runCommand(['--index', 'test-idx', '--input-file', filePath, '--json'], makeDeps(transport))
+
+    assert.equal(requests.length, 1)
+    const body = requests[0]!.params.body as string
+    assert.ok(body.includes('"index"'))
+    assert.ok(body.includes('"doc1"'))
+    assert.ok(body.includes('"doc2"'))
+  })
+
+  it('ingests documents from --input-file with NDJSON', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    const filePath = join(tmpDir, 'data.ndjson')
+    writeFileSync(filePath, '{"title":"doc1"}\n{"title":"doc2"}\n')
+
+    const { transport, requests } = mockTransport([successResponse(2)])
+
+    await runCommand(['--index', 'test-idx', '--input-file', filePath, '--json'], makeDeps(transport))
+
+    assert.equal(requests.length, 1)
+  })
+
+  it('ingests documents from --input-dir', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir,
'a.json'), '[{"a":1}]') + writeFileSync(join(tmpDir, 'b.json'), '[{"b":2}]') + + const { transport, requests } = mockTransport([successResponse(2)]) + + await runCommand([ + '--index', 'test-idx', + '--input-dir', tmpDir, + '--glob', '*.json', + '--json' + ], makeDeps(transport)) + + assert.equal(requests.length, 1) + const body = requests[0]!.params.body as string + assert.ok(body.includes('"a"')) + assert.ok(body.includes('"b"')) + }) + + it('recurses into subdirectories by default', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-')) + mkdirSync(join(tmpDir, 'sub')) + writeFileSync(join(tmpDir, 'top.json'), '[{"top":1}]') + writeFileSync(join(tmpDir, 'sub', 'nested.json'), '[{"nested":2}]') + + const { transport, requests } = mockTransport([successResponse(2)]) + + await runCommand([ + '--index', 'test-idx', + '--input-dir', tmpDir, + '--json' + ], makeDeps(transport)) + + assert.equal(requests.length, 1) + const body = requests[0]!.params.body as string + assert.ok(body.includes('"top"')) + assert.ok(body.includes('"nested"')) + }) + + it('splits large inputs into multiple batches', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-')) + const docs = Array.from({ length: 100 }, (_, i) => ({ id: i, data: 'x'.repeat(100) })) + writeFileSync(join(tmpDir, 'data.json'), JSON.stringify(docs)) + + // Use a small flush-bytes to force multiple batches + const { transport, requests } = mockTransport( + Array.from({ length: 100 }, () => successResponse(1)) + ) + + await runCommand([ + '--index', 'test-idx', + '--input-file', join(tmpDir, 'data.json'), + '--flush-bytes', '500', + '--json' + ], makeDeps(transport)) + + assert.ok(requests.length > 1, `Expected multiple batches, got ${requests.length}`) + }) + + it('includes pipeline and routing in bulk actions', async () => { + const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-')) + writeFileSync(join(tmpDir, 'data.json'), '[{"a":1}]') + + const { transport, requests } = mockTransport([successResponse(1)]) + + await runCommand([ + '--index', 'test-idx', + '--input-file', join(tmpDir, 'data.json'), + '--pipeline', 'my-pipe', + '--routing', 'shard-1', + '--json' + ], makeDeps(transport)) + + const body = requests[0]!.params.body as string + const actionLine = JSON.parse(body.split('\n')[0]!) 
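+    // The first NDJSON line is the action metadata, so pipeline and routing appear alongside _index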
+    assert.equal(actionLine.index.pipeline, 'my-pipe')
+    assert.equal(actionLine.index.routing, 'shard-1')
+  })
+
+  it('returns missing_config error when transport is not configured', async () => {
+    const deps: BulkIngestDeps = {
+      getTransport: () => { throw new Error('missing_config: no ES configured') }
+    }
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'data.json'), '[{"a":1}]')
+
+    const result = await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--json'
+    ], deps) as Record<string, unknown>
+
+    const error = result.error as Record<string, unknown>
+    assert.equal(error.code, 'missing_config')
+  })
+
+  it('sends content-type application/x-ndjson header', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'data.json'), '[{"a":1}]')
+
+    const { transport, requests } = mockTransport([successResponse(1)])
+
+    await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--json'
+    ], makeDeps(transport))
+
+    const opts = requests[0]!.opts as Record<string, unknown>
+    const headers = opts.headers as Record<string, string>
+    assert.equal(headers['content-type'], 'application/x-ndjson')
+  })
+
+  it('returns empty summary for zero documents', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'data.json'), '[]')
+
+    const { transport } = mockTransport([successResponse(0)])
+
+    const result = await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--json'
+    ], makeDeps(transport)) as Record<string, unknown>
+
+    assert.equal(result.total, 0)
+    assert.equal(result.succeeded, 0)
+  })
+})

From d6db8f410fa319fa9b3e62be4f9e45e8c4ceae36 Mon Sep 17 00:00:00 2001
From: Josh Mock
Date: Mon, 13 Apr 2026 14:59:56 -0400
Subject: [PATCH 3/3] Update bulk-ingest tests to watch stderr

An upstream commit changed error results to be written to stderr as
JSON, so the test harness now captures and checks stderr as well as
stdout.
---
 test/es/helpers/bulk-ingest.test.ts | 24 +++++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/test/es/helpers/bulk-ingest.test.ts b/test/es/helpers/bulk-ingest.test.ts
index c225a153..98657fc8 100644
--- a/test/es/helpers/bulk-ingest.test.ts
+++ b/test/es/helpers/bulk-ingest.test.ts
@@ -51,22 +51,32 @@ async function runCommand (args: string[], deps: BulkIngestDeps): Promise<unknown> {
-  // Capture output
-  const origWrite = process.stdout.write.bind(process.stdout)
-  const chunks: string[] = []
+  // Capture stdout and stderr
+  const origStdoutWrite = process.stdout.write.bind(process.stdout)
+  const origStderrWrite = process.stderr.write.bind(process.stderr)
+  const stdoutChunks: string[] = []
+  const stderrChunks: string[] = []
   process.stdout.write = ((chunk: string) => {
-    chunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
+    stdoutChunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
     return true
   }) as typeof process.stdout.write
+  process.stderr.write = ((chunk: string) => {
+    stderrChunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
+    return true
+  }) as typeof process.stderr.write
 
   try {
     await program.parseAsync(['node', 'test', 'bulk-ingest', ...args])
   } finally {
-    process.stdout.write = origWrite
+    process.stdout.write = origStdoutWrite
+    process.stderr.write = origStderrWrite
+    process.exitCode = 0
   }
 
-  // Parse the captured JSON output
-  const output = chunks.join('')
+  // Prefer stderr (error results) over stdout; parse whichever has content
+  const errOutput = stderrChunks.join('')
+  const stdOutput = stdoutChunks.join('')
+  const output = errOutput.trim().length > 0 ? errOutput : stdOutput
   if (output.trim().length > 0) {
     try {
       return JSON.parse(output.trim())