Skip to content

Commit

Permalink
feat: bulk helper improvements (#51)
Browse files Browse the repository at this point in the history
Applies patches from elastic/elasticsearch-js#2199 and elastic/elasticsearch-js#2027,
adding support for an onSuccess callback and fixing a bug that would cause the helper to
hang when the flushInterval was lower than the request timeout.

---------

Co-authored-by: JoshMock <160161+JoshMock@users.noreply.github.com>
  • Loading branch information
JoshMock and JoshMock committed Apr 3, 2024
1 parent 3360c03 commit eb7d29c
Show file tree
Hide file tree
Showing 2 changed files with 656 additions and 56 deletions.
134 changes: 97 additions & 37 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ export interface BulkStats {
aborted: boolean
}

interface IndexAction {
interface IndexActionOperation {
index: T.BulkIndexOperation
}

interface CreateAction {
interface CreateActionOperation {
create: T.BulkCreateOperation
}

Expand All @@ -90,7 +90,9 @@ interface DeleteAction {
delete: T.BulkDeleteOperation
}

type UpdateAction = [UpdateActionOperation, Record<string, any>]
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction

export interface OnDropDocument<TDocument = unknown> {
Expand All @@ -101,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
retried: boolean
}

type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>

export interface OnSuccessDocument<TDocument = unknown> {
result: BulkResponseItem
document?: TDocument
}

interface ZippedResult<TDocument = unknown> {
result: BulkResponseItem
raw: {
action: string
document?: string
}
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document?: () => TDocument
}

export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
Expand All @@ -110,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
retries?: number
wait?: number
onDrop?: (doc: OnDropDocument<TDocument>) => void
onSuccess?: (doc: OnSuccessDocument) => void
}

export interface BulkHelper<T> extends Promise<BulkStats> {
Expand Down Expand Up @@ -379,7 +400,7 @@ export default class Helpers {
clearTimeout(timeoutRef)
}

// In some cases the previos http call does not have finished,
// In some cases the previous http call does not have finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (loadedOperations > 0) {
const send = await semaphore()
Expand Down Expand Up @@ -415,8 +436,8 @@ export default class Helpers {
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximim concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running request has finshed.
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual msearch request.
// It also returns a finish function, which returns a promise that is resolved
Expand Down Expand Up @@ -548,6 +569,9 @@ export default class Helpers {
retries = this[kMaxRetries],
wait = 5000,
onDrop = noop,
// onSuccess does not default to noop, to avoid the performance hit
// of deserializing every document in the bulk request
onSuccess,
...bulkOptions
} = options

Expand Down Expand Up @@ -620,26 +644,25 @@ export default class Helpers {
let chunkBytes = 0
timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line

// @ts-expect-error datasoruce is an iterable
// @ts-expect-error datasource is an iterable
for await (const chunk of datasource) {
if (shouldAbort) break
timeoutRef.refresh()
const action = onDocument(chunk)
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
: Object.keys(action)[0]
const result = onDocument(chunk)
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
const operation = Object.keys(action)[0]
if (operation === 'index' || operation === 'create') {
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
payloadBody = typeof payload === 'string'
? payload
: serializer.serialize(payload)
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'update') {
// @ts-expect-error in case of update action is an array
actionBody = serializer.serialize(action[0])
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string'
? `{"doc":${chunk}}`
// @ts-expect-error in case of update action is an array
: serializer.serialize({ doc: chunk, ...action[1] })
: serializer.serialize({ doc: chunk, ...payload })
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'delete') {
Expand All @@ -653,15 +676,16 @@ export default class Helpers {

if (chunkBytes >= flushBytes) {
stats.bytes += chunkBytes
const send = await semaphore()
send(bulkBody.slice())
const bulkBodyCopy = bulkBody.slice()
bulkBody.length = 0
chunkBytes = 0
const send = await semaphore()
send(bulkBodyCopy)
}
}

clearTimeout(timeoutRef)
// In some cases the previos http call does not have finished,
// In some cases the previous http call has not finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (!shouldAbort && chunkBytes > 0) {
const send = await semaphore()
Expand Down Expand Up @@ -697,8 +721,8 @@ export default class Helpers {
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximim concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running request has finshed.
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual bulk request.
// It also returns a finish function, which returns a promise that is resolved
Expand Down Expand Up @@ -805,57 +829,93 @@ export default class Helpers {
callback()
}

/**
* Zips bulk response items (the action's result) with the original document body.
* The raw string version of action and document lines are also included.
*/
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
const zipped = []
let indexSlice = 0
for (let i = 0, len = responseItems.length; i < len; i++) {
const result = responseItems[i]
const operation = Object.keys(result)[0]
let zipResult

if (operation === 'delete') {
zipResult = {
result,
raw: { action: bulkBody[indexSlice] }
}
indexSlice += 1
} else {
const document = bulkBody[indexSlice + 1]
zipResult = {
result,
raw: { action: bulkBody[indexSlice], document },
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document: () => serializer.deserialize(document)
}
indexSlice += 2
}

zipped.push(zipResult as ZippedResult)
}

return zipped
}

function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
if (shouldAbort) return callback(null, [])
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
.then(response => {
const result = response.body
const results = zipBulkResults(result.items, bulkBody)

if (!result.errors) {
stats.successful += result.items.length
for (const item of result.items) {
if (item.update?.result === 'noop') {
for (const item of results) {
const { result, document = noop } = item
if (result.update?.result === 'noop') {
stats.noop++
}
if (onSuccess != null) onSuccess({ result, document: document() })
}
return callback(null, [])
}
const retry = []
const { items } = result
let indexSlice = 0
for (let i = 0, len = items.length; i < len; i++) {
const action = items[i]
const operation = Object.keys(action)[0]
for (const item of results) {
const { result, raw, document = noop } = item
const operation = Object.keys(result)[0]
// @ts-expect-error
const responseItem = action[operation as keyof T.BulkResponseItemContainer]
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')

if (responseItem.status >= 400) {
// 429 is the only status code where we might want to retry
// a document, because it was not an error in the document itself,
// but the ES node were handling too many operations.
// but the ES node was handling too many operations.
if (responseItem.status === 429) {
retry.push(bulkBody[indexSlice])
retry.push(raw.action)
/* istanbul ignore next */
if (operation !== 'delete') {
retry.push(bulkBody[indexSlice + 1])
retry.push(raw.document ?? '')
}
} else {
onDrop({
status: responseItem.status,
error: responseItem.error ?? null,
operation: serializer.deserialize(bulkBody[indexSlice]),
operation: serializer.deserialize(raw.action),
// @ts-expect-error
document: operation !== 'delete'
? serializer.deserialize(bulkBody[indexSlice + 1])
: null,
document: document(),
retried: isRetrying
})
stats.failed += 1
}
} else {
stats.successful += 1
if (onSuccess != null) onSuccess({ result, document: document() })
}
operation === 'delete' ? indexSlice += 1 : indexSlice += 2
}
callback(null, retry)
})
Expand Down

0 comments on commit eb7d29c

Please sign in to comment.