diff --git a/lib/Helpers.js b/lib/Helpers.js index 8c5a7054e..8b16458cc 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -229,10 +229,12 @@ class Helpers { chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) bulkBody.push(actionBody) bulkBody.push(payloadBody) - } else { // delete + } else if (operation === 'delete') { actionBody = serialize(action) chunkBytes += Buffer.byteLength(actionBody) bulkBody.push(actionBody) + } else { + throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`) } if (chunkBytes >= flushBytes) { @@ -315,6 +317,7 @@ class Helpers { } function send (bulkBody) { + /* istanbul ignore if */ if (running >= concurrency) { throw new Error('Max concurrency reached') } @@ -351,11 +354,12 @@ class Helpers { function retryDocuments (err, bulkBody) { if (err) return callback(err) if (shouldAbort === true) return callback() - isRetrying = true if (bulkBody.length > 0) { if (retryCount > 0) { + isRetrying = true retryCount -= 1 + stats.retry += bulkBody.length setTimeout(tryBulk, wait, bulkBody, retryDocuments) return } @@ -397,7 +401,6 @@ class Helpers { // a document, because it was not an error in the document itself, // but the ES node were handling too many operations. if (status === 429) { - stats.retry += 1 retry.push(bulkBody[indexSlice]) if (operation !== 'delete') { retry.push(bulkBody[indexSlice + 1]) diff --git a/lib/Transport.js b/lib/Transport.js index 7e1fe66fc..b20d3f0cb 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -49,6 +49,7 @@ class Transport { } else if (opts.nodeSelector === 'round-robin') { this.nodeSelector = roundRobinSelector() } else if (opts.nodeSelector === 'random') { + /* istanbul ignore next */ this.nodeSelector = randomSelector } else { this.nodeSelector = roundRobinSelector() diff --git a/test/unit/helpers/bulk.test.js b/test/unit/helpers/bulk.test.js index ee15eb103..bff3dd21f 100644 --- a/test/unit/helpers/bulk.test.js +++ b/test/unit/helpers/bulk.test.js @@ -143,6 +143,51 @@ test('bulk index', t => { }) }) + t.test('refreshOnCompletion', async t => { + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + if (params.method === 'GET') { + t.strictEqual(params.path, '/_all/_refresh') + return { body: { acknowledged: true } } + } else { + t.strictEqual(params.path, '/_bulk') + t.match(params.headers, { 'Content-Type': 'application/x-ndjson' }) + const [action, payload] = params.body.split('\n') + t.deepEqual(JSON.parse(action), { index: { _index: 'test' } }) + t.deepEqual(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + refreshOnCompletion: true, + onDocument (doc) { + return { + index: { _index: 'test' } + } + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.test('Should perform a bulk request (custom action)', async t => { let count = 0 const MockConnection = connection.buildMockConnection({ @@ -262,6 +307,55 @@ test('bulk index', t => { server.stop() }) + t.test('Should perform a bulk request (retry a single document from batch)', async t => { + function handler (req, res) { + res.setHeader('content-type', 'application/json') + res.end(JSON.stringify({ + took: 0, + errors: true, + items: [ + { index: { status: 200 } }, + { index: { status: 429 } }, + { index: { status: 200 } } + ] + })) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ node: `http://localhost:${port}` }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + concurrency: 1, + wait: 10, + retries: 0, + onDocument (doc) { + return { + index: { _index: 'test' } + } + }, + onDrop (doc) { + t.deepEqual(doc, { + status: 429, + error: null, + operation: { index: { _index: 'test' } }, + document: { user: 'arya', age: 18 }, + retried: false + }) + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 2, + retry: 0, + failed: 1, + aborted: false + }) + server.stop() + }) + t.test('Should perform a bulk request (failure)', async t => { if (semver.lt(process.versions.node, '10.0.0')) { t.skip('This test will not pass on Node v8') @@ -475,6 +569,35 @@ test('bulk index', t => { server.stop() }) + t.test('Invalid operation', t => { + t.plan(2) + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + client.helpers + .bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + return { + foo: { _index: 'test' } + } + } + }) + .catch(err => { + t.true(err instanceof errors.ConfigurationError) + t.is(err.message, `Bulk helper invalid action: 'foo'`) + }) + }) + t.end() }) diff --git a/test/unit/helpers/scroll.test.js b/test/unit/helpers/scroll.test.js index 660ff5b17..1ef1a6cb1 100644 --- a/test/unit/helpers/scroll.test.js +++ b/test/unit/helpers/scroll.test.js @@ -164,7 +164,8 @@ test('Scroll search (retry throws and maxRetries)', async t => { index: 'test', body: { foo: 'bar' } }, { - wait: 10 + wait: 10, + ignore: [404] }) try { @@ -178,6 +179,55 @@ test('Scroll search (retry throws and maxRetries)', async t => { } }) +test('Scroll search (retry throws later)', async t => { + var count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + if (count > 1) { + count += 1 + return { body: {}, statusCode: 429 } + } + return { + statusCode: 200, + body: { + _scroll_id: count === 4 ? undefined : 'id', + count, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const scrollSearch = client.helpers.scrollSearch({ + index: 'test', + body: { foo: 'bar' } + }, { + wait: 10 + }) + + try { + for await (const result of scrollSearch) { // eslint-disable-line + t.strictEqual(result.body.count, count) + count += 1 + } + } catch (err) { + t.true(err instanceof errors.ResponseError) + t.strictEqual(err.statusCode, 429) + t.strictEqual(count, 5) + } +}) + test('Scroll search documents', async t => { var count = 0 const MockConnection = connection.buildMockConnection({ diff --git a/test/unit/helpers/search.test.js b/test/unit/helpers/search.test.js index 14473b56d..920f353dc 100644 --- a/test/unit/helpers/search.test.js +++ b/test/unit/helpers/search.test.js @@ -40,3 +40,22 @@ test('Search should have an additional documents property', async t => { { three: 'three' } ]) }) + +test('kGetHits fallback', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { body: {} } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.search({ + index: 'test', + body: { foo: 'bar' } + }) + t.deepEqual(result, []) +})