Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/es/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { errors } from '@elastic/transport'
import type { JsonValue } from '../factory.ts'

/** Builds a `missing_config` error payload from a thrown error. */
export function missingConfigError (err: unknown): JsonValue {
const message = err instanceof Error ? err.message : String(err)
return { error: { code: 'missing_config', message } }
}

/** Builds a `transport_error` payload from a thrown transport error. */
export function transportError (err: unknown): JsonValue {
if (err instanceof errors.ResponseError) {
return {
error: {
code: 'transport_error',
status_code: err.statusCode ?? null,
body: err.body as JsonValue ?? null
}
}
}

const message = err instanceof Error ? err.message : String(err)
return { error: { code: 'transport_error', message } }
}
23 changes: 1 addition & 22 deletions src/es/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { errors } from '@elastic/transport'
import type { Transport } from '@elastic/transport'
import type { EsApiDefinition } from './types.ts'
import type { SchemaArgDefinition } from '../lib/schema-args.ts'
import { buildRequestParams } from './request-builder.ts'
import { getTransport } from '../lib/transport.ts'
import { missingConfigError, transportError } from './errors.ts'
import type { JsonValue, ParsedResult } from '../factory.ts'

/**
Expand Down Expand Up @@ -76,24 +76,3 @@ export function createEsHandler (
}
}

/** builds a `missing_config` error payload from a thrown error */
function missingConfigError (err: unknown): JsonValue {
const message = err instanceof Error ? err.message : String(err)
return { error: { code: 'missing_config', message } }
}

/** builds a `transport_error` payload from a thrown transport error */
function transportError (err: unknown): JsonValue {
if (err instanceof errors.ResponseError) {
return {
error: {
code: 'transport_error',
status_code: err.statusCode ?? null,
body: err.body as JsonValue ?? null
}
}
}

const message = err instanceof Error ? err.message : String(err)
return { error: { code: 'transport_error', message } }
}
23 changes: 23 additions & 0 deletions src/es/helpers/register.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { defineGroup } from '../../factory.ts'
import type { OpaqueCommandHandle } from '../../factory.ts'

/**
* Registers all high-level helper commands under a `helpers` group.
* Helper commands provide convenience abstractions over common Elasticsearch
* workflows (bulk ingestion, scroll search, multi-search batching).
*
* @returns an `OpaqueCommandHandle` for the `helpers` group
*/
export function registerHelperCommands (): OpaqueCommandHandle {
const commands: OpaqueCommandHandle[] = []

return defineGroup(
{ name: 'helpers', description: 'High-level helper commands for common Elasticsearch workflows' },
...commands
)
}
166 changes: 166 additions & 0 deletions src/es/helpers/shared.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { readFileSync, globSync } from 'node:fs'
import { resolve } from 'node:path'
import type { JsonValue } from '../../factory.ts'

/**
* Parses raw text input as either a JSON array or NDJSON (newline-delimited JSON).
* Auto-detects the format: if the trimmed input starts with `[`, it's parsed as JSON array;
* otherwise each non-empty line is parsed as a separate JSON object.
*/
export function parseInput (raw: string): unknown[] {
const trimmed = raw.trim()
if (trimmed.length === 0) return []

if (trimmed.startsWith('[')) {
const parsed = JSON.parse(trimmed)
if (!Array.isArray(parsed)) {
throw new Error('Expected a JSON array, got: ' + typeof parsed)
}
return parsed
}

const lines = trimmed.split('\n')
const docs: unknown[] = []
for (let i = 0; i < lines.length; i++) {
const line = lines[i]!.trim()
if (line.length === 0) continue
try {
docs.push(JSON.parse(line))
} catch {
throw new Error(`Failed to parse NDJSON at line ${i + 1}: ${line.slice(0, 80)}`)
}
}
return docs
}

/**
* Reads raw text content from a file path or stdin.
* Returns `undefined` when no input is available (interactive TTY with no file).
*/
export function readRawInput (filePath?: string): string | undefined {
if (filePath != null) {
return readFileSync(filePath, 'utf-8')
}
if (!process.stdin.isTTY) {
const content = readFileSync(0, 'utf-8')
return content.length > 0 ? content : undefined
}
return undefined
}

/**
* Resolves a glob pattern against a directory and returns a sorted list of absolute file paths.
* Uses Node.js native `fs.globSync` (available since Node 22).
*/
export function globFiles (dir: string, pattern: string): string[] {
const absDir = resolve(dir)
const matches = globSync(pattern, { cwd: absDir })
return matches.map((f) => resolve(absDir, f)).sort()
}

/**
* Builds an NDJSON body for the Elasticsearch `_bulk` API.
* Each document is wrapped in an `{"index": {...}}` action line followed by the document line.
*/
export function buildBulkNdjsonBody (
docs: unknown[],
opts: { index?: string, pipeline?: string, routing?: string }
): string {
const lines: string[] = []
for (const doc of docs) {
const action: Record<string, unknown> = {}
if (opts.index != null) action._index = opts.index
if (opts.pipeline != null) action.pipeline = opts.pipeline
if (opts.routing != null) action.routing = opts.routing
lines.push(JSON.stringify({ index: action }))
lines.push(JSON.stringify(doc))
}
// bulk API requires a trailing newline
return lines.join('\n') + '\n'
}

/**
* Retries an async function with exponential backoff.
* On each failure the delay doubles. Rethrows the last error after exhausting retries.
*/
export async function retryWithBackoff<T> (
fn: () => Promise<T>,
opts: { retries: number, delay: number }
): Promise<T> {
let lastError: unknown
let currentDelay = opts.delay
for (let attempt = 0; attempt <= opts.retries; attempt++) {
try {
return await fn()
} catch (err) {
lastError = err
if (attempt < opts.retries) {
await new Promise((resolve) => setTimeout(resolve, currentDelay))
currentDelay *= 2
}
}
}
throw lastError
}

/**
* Runs async tasks with a concurrency limit.
* Returns results in the same order as the input items.
*/
export async function runWithConcurrency<T, R> (
items: T[],
concurrency: number,
fn: (item: T, index: number) => Promise<R>
): Promise<R[]> {
const results: R[] = new Array(items.length)
let nextIndex = 0

async function worker (): Promise<void> {
while (nextIndex < items.length) {
const i = nextIndex++
results[i] = await fn(items[i]!, i)
}
}

const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker())
await Promise.all(workers)
return results
}

/** Tracks bulk ingestion progress and writes updates to stderr. */
export class ProgressReporter {
total = 0
succeeded = 0
failed = 0
retries = 0
filesProcessed = 0
private readonly startTime = Date.now()

report (batchSize: number, batchErrors: number, fileName?: string): void {
this.total += batchSize
this.succeeded += batchSize - batchErrors
this.failed += batchErrors
const prefix = fileName != null ? `[${fileName}] ` : ''
process.stderr.write(
`\r${prefix}Progress: ${this.succeeded} succeeded, ${this.failed} failed, ${this.total} total`
)
}

summary (): JsonValue {
const elapsed_ms = Date.now() - this.startTime
process.stderr.write('\n')
return {
total: this.total,
succeeded: this.succeeded,
failed: this.failed,
retries: this.retries,
elapsed_ms,
...(this.filesProcessed > 0 && { files_processed: this.filesProcessed })
}
}
}
5 changes: 4 additions & 1 deletion src/es/register.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { validateApiDefinition, resolveInput } from './types.ts'
import type { SchemaArgDefinition } from '../lib/schema-args.ts'
import { allApis } from './apis.ts'
import { createEsHandler } from './handler.ts'
import { registerHelperCommands } from './helpers/register.ts'

/** Builds a leaf command handle from a definition and its pre-computed schema args. */
function buildLeafHandle (
Expand Down Expand Up @@ -108,5 +109,7 @@ export function registerEsCommands (
rootHandles.push(buildLeafHandle(def, defSchemaArgs))
}

return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles)
const helpersGroup = registerHelperCommands()

return defineGroup({ name: 'es', description: 'Interact with the Elasticsearch API' }, ...namespaceHandles, ...rootHandles, helpersGroup)
}
22 changes: 22 additions & 0 deletions test/es/helpers/register.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { describe, it } from 'node:test'
import assert from 'node:assert/strict'
import { Command } from 'commander'
import { registerHelperCommands } from '../../../src/es/helpers/register.ts'

describe('registerHelperCommands', () => {
it('returns a command group named "helpers"', () => {
const group = registerHelperCommands()
assert.ok(group instanceof Command)
assert.equal(group.name(), 'helpers')
})

it('has a description', () => {
const group = registerHelperCommands()
assert.ok(group.description().length > 0)
})
})
Loading
Loading