Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion docs/helpers.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ To create a new instance of the Bulk helper, you should access it as shown in th
[cols=2*]
|===
|`datasource`
a|An array or a readable stream with the data you need to index/create/update/delete.
a|An array, async generator or a readable stream with the data you need to index/create/update/delete.
It can be an array of strings or objects, but also a stream of json strings or JavaScript objects. +
If it is a stream, we recommend to use the https://www.npmjs.com/package/split2[`split2`] package, that will split the stream on new lines delimiters. +
This parameter is mandatory.
Expand Down Expand Up @@ -182,6 +182,36 @@ const result = await client.helpers.bulk({
})
----

==== Usage with an async generator

[source,js]
----
const { Client } = require('@elastic/elasticsearch')

async function * generator () {
const dataset = [
{ user: 'jon', age: 23 },
{ user: 'arya', age: 18 },
{ user: 'tyrion', age: 39 }
]
for (const doc of dataset) {
yield doc
}
}

const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
datasource: generator(),
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
}
})

console.log(result)
----

=== Search Helper
A simple wrapper around the search API. Instead of returning the entire `result` object it will return only the search documents result.

Expand Down
24 changes: 19 additions & 5 deletions lib/Helpers.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default class Helpers {
search<TRequestBody extends RequestBody, TDocument = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
scrollSearch<TRequestBody extends RequestBody, TDocument = unknown, TResponse = ResponseBody, TContext = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
scrollDocuments<TRequestBody extends RequestBody, TDocument = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
bulk(options: BulkHelperOptions): BulkHelper<BulkStats>
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
}

export interface ScrollSearchResponse<TDocument = unknown, TResponse = ResponseBody, TContext = unknown> extends ApiResponse<TResponse, TContext> {
Expand Down Expand Up @@ -64,13 +64,27 @@ type UpdateAction = [UpdateActionOperation, Record<string, any>]
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>

export interface BulkHelperOptions extends Omit<Bulk, 'body'> {
datasource: any[] | Buffer | ReadableStream
onDocument: (doc: Record<string, any>) => Action
export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body'> {
datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
flushBytes?: number
concurrency?: number
retries?: number
wait?: number,
onDrop?: (doc: Record<string, any>) => void,
onDrop?: (doc: OnDropDocument<TDocument>) => void,
refreshOnCompletion?: boolean | string
}

export interface OnDropDocument<TDocument = unknown> {
status: number
error: {
type: string,
reason: string,
caused_by: {
type: string,
reason: string
}
}
document: TDocument
retried: boolean
}
4 changes: 2 additions & 2 deletions lib/Helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ class Helpers {
if (datasource === undefined) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource is required'))
}
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function')) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream'))
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator'))
}
if (onDocument === undefined) {
return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required'))
Expand Down
17 changes: 9 additions & 8 deletions test/types/helpers.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {
BulkHelper,
BulkStats,
BulkHelperOptions,
ScrollSearchResponse
ScrollSearchResponse,
OnDropDocument
} from '../../lib/Helpers'

const client = new Client({
Expand All @@ -18,7 +19,7 @@ const client = new Client({

/// .helpers.bulk

const b = client.helpers.bulk({
const b = client.helpers.bulk<Record<string, any>>({
datasource: [],
onDocument (doc) {
expectType<Record<string, any>>(doc)
Expand All @@ -29,7 +30,7 @@ const b = client.helpers.bulk({
retries: 3,
wait: 5000,
onDrop (doc) {
expectType<Record<string, any>>(doc)
expectType<OnDropDocument<Record<string, any>>>(doc)
},
refreshOnCompletion: true,
pipeline: 'my-pipeline'
Expand Down Expand Up @@ -59,7 +60,7 @@ expectError(
return { index: { _index: 'test' } }
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}
// create
{
Expand All @@ -69,20 +70,20 @@ expectError(
return { create: { _index: 'test' } }
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}
// update
{
// without `:BulkHelperOptions` this test cannot pass
// but if we write these options inline inside
// a `.helper.bulk`, it works as expected
const options: BulkHelperOptions = {
const options: BulkHelperOptions<Record<string, any>> = {
datasource: [],
onDocument (doc: Record<string, any>) {
return [{ update: { _index: 'test' } }, doc]
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}
// delete
{
Expand All @@ -92,7 +93,7 @@ expectError(
return { delete: { _index: 'test' } }
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}

/// .helpers.scrollSearch
Expand Down
55 changes: 54 additions & 1 deletion test/unit/helpers/bulk.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,59 @@ test('bulk index', t => {
t.end()
})

t.test('datasource as async generator', t => {
t.test('Should perform a bulk request', async t => {
let count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
t.strictEqual(params.path, '/_bulk')
t.match(params.headers, { 'Content-Type': 'application/x-ndjson' })
const [action, payload] = params.body.split('\n')
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
t.deepEqual(JSON.parse(payload), dataset[count++])
return { body: { errors: false, items: [{}] } }
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

async function * generator () {
const data = dataset.slice()
for (const doc of data) {
yield doc
}
}

const result = await client.helpers.bulk({
datasource: generator(),
flushBytes: 1,
concurrency: 1,
onDocument (doc) {
return {
index: { _index: 'test' }
}
},
onDrop (doc) {
t.fail('This should never be called')
}
})

t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
})
t.end()
})

t.end()
})

Expand Down Expand Up @@ -896,7 +949,7 @@ test('errors', t => {
})
} catch (err) {
t.true(err instanceof errors.ConfigurationError)
t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream')
t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator')
}
})

Expand Down