Skip to content

Commit a16d49e

Browse files
committed
Added async generator support in bulk helper (#1138)
* Added async generator support in bulk helper * Updated test * Updated docs * Improved type definitions * Updated onDrop callback type definition
1 parent d15bd54 commit a16d49e

File tree

5 files changed

+115
-17
lines changed

5 files changed

+115
-17
lines changed

docs/helpers.asciidoc

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ To create a new instance of the Bulk helper, you should access it as shown in th
4343
[cols=2*]
4444
|===
4545
|`datasource`
46-
a|An array or a readable stream with the data you need to index/create/update/delete.
46+
a|An array, async generator or a readable stream with the data you need to index/create/update/delete.
4747
It can be an array of strings or objects, but also a stream of json strings or JavaScript objects. +
4848
If it is a stream, we recommend to use the https://www.npmjs.com/package/split2[`split2`] package, that will split the stream on new lines delimiters. +
4949
This parameter is mandatory.
@@ -182,6 +182,36 @@ const result = await client.helpers.bulk({
182182
})
183183
----
184184

185+
==== Usage with an async generator
186+
187+
[source,js]
188+
----
189+
const { Client } = require('@elastic/elasticsearch')
190+
191+
async function * generator () {
192+
const dataset = [
193+
{ user: 'jon', age: 23 },
194+
{ user: 'arya', age: 18 },
195+
{ user: 'tyrion', age: 39 }
196+
]
197+
for (const doc of dataset) {
198+
yield doc
199+
}
200+
}
201+
202+
const client = new Client({ node: 'http://localhost:9200' })
203+
const result = await client.helpers.bulk({
204+
datasource: generator(),
205+
onDocument (doc) {
206+
return {
207+
index: { _index: 'my-index' }
208+
}
209+
}
210+
})
211+
212+
console.log(result)
213+
----
214+
185215
=== Search Helper
186216
A simple wrapper around the search API. Instead of returning the entire `result` object it will return only the search documents result.
187217

lib/Helpers.d.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export default class Helpers {
1010
search<TRequestBody extends RequestBody, TDocument = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
1111
scrollSearch<TRequestBody extends RequestBody, TDocument = unknown, TResponse = ResponseBody, TContext = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
1212
scrollDocuments<TRequestBody extends RequestBody, TDocument = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
13-
bulk(options: BulkHelperOptions): BulkHelper<BulkStats>
13+
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
1414
}
1515

1616
export interface ScrollSearchResponse<TDocument = unknown, TResponse = ResponseBody, TContext = unknown> extends ApiResponse<TResponse, TContext> {
@@ -64,13 +64,27 @@ type UpdateAction = [UpdateActionOperation, Record<string, any>]
6464
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
6565
type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>
6666

67-
export interface BulkHelperOptions extends Omit<Bulk, 'body'> {
68-
datasource: any[] | Buffer | ReadableStream
69-
onDocument: (doc: Record<string, any>) => Action
67+
export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body'> {
68+
datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument>
69+
onDocument: (doc: TDocument) => Action
7070
flushBytes?: number
7171
concurrency?: number
7272
retries?: number
7373
wait?: number,
74-
onDrop?: (doc: Record<string, any>) => void,
74+
onDrop?: (doc: OnDropDocument<TDocument>) => void,
7575
refreshOnCompletion?: boolean | string
76+
}
77+
78+
export interface OnDropDocument<TDocument = unknown> {
79+
status: number
80+
error: {
81+
type: string,
82+
reason: string,
83+
caused_by: {
84+
type: string,
85+
reason: string
86+
}
87+
}
88+
document: TDocument
89+
retried: boolean
7690
}

lib/Helpers.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ class Helpers {
158158
if (datasource === undefined) {
159159
return Promise.reject(new ConfigurationError('bulk helper: the datasource is required'))
160160
}
161-
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function')) {
162-
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream'))
161+
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) {
162+
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator'))
163163
}
164164
if (onDocument === undefined) {
165165
return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required'))

test/types/helpers.test-d.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import {
99
BulkHelper,
1010
BulkStats,
1111
BulkHelperOptions,
12-
ScrollSearchResponse
12+
ScrollSearchResponse,
13+
OnDropDocument
1314
} from '../../lib/Helpers'
1415

1516
const client = new Client({
@@ -18,7 +19,7 @@ const client = new Client({
1819

1920
/// .helpers.bulk
2021

21-
const b = client.helpers.bulk({
22+
const b = client.helpers.bulk<Record<string, any>>({
2223
datasource: [],
2324
onDocument (doc) {
2425
expectType<Record<string, any>>(doc)
@@ -29,7 +30,7 @@ const b = client.helpers.bulk({
2930
retries: 3,
3031
wait: 5000,
3132
onDrop (doc) {
32-
expectType<Record<string, any>>(doc)
33+
expectType<OnDropDocument<Record<string, any>>>(doc)
3334
},
3435
refreshOnCompletion: true,
3536
pipeline: 'my-pipeline'
@@ -59,7 +60,7 @@ expectError(
5960
return { index: { _index: 'test' } }
6061
}
6162
}
62-
expectAssignable<BulkHelperOptions>(options)
63+
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
6364
}
6465
// create
6566
{
@@ -69,20 +70,20 @@ expectError(
6970
return { create: { _index: 'test' } }
7071
}
7172
}
72-
expectAssignable<BulkHelperOptions>(options)
73+
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
7374
}
7475
// update
7576
{
7677
// without `:BulkHelperOptions` this test cannot pass
7778
// but if we write these options inline inside
7879
// a `.helper.bulk`, it works as expected
79-
const options: BulkHelperOptions = {
80+
const options: BulkHelperOptions<Record<string, any>> = {
8081
datasource: [],
8182
onDocument (doc: Record<string, any>) {
8283
return [{ update: { _index: 'test' } }, doc]
8384
}
8485
}
85-
expectAssignable<BulkHelperOptions>(options)
86+
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
8687
}
8788
// delete
8889
{
@@ -92,7 +93,7 @@ expectError(
9293
return { delete: { _index: 'test' } }
9394
}
9495
}
95-
expectAssignable<BulkHelperOptions>(options)
96+
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
9697
}
9798

9899
/// .helpers.scrollSearch

test/unit/helpers/bulk.test.js

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,59 @@ test('bulk index', t => {
653653
t.end()
654654
})
655655

656+
t.test('datasource as async generator', t => {
657+
t.test('Should perform a bulk request', async t => {
658+
let count = 0
659+
const MockConnection = connection.buildMockConnection({
660+
onRequest (params) {
661+
t.strictEqual(params.path, '/_bulk')
662+
t.match(params.headers, { 'Content-Type': 'application/x-ndjson' })
663+
const [action, payload] = params.body.split('\n')
664+
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
665+
t.deepEqual(JSON.parse(payload), dataset[count++])
666+
return { body: { errors: false, items: [{}] } }
667+
}
668+
})
669+
670+
const client = new Client({
671+
node: 'http://localhost:9200',
672+
Connection: MockConnection
673+
})
674+
675+
async function * generator () {
676+
const data = dataset.slice()
677+
for (const doc of data) {
678+
yield doc
679+
}
680+
}
681+
682+
const result = await client.helpers.bulk({
683+
datasource: generator(),
684+
flushBytes: 1,
685+
concurrency: 1,
686+
onDocument (doc) {
687+
return {
688+
index: { _index: 'test' }
689+
}
690+
},
691+
onDrop (doc) {
692+
t.fail('This should never be called')
693+
}
694+
})
695+
696+
t.type(result.time, 'number')
697+
t.type(result.bytes, 'number')
698+
t.match(result, {
699+
total: 3,
700+
successful: 3,
701+
retry: 0,
702+
failed: 0,
703+
aborted: false
704+
})
705+
})
706+
t.end()
707+
})
708+
656709
t.end()
657710
})
658711

@@ -896,7 +949,7 @@ test('errors', t => {
896949
})
897950
} catch (err) {
898951
t.true(err instanceof errors.ConfigurationError)
899-
t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream')
952+
t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator')
900953
}
901954
})
902955

0 commit comments

Comments
 (0)