From 8e5d9333a470f9df52fc29e33eb14d52328ac2db Mon Sep 17 00:00:00 2001 From: delvedor Date: Wed, 1 Apr 2020 18:11:47 +0200 Subject: [PATCH 1/5] Added async generator support in bulk helper --- lib/Helpers.d.ts | 2 +- lib/Helpers.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts index 5e0745326..be11e3d32 100644 --- a/lib/Helpers.d.ts +++ b/lib/Helpers.d.ts @@ -65,7 +65,7 @@ type Action = IndexAction | CreateAction | UpdateAction | DeleteAction type Omit = Pick> export interface BulkHelperOptions extends Omit { - datasource: any[] | Buffer | ReadableStream + datasource: any[] | Buffer | ReadableStream | AsyncIterator onDocument: (doc: Record) => Action flushBytes?: number concurrency?: number diff --git a/lib/Helpers.js b/lib/Helpers.js index 8b16458cc..a74cd8ef0 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -158,8 +158,8 @@ class Helpers { if (datasource === undefined) { return Promise.reject(new ConfigurationError('bulk helper: the datasource is required')) } - if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function')) { - return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream')) + if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) { + return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator')) } if (onDocument === undefined) { return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required')) From b19ddbf59062d5ed3eb02823fd225fd607ddbb89 Mon Sep 17 00:00:00 2001 From: delvedor Date: Wed, 1 Apr 2020 18:11:54 +0200 Subject: [PATCH 2/5] Updated test --- test/unit/helpers/bulk.test.js | 55 +++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/test/unit/helpers/bulk.test.js b/test/unit/helpers/bulk.test.js index bff3dd21f..b19ca12b5 100644 --- a/test/unit/helpers/bulk.test.js +++ b/test/unit/helpers/bulk.test.js @@ -653,6 +653,59 @@ test('bulk index', t => { t.end() }) + t.test('datasource as async generator', t => { + t.test('Should perform a bulk request', async t => { + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.strictEqual(params.path, '/_bulk') + t.match(params.headers, { 'Content-Type': 'application/x-ndjson' }) + const [action, payload] = params.body.split('\n') + t.deepEqual(JSON.parse(action), { index: { _index: 'test' } }) + t.deepEqual(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + async function * generator () { + const data = dataset.slice() + for (const doc of data) { + yield doc + } + } + + const result = await client.helpers.bulk({ + datasource: generator(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + return { + index: { _index: 'test' } + } + }, + onDrop (doc) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.end() + }) + t.end() }) @@ -896,7 +949,7 @@ test('errors', t => { }) } catch (err) { t.true(err instanceof errors.ConfigurationError) - t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream') + t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator') } }) From fabc4315127b70781b6a6c0b7235d94399dcb624 Mon Sep 17 00:00:00 2001 From: delvedor Date: Wed, 1 Apr 2020 18:11:58 +0200 Subject: [PATCH 3/5] Updated docs --- docs/helpers.asciidoc | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/docs/helpers.asciidoc b/docs/helpers.asciidoc index 374604837..5933e87f3 100644 --- a/docs/helpers.asciidoc +++ b/docs/helpers.asciidoc @@ -43,7 +43,7 @@ To create a new instance of the Bulk helper, you should access it as shown in th [cols=2*] |=== |`datasource` -a|An array or a readable stream with the data you need to index/create/update/delete. +a|An array, async generator or a readable stream with the data you need to index/create/update/delete. It can be an array of strings or objects, but also a stream of json strings or JavaScript objects. + If it is a stream, we recommend to use the https://www.npmjs.com/package/split2[`split2`] package, that will split the stream on new lines delimiters. + This parameter is mandatory. @@ -182,6 +182,36 @@ const result = await client.helpers.bulk({ }) ---- +==== Usage with an async generator + +[source,js] +---- +const { Client } = require('@elastic/elasticsearch') + +async function * generator () { + const dataset = [ + { user: 'jon', age: 23 }, + { user: 'arya', age: 18 }, + { user: 'tyrion', age: 39 } + ] + for (const doc of dataset) { + yield doc + } +} + +const client = new Client({ node: 'http://localhost:9200' }) +const result = await client.helpers.bulk({ + datasource: generator(), + onDocument (doc) { + return { + index: { _index: 'my-index' } + } + } +}) + +console.log(result) +---- + === Search Helper A simple wrapper around the search API. Instead of returning the entire `result` object it will return only the search documents result. From a533245d423512ca152a3a93701b2b8030d75a14 Mon Sep 17 00:00:00 2001 From: delvedor Date: Wed, 1 Apr 2020 18:19:32 +0200 Subject: [PATCH 4/5] Improved type definitions --- lib/Helpers.d.ts | 10 +++++----- test/types/helpers.test-d.ts | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts index be11e3d32..256d2879d 100644 --- a/lib/Helpers.d.ts +++ b/lib/Helpers.d.ts @@ -10,7 +10,7 @@ export default class Helpers { search(params: Search, options?: TransportRequestOptions): Promise scrollSearch(params: Search, options?: TransportRequestOptions): AsyncIterable> scrollDocuments(params: Search, options?: TransportRequestOptions): AsyncIterable - bulk(options: BulkHelperOptions): BulkHelper + bulk(options: BulkHelperOptions): BulkHelper } export interface ScrollSearchResponse extends ApiResponse { @@ -64,13 +64,13 @@ type UpdateAction = [UpdateActionOperation, Record] type Action = IndexAction | CreateAction | UpdateAction | DeleteAction type Omit = Pick> -export interface BulkHelperOptions extends Omit { - datasource: any[] | Buffer | ReadableStream | AsyncIterator - onDocument: (doc: Record) => Action +export interface BulkHelperOptions extends Omit { + datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator + onDocument: (doc: TDocument) => Action flushBytes?: number concurrency?: number retries?: number wait?: number, - onDrop?: (doc: Record) => void, + onDrop?: (doc: TDocument) => void, refreshOnCompletion?: boolean | string } \ No newline at end of file diff --git a/test/types/helpers.test-d.ts b/test/types/helpers.test-d.ts index e0e0cbe52..ab796c5e0 100644 --- a/test/types/helpers.test-d.ts +++ b/test/types/helpers.test-d.ts @@ -18,7 +18,7 @@ const client = new Client({ /// .helpers.bulk -const b = client.helpers.bulk({ +const b = client.helpers.bulk>({ datasource: [], onDocument (doc) { expectType>(doc) @@ -59,7 +59,7 @@ expectError( return { index: { _index: 'test' } } } } - expectAssignable(options) + expectAssignable>>(options) } // create { @@ -69,20 +69,20 @@ expectError( return { create: { _index: 'test' } } } } - expectAssignable(options) + expectAssignable>>(options) } // update { // without `:BulkHelperOptions` this test cannot pass // but if we write these options inline inside // a `.helper.bulk`, it works as expected - const options: BulkHelperOptions = { + const options: BulkHelperOptions> = { datasource: [], onDocument (doc: Record) { return [{ update: { _index: 'test' } }, doc] } } - expectAssignable(options) + expectAssignable>>(options) } // delete { @@ -92,7 +92,7 @@ expectError( return { delete: { _index: 'test' } } } } - expectAssignable(options) + expectAssignable>>(options) } /// .helpers.scrollSearch From 6a0565caa273654ac078cb3bda43f3a39990c7da Mon Sep 17 00:00:00 2001 From: delvedor Date: Thu, 2 Apr 2020 11:45:16 +0200 Subject: [PATCH 5/5] Updated onDrop callback type definition --- lib/Helpers.d.ts | 16 +++++++++++++++- test/types/helpers.test-d.ts | 5 +++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts index 256d2879d..2f3782099 100644 --- a/lib/Helpers.d.ts +++ b/lib/Helpers.d.ts @@ -71,6 +71,20 @@ export interface BulkHelperOptions extends Omit void, + onDrop?: (doc: OnDropDocument) => void, refreshOnCompletion?: boolean | string +} + +export interface OnDropDocument { + status: number + error: { + type: string, + reason: string, + caused_by: { + type: string, + reason: string + } + } + document: TDocument + retried: boolean } \ No newline at end of file diff --git a/test/types/helpers.test-d.ts b/test/types/helpers.test-d.ts index ab796c5e0..97ca59d78 100644 --- a/test/types/helpers.test-d.ts +++ b/test/types/helpers.test-d.ts @@ -9,7 +9,8 @@ import { BulkHelper, BulkStats, BulkHelperOptions, - ScrollSearchResponse + ScrollSearchResponse, + OnDropDocument } from '../../lib/Helpers' const client = new Client({ @@ -29,7 +30,7 @@ const b = client.helpers.bulk>({ retries: 3, wait: 5000, onDrop (doc) { - expectType>(doc) + expectType>>(doc) }, refreshOnCompletion: true, pipeline: 'my-pipeline'