diff --git a/src/es/helpers/bulk-ingest.ts b/src/es/helpers/bulk-ingest.ts
new file mode 100644
index 00000000..89ce693c
--- /dev/null
+++ b/src/es/helpers/bulk-ingest.ts
@@ -0,0 +1,240 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { readFileSync } from 'node:fs'
+import type { Transport } from '@elastic/transport'
+import { defineCommand } from '../../factory.ts'
+import type { OpaqueCommandHandle, JsonValue, ParsedResult } from '../../factory.ts'
+import { getTransport } from '../../lib/transport.ts'
+import { missingConfigError, transportError } from '../errors.ts'
+import {
+  parseInput,
+  readRawInput,
+  globFiles,
+  buildBulkNdjsonBody,
+  retryWithBackoff,
+  runWithConcurrency,
+  ProgressReporter
+} from './shared.ts'
+
+interface BulkIngestOptions {
+  index: string
+  'input-file'?: string
+  'input-dir'?: string
+  glob: string
+  'no-recursive'?: boolean
+  'flush-bytes': number
+  concurrency: number
+  retries: number
+  'retry-delay': number
+  pipeline?: string
+  routing?: string
+}
+
+/** Dependencies injectable for testing. */
+export interface BulkIngestDeps {
+  getTransport: () => Transport
+}
+
+const defaultDeps: BulkIngestDeps = { getTransport }
+
+/**
+ * Splits an array of documents into batches where each batch's serialized
+ * size does not exceed the byte threshold.
+ */
+function splitIntoBatches (docs: unknown[], flushBytes: number): unknown[][] {
+  const batches: unknown[][] = []
+  let currentBatch: unknown[] = []
+  let currentSize = 0
+
+  for (const doc of docs) {
+    // +1 for newline; string length counts UTF-16 code units, a close byte proxy for ASCII-heavy JSON
+    const docSize = JSON.stringify(doc).length + 1
+    if (currentBatch.length > 0 && currentSize + docSize > flushBytes) {
+      batches.push(currentBatch)
+      currentBatch = []
+      currentSize = 0
+    }
+    currentBatch.push(doc)
+    currentSize += docSize
+  }
+  if (currentBatch.length > 0) {
+    batches.push(currentBatch)
+  }
+  return batches
+}
+
+/** Collects documents from the resolved input source. */
+function collectDocuments (opts: BulkIngestOptions): { docs: unknown[], filesProcessed: number } {
+  const inputFile = opts['input-file']
+  const inputDir = opts['input-dir']
+
+  if (inputFile != null && inputDir != null) {
+    throw new Error('Provide only one input source: --input-file or --input-dir (not both)')
+  }
+
+  if (inputDir != null) {
+    const recursive = opts['no-recursive'] !== true
+    const pattern = recursive ? opts.glob : opts.glob.replace(/^\*\*\//, '')
+    const files = globFiles(inputDir, pattern)
+    if (files.length === 0) {
+      throw new Error(`No files matched pattern "${opts.glob}" in ${inputDir}`)
+    }
+    const allDocs: unknown[] = []
+    for (const file of files) {
+      const raw = readFileSync(file, 'utf-8')
+      allDocs.push(...parseInput(raw))
+    }
+    return { docs: allDocs, filesProcessed: files.length }
+  }
+
+  if (inputFile != null) {
+    const raw = readRawInput(inputFile)
+    if (raw == null || raw.trim().length === 0) {
+      throw new Error('No input data received from file')
+    }
+    return { docs: parseInput(raw), filesProcessed: 1 }
+  }
+
+  // Fall back to stdin
+  const raw = readRawInput()
+  if (raw == null || raw.trim().length === 0) {
+    throw new Error('No input provided. Use --input-file, --input-dir, or pipe data to stdin')
+  }
+  return { docs: parseInput(raw), filesProcessed: 0 }
+}
+
+/** Sends a single bulk batch to Elasticsearch. Returns the count of errors. */
+async function sendBatch (
+  transport: Transport,
+  ndjsonBody: string,
+  index: string
+): Promise<{ errors: number, total: number }> {
+  const path = index != null ? `/${encodeURIComponent(index)}/_bulk` : '/_bulk'
+  const result = await transport.request(
+    { method: 'POST', path, body: ndjsonBody, bulkBody: ndjsonBody },
+    { headers: { 'content-type': 'application/x-ndjson' } }
+  ) as { errors?: boolean, items?: Array<Record<string, { status?: number }>> }
+
+  let errorCount = 0
+  if (result.errors === true && result.items != null) {
+    for (const item of result.items) {
+      const action = Object.values(item)[0]
+      if (action != null && action.status != null && action.status >= 400) {
+        errorCount++
+      }
+    }
+  }
+  const total = result.items?.length ?? 0
+  return { errors: errorCount, total }
+}
+
+function createBulkIngestHandler (deps: BulkIngestDeps = defaultDeps) {
+  return async (parsed: ParsedResult): Promise<JsonValue> => {
+    const opts = parsed.options as unknown as BulkIngestOptions
+
+    let transport: Transport
+    try {
+      transport = deps.getTransport()
+    } catch (err) {
+      return missingConfigError(err)
+    }
+
+    let docs: unknown[]
+    let filesProcessed: number
+    try {
+      const result = collectDocuments(opts)
+      docs = result.docs
+      filesProcessed = result.filesProcessed
+    } catch (err) {
+      return {
+        error: {
+          code: 'input_error',
+          message: err instanceof Error ? err.message : String(err)
+        }
+      }
+    }
+
+    if (docs.length === 0) {
+      return { total: 0, succeeded: 0, failed: 0, retries: 0, elapsed_ms: 0 }
+    }
+
+    const flushBytes = opts['flush-bytes']
+    const batches = splitIntoBatches(docs, flushBytes)
+
+    const reporter = new ProgressReporter()
+    reporter.filesProcessed = filesProcessed
+
+    const retries = opts.retries
+    const retryDelay = opts['retry-delay']
+    const index = opts.index
+    const pipeline = opts.pipeline
+    const routing = opts.routing
+
+    try {
+      await runWithConcurrency(batches, opts.concurrency, async (batch) => {
+        const ndjsonBody = buildBulkNdjsonBody(batch, { index, pipeline, routing })
+
+        const result = await retryWithBackoff(
+          async () => {
+            const res = await sendBatch(transport, ndjsonBody, index)
+            if (res.errors > 0) {
+              // Only retry if all items failed (likely a transient cluster issue)
+              // Partial failures are reported as-is
+              if (res.errors === res.total) {
+                throw new Error(`Bulk batch failed: ${res.errors}/${res.total} errors`)
+              }
+            }
+            return res
+          },
+          { retries, delay: retryDelay }
+        )
+
+        reporter.report(result.total, result.errors)
+        return result
+      })
+    } catch (err) {
+      // Retries exhausted: surface the transport error
+      return transportError(err)
+    }
+
+    return reporter.summary()
+  }
+}
+
+export function createBulkIngestCommand (deps?: BulkIngestDeps): OpaqueCommandHandle {
+  return defineCommand({
+    name: 'bulk-ingest',
+    description: 'Bulk-ingest documents from file, directory, or stdin with automatic batching, concurrency, and retries.',
+    options: [
+      { long: 'index', short: 'i', description: 'Target index', type: 'string', required: true },
+      { long: 'input-file', description: 'Path to data file (NDJSON or JSON array)', type: 'string' },
+      { long: 'input-dir', description: 'Path to directory of data files to ingest', type: 'string' },
+      { long: 'glob', description: 'Glob pattern for --input-dir file matching', type: 'string', defaultValue: '**/*.json' },
+      { long: 'no-recursive', description: 'Do not recurse into subdirectories when using --input-dir', type: 'boolean' },
+      { long: 'flush-bytes', description: 'Batch size threshold in bytes', type: 'number', defaultValue: 5242880 },
+      { long: 'concurrency', description: 'Number of parallel bulk requests', type: 'number', defaultValue: 5 },
+      { long: 'retries', description: 'Max retries per failed batch', type: 'number', defaultValue: 3 },
+      { long: 'retry-delay', description: 'Initial retry delay in ms (doubles each attempt)', type: 'number', defaultValue: 1000 },
+      { long: 'pipeline', description: 'Ingest pipeline name', type: 'string' },
+      { long: 'routing', description: 'Custom routing value', type: 'string' }
+    ],
+    handler: createBulkIngestHandler(deps),
+    formatOutput: (result) => {
+      const r = result as Record<string, unknown>
+      if (r.error != null) return JSON.stringify(result, null, 2) + '\n'
+      const lines = [
+        `Total: ${r.total}`,
+        `Succeeded: ${r.succeeded}`,
+        `Failed: ${r.failed}`,
+        `Retries: ${r.retries}`,
+        `Elapsed: ${r.elapsed_ms}ms`
+      ]
+      if (r.files_processed != null) {
+        lines.push(`Files: ${r.files_processed}`)
+      }
+      return lines.join('\n') + '\n'
+    }
+  })
+}
diff --git a/src/es/helpers/register.ts b/src/es/helpers/register.ts
index 097c31a2..6326d3ff 100644
--- a/src/es/helpers/register.ts
+++ b/src/es/helpers/register.ts
@@ -5,6 +5,7 @@
 import { defineGroup } from '../../factory.ts'
 import type { OpaqueCommandHandle } from '../../factory.ts'
+import { createBulkIngestCommand } from './bulk-ingest.ts'
 
 /**
  * Registers all high-level helper commands under a `helpers` group.
  *
@@ -14,10 +15,8 @@ import type { OpaqueCommandHandle } from '../../factory.ts'
  * @returns an `OpaqueCommandHandle` for the `helpers` group
  */
 export function registerHelperCommands (): OpaqueCommandHandle {
-  const commands: OpaqueCommandHandle[] = []
-
   return defineGroup(
     { name: 'helpers', description: 'High-level helper commands for common Elasticsearch workflows' },
-    ...commands
+    createBulkIngestCommand()
   )
 }
diff --git a/src/es/helpers/shared.ts b/src/es/helpers/shared.ts
index 3a7504d5..4736ca7a 100644
--- a/src/es/helpers/shared.ts
+++ b/src/es/helpers/shared.ts
@@ -69,7 +69,7 @@ export function globFiles (dir: string, pattern: string): string[] {
  */
 export function buildBulkNdjsonBody (
   docs: unknown[],
-  opts: { index?: string, pipeline?: string, routing?: string }
+  opts: { index?: string | undefined, pipeline?: string | undefined, routing?: string | undefined }
 ): string {
   const lines: string[] = []
   for (const doc of docs) {
diff --git a/test/es/helpers/bulk-ingest.test.ts b/test/es/helpers/bulk-ingest.test.ts
new file mode 100644
index 00000000..98657fc8
--- /dev/null
+++ b/test/es/helpers/bulk-ingest.test.ts
@@ -0,0 +1,263 @@
+/*
+ * Copyright Elasticsearch B.V. and contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import { describe, it, afterEach } from 'node:test'
+import assert from 'node:assert/strict'
+import { mkdtempSync, writeFileSync, mkdirSync } from 'node:fs'
+import { join } from 'node:path'
+import { tmpdir } from 'node:os'
+import type { Transport, TransportRequestParams, TransportRequestOptions } from '@elastic/transport'
+import { createBulkIngestCommand } from '../../../src/es/helpers/bulk-ingest.ts'
+import type { BulkIngestDeps } from '../../../src/es/helpers/bulk-ingest.ts'
+import { _testSetStdinReader } from '../../../src/factory.ts'
+import { Command } from 'commander'
+
+/** Creates a mock transport that records requests and returns configurable responses. */
+function mockTransport (responses: Array<{ errors: boolean, items: Array<Record<string, { status: number }>> }>): {
+  transport: Transport
+  requests: Array<{ params: TransportRequestParams, opts?: TransportRequestOptions }>
+} {
+  const requests: Array<{ params: TransportRequestParams, opts?: TransportRequestOptions }> = []
+  let callIndex = 0
+  const transport = {
+    request: async (params: TransportRequestParams, opts?: TransportRequestOptions) => {
+      requests.push({ params, opts })
+      const response = responses[callIndex] ?? responses[responses.length - 1]
+      callIndex++
+      return response
+    }
+  } as unknown as Transport
+  return { transport, requests }
+}
+
+function makeDeps (transport: Transport): BulkIngestDeps {
+  return { getTransport: () => transport }
+}
+
+function successResponse (count: number): { errors: boolean, items: Array<Record<string, { status: number }>> } {
+  return {
+    errors: false,
+    items: Array.from({ length: count }, () => ({ index: { status: 201 } }))
+  }
+}
+
+/** Runs the bulk-ingest command programmatically and returns the handler result. */
+async function runCommand (args: string[], deps: BulkIngestDeps): Promise<unknown> {
+  const cmd = createBulkIngestCommand(deps)
+  const program = new Command()
+  program.exitOverride()
+  program.option('--json', 'output as JSON')
+  program.addCommand(cmd)
+
+  // Capture stdout and stderr
+  const origStdoutWrite = process.stdout.write.bind(process.stdout)
+  const origStderrWrite = process.stderr.write.bind(process.stderr)
+  const stdoutChunks: string[] = []
+  const stderrChunks: string[] = []
+  process.stdout.write = ((chunk: string) => {
+    stdoutChunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
+    return true
+  }) as typeof process.stdout.write
+  process.stderr.write = ((chunk: string) => {
+    stderrChunks.push(typeof chunk === 'string' ? chunk : chunk.toString())
+    return true
+  }) as typeof process.stderr.write
+
+  try {
+    await program.parseAsync(['node', 'test', 'bulk-ingest', ...args])
+  } finally {
+    process.stdout.write = origStdoutWrite
+    process.stderr.write = origStderrWrite
+    process.exitCode = 0
+  }
+
+  // Prefer stderr (error results) over stdout; parse whichever has content
+  const errOutput = stderrChunks.join('')
+  const stdOutput = stdoutChunks.join('')
+  const output = errOutput.trim().length > 0 ? errOutput : stdOutput
+  if (output.trim().length > 0) {
+    try {
+      return JSON.parse(output.trim())
+    } catch {
+      return output.trim()
+    }
+  }
+  return undefined
+}
+
+describe('bulk-ingest command', () => {
+  let restoreStdin: (() => void) | undefined
+
+  afterEach(() => {
+    if (restoreStdin != null) {
+      restoreStdin()
+      restoreStdin = undefined
+    }
+  })
+
+  it('creates a command named bulk-ingest', () => {
+    const { transport } = mockTransport([successResponse(1)])
+    const cmd = createBulkIngestCommand(makeDeps(transport))
+    assert.equal(cmd.name(), 'bulk-ingest')
+  })
+
+  it('ingests documents from --input-file with JSON array', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    const filePath = join(tmpDir, 'data.json')
+    writeFileSync(filePath, JSON.stringify([{ title: 'doc1' }, { title: 'doc2' }]))
+
+    const { transport, requests } = mockTransport([successResponse(2)])
+
+    await runCommand(['--index', 'test-idx', '--input-file', filePath, '--json'], makeDeps(transport))
+
+    assert.equal(requests.length, 1)
+    const body = requests[0]!.params.body as string
+    assert.ok(body.includes('"index"'))
+    assert.ok(body.includes('"doc1"'))
+    assert.ok(body.includes('"doc2"'))
+  })
+
+  it('ingests documents from --input-file with NDJSON', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    const filePath = join(tmpDir, 'data.ndjson')
+    writeFileSync(filePath, '{"title":"doc1"}\n{"title":"doc2"}\n')
+
+    const { transport, requests } = mockTransport([successResponse(2)])
+
+    await runCommand(['--index', 'test-idx', '--input-file', filePath, '--json'], makeDeps(transport))
+
+    assert.equal(requests.length, 1)
+  })
+
+  it('ingests documents from --input-dir', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'a.json'), '[{"a":1}]')
+    writeFileSync(join(tmpDir, 'b.json'), '[{"b":2}]')
+
+    const { transport, requests } = mockTransport([successResponse(2)])
+
+    await runCommand([
+      '--index', 'test-idx',
+      '--input-dir', tmpDir,
+      '--glob', '*.json',
+      '--json'
+    ], makeDeps(transport))
+
+    assert.equal(requests.length, 1)
+    const body = requests[0]!.params.body as string
+    assert.ok(body.includes('"a"'))
+    assert.ok(body.includes('"b"'))
+  })
+
+  it('recurses into subdirectories by default', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    mkdirSync(join(tmpDir, 'sub'))
+    writeFileSync(join(tmpDir, 'top.json'), '[{"top":1}]')
+    writeFileSync(join(tmpDir, 'sub', 'nested.json'), '[{"nested":2}]')
+
+    const { transport, requests } = mockTransport([successResponse(2)])
+
+    await runCommand([
+      '--index', 'test-idx',
+      '--input-dir', tmpDir,
+      '--json'
+    ], makeDeps(transport))
+
+    assert.equal(requests.length, 1)
+    const body = requests[0]!.params.body as string
+    assert.ok(body.includes('"top"'))
+    assert.ok(body.includes('"nested"'))
+  })
+
+  it('splits large inputs into multiple batches', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    const docs = Array.from({ length: 100 }, (_, i) => ({ id: i, data: 'x'.repeat(100) }))
+    writeFileSync(join(tmpDir, 'data.json'), JSON.stringify(docs))
+
+    // Use a small flush-bytes to force multiple batches
+    const { transport, requests } = mockTransport(
+      Array.from({ length: 100 }, () => successResponse(1))
+    )
+
+    await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--flush-bytes', '500',
+      '--json'
+    ], makeDeps(transport))
+
+    assert.ok(requests.length > 1, `Expected multiple batches, got ${requests.length}`)
+  })
+
+  it('includes pipeline and routing in bulk actions', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'data.json'), '[{"a":1}]')
+
+    const { transport, requests } = mockTransport([successResponse(1)])
+
+    await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--pipeline', 'my-pipe',
+      '--routing', 'shard-1',
+      '--json'
+    ], makeDeps(transport))
+
+    const body = requests[0]!.params.body as string
+    const actionLine = JSON.parse(body.split('\n')[0]!)
+    assert.equal(actionLine.index.pipeline, 'my-pipe')
+    assert.equal(actionLine.index.routing, 'shard-1')
+  })
+
+  it('returns missing_config error when transport is not configured', async () => {
+    const deps: BulkIngestDeps = {
+      getTransport: () => { throw new Error('missing_config: no ES configured') }
+    }
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'data.json'), '[{"a":1}]')
+
+    const result = await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--json'
+    ], deps) as Record<string, unknown>
+
+    const error = result.error as Record<string, unknown>
+    assert.equal(error.code, 'missing_config')
+  })
+
+  it('sends content-type application/x-ndjson header', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'data.json'), '[{"a":1}]')
+
+    const { transport, requests } = mockTransport([successResponse(1)])
+
+    await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--json'
+    ], makeDeps(transport))
+
+    const opts = requests[0]!.opts as Record<string, unknown>
+    const headers = opts.headers as Record<string, string>
+    assert.equal(headers['content-type'], 'application/x-ndjson')
+  })
+
+  it('returns empty summary for zero documents', async () => {
+    const tmpDir = mkdtempSync(join(tmpdir(), 'bulk-test-'))
+    writeFileSync(join(tmpDir, 'data.json'), '[]')
+
+    const { transport } = mockTransport([successResponse(0)])
+
+    const result = await runCommand([
+      '--index', 'test-idx',
+      '--input-file', join(tmpDir, 'data.json'),
+      '--json'
+    ], makeDeps(transport)) as Record<string, unknown>
+
+    assert.equal(result.total, 0)
+    assert.equal(result.succeeded, 0)
+  })
+})
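A minimal wiring sketch for the new command, based on the pattern the tests use (`program.addCommand(cmd)` on a Commander `Command`), which suggests `OpaqueCommandHandle` is Commander-compatible. The root program name `escli` and the import path below are hypothetical, chosen only for illustration.

// wiring-sketch.ts — illustrative only; `escli` and the import path are assumptions
import { Command } from 'commander'
import { registerHelperCommands } from './src/es/helpers/register.ts'

// Hypothetical root CLI; exitOverride and --json mirror the test harness setup.
const program = new Command('escli')
program.exitOverride()
program.option('--json', 'output results as JSON')

// Mount the `helpers` group, which now exposes `bulk-ingest`.
program.addCommand(registerHelperCommands())

// Example shell invocation (hypothetical binary name):
//   escli helpers bulk-ingest --index logs --input-dir ./data --glob '**/*.ndjson'
await program.parseAsync(process.argv)

With this wiring, the command's defaults apply unless overridden on the command line: 5242880-byte batches, 5 parallel bulk requests, and 3 retries per failed batch with a delay that starts at 1000 ms and doubles each attempt.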