diff --git a/docs/helpers.asciidoc b/docs/helpers.asciidoc index 374604837..5933e87f3 100644 --- a/docs/helpers.asciidoc +++ b/docs/helpers.asciidoc @@ -43,7 +43,7 @@ To create a new instance of the Bulk helper, you should access it as shown in th [cols=2*] |=== |`datasource` -a|An array or a readable stream with the data you need to index/create/update/delete. +a|An array, async generator or a readable stream with the data you need to index/create/update/delete. It can be an array of strings or objects, but also a stream of json strings or JavaScript objects. + If it is a stream, we recommend to use the https://www.npmjs.com/package/split2[`split2`] package, that will split the stream on new lines delimiters. + This parameter is mandatory. @@ -182,6 +182,36 @@ const result = await client.helpers.bulk({ }) ---- +==== Usage with an async generator + +[source,js] +---- +const { Client } = require('@elastic/elasticsearch') + +async function * generator () { + const dataset = [ + { user: 'jon', age: 23 }, + { user: 'arya', age: 18 }, + { user: 'tyrion', age: 39 } + ] + for (const doc of dataset) { + yield doc + } +} + +const client = new Client({ node: 'http://localhost:9200' }) +const result = await client.helpers.bulk({ + datasource: generator(), + onDocument (doc) { + return { + index: { _index: 'my-index' } + } + } +}) + +console.log(result) +---- + === Search Helper A simple wrapper around the search API. Instead of returning the entire `result` object it will return only the search documents result. diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts index 5e0745326..2f3782099 100644 --- a/lib/Helpers.d.ts +++ b/lib/Helpers.d.ts @@ -10,7 +10,7 @@ export default class Helpers { search(params: Search, options?: TransportRequestOptions): Promise scrollSearch(params: Search, options?: TransportRequestOptions): AsyncIterable> scrollDocuments(params: Search, options?: TransportRequestOptions): AsyncIterable - bulk(options: BulkHelperOptions): BulkHelper + bulk(options: BulkHelperOptions): BulkHelper } export interface ScrollSearchResponse extends ApiResponse { @@ -64,13 +64,27 @@ type UpdateAction = [UpdateActionOperation, Record] type Action = IndexAction | CreateAction | UpdateAction | DeleteAction type Omit = Pick> -export interface BulkHelperOptions extends Omit { - datasource: any[] | Buffer | ReadableStream - onDocument: (doc: Record) => Action +export interface BulkHelperOptions extends Omit { + datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator + onDocument: (doc: TDocument) => Action flushBytes?: number concurrency?: number retries?: number wait?: number, - onDrop?: (doc: Record) => void, + onDrop?: (doc: OnDropDocument) => void, refreshOnCompletion?: boolean | string +} + +export interface OnDropDocument { + status: number + error: { + type: string, + reason: string, + caused_by: { + type: string, + reason: string + } + } + document: TDocument + retried: boolean } \ No newline at end of file diff --git a/lib/Helpers.js b/lib/Helpers.js index 8b16458cc..a74cd8ef0 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -158,8 +158,8 @@ class Helpers { if (datasource === undefined) { return Promise.reject(new ConfigurationError('bulk helper: the datasource is required')) } - if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function')) { - return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream')) + if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) { + return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator')) } if (onDocument === undefined) { return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required')) diff --git a/test/types/helpers.test-d.ts b/test/types/helpers.test-d.ts index e0e0cbe52..97ca59d78 100644 --- a/test/types/helpers.test-d.ts +++ b/test/types/helpers.test-d.ts @@ -9,7 +9,8 @@ import { BulkHelper, BulkStats, BulkHelperOptions, - ScrollSearchResponse + ScrollSearchResponse, + OnDropDocument } from '../../lib/Helpers' const client = new Client({ @@ -18,7 +19,7 @@ const client = new Client({ /// .helpers.bulk -const b = client.helpers.bulk({ +const b = client.helpers.bulk>({ datasource: [], onDocument (doc) { expectType>(doc) @@ -29,7 +30,7 @@ const b = client.helpers.bulk({ retries: 3, wait: 5000, onDrop (doc) { - expectType>(doc) + expectType>>(doc) }, refreshOnCompletion: true, pipeline: 'my-pipeline' @@ -59,7 +60,7 @@ expectError( return { index: { _index: 'test' } } } } - expectAssignable(options) + expectAssignable>>(options) } // create { @@ -69,20 +70,20 @@ expectError( return { create: { _index: 'test' } } } } - expectAssignable(options) + expectAssignable>>(options) } // update { // without `:BulkHelperOptions` this test cannot pass // but if we write these options inline inside // a `.helper.bulk`, it works as expected - const options: BulkHelperOptions = { + const options: BulkHelperOptions> = { datasource: [], onDocument (doc: Record) { return [{ update: { _index: 'test' } }, doc] } } - expectAssignable(options) + expectAssignable>>(options) } // delete { @@ -92,7 +93,7 @@ expectError( return { delete: { _index: 'test' } } } } - expectAssignable(options) + expectAssignable>>(options) } /// .helpers.scrollSearch diff --git a/test/unit/helpers/bulk.test.js b/test/unit/helpers/bulk.test.js index bff3dd21f..b19ca12b5 100644 --- a/test/unit/helpers/bulk.test.js +++ b/test/unit/helpers/bulk.test.js @@ -653,6 +653,59 @@ test('bulk index', t => { t.end() }) + t.test('datasource as async generator', t => { + t.test('Should perform a bulk request', async t => { + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.strictEqual(params.path, '/_bulk') + t.match(params.headers, { 'Content-Type': 'application/x-ndjson' }) + const [action, payload] = params.body.split('\n') + t.deepEqual(JSON.parse(action), { index: { _index: 'test' } }) + t.deepEqual(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: generator(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + return { + index: { _index: 'test' } + } + }, + onDrop (doc) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.end() + }) + t.end() }) @@ -896,7 +949,7 @@ test('errors', t => { }) } catch (err) { t.true(err instanceof errors.ConfigurationError) - t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream') + t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator') } })