From e27f684404f3fe0a4ba2ae005663c04562591b87 Mon Sep 17 00:00:00 2001 From: delvedor Date: Tue, 24 Mar 2020 10:57:34 +0100 Subject: [PATCH 1/2] Helpers: Fixed stats counting --- lib/Helpers.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/Helpers.js b/lib/Helpers.js index 8c5a7054e..e72eec72a 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -229,10 +229,12 @@ class Helpers { chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) bulkBody.push(actionBody) bulkBody.push(payloadBody) - } else { // delete + } else if (operation === 'delete') { actionBody = serialize(action) chunkBytes += Buffer.byteLength(actionBody) bulkBody.push(actionBody) + } else { + throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`) } if (chunkBytes >= flushBytes) { @@ -351,11 +353,12 @@ class Helpers { function retryDocuments (err, bulkBody) { if (err) return callback(err) if (shouldAbort === true) return callback() - isRetrying = true if (bulkBody.length > 0) { if (retryCount > 0) { + isRetrying = true retryCount -= 1 + stats.retry += bulkBody.length setTimeout(tryBulk, wait, bulkBody, retryDocuments) return } @@ -397,7 +400,6 @@ class Helpers { // a document, because it was not an error in the document itself, // but the ES node were handling too many operations. if (status === 429) { - stats.retry += 1 retry.push(bulkBody[indexSlice]) if (operation !== 'delete') { retry.push(bulkBody[indexSlice + 1]) From 9f82d780bddcf841728ca502e246cfa56748a77e Mon Sep 17 00:00:00 2001 From: delvedor Date: Tue, 24 Mar 2020 10:57:43 +0100 Subject: [PATCH 2/2] Improve code coverage --- lib/Helpers.js | 1 + lib/Transport.js | 1 + test/unit/helpers/bulk.test.js | 123 +++++++++++++++++++++++++++++++ test/unit/helpers/scroll.test.js | 52 ++++++++++++- test/unit/helpers/search.test.js | 19 +++++ 5 files changed, 195 insertions(+), 1 deletion(-) diff --git a/lib/Helpers.js b/lib/Helpers.js index e72eec72a..8b16458cc 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -317,6 +317,7 @@ class Helpers { } function send (bulkBody) { + /* istanbul ignore if */ if (running >= concurrency) { throw new Error('Max concurrency reached') } diff --git a/lib/Transport.js b/lib/Transport.js index 7e1fe66fc..b20d3f0cb 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -49,6 +49,7 @@ class Transport { } else if (opts.nodeSelector === 'round-robin') { this.nodeSelector = roundRobinSelector() } else if (opts.nodeSelector === 'random') { + /* istanbul ignore next */ this.nodeSelector = randomSelector } else { this.nodeSelector = roundRobinSelector() diff --git a/test/unit/helpers/bulk.test.js b/test/unit/helpers/bulk.test.js index ee15eb103..bff3dd21f 100644 --- a/test/unit/helpers/bulk.test.js +++ b/test/unit/helpers/bulk.test.js @@ -143,6 +143,51 @@ test('bulk index', t => { }) }) + t.test('refreshOnCompletion', async t => { + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + if (params.method === 'GET') { + t.strictEqual(params.path, '/_all/_refresh') + return { body: { acknowledged: true } } + } else { + t.strictEqual(params.path, '/_bulk') + t.match(params.headers, { 'Content-Type': 'application/x-ndjson' }) + const [action, payload] = params.body.split('\n') + t.deepEqual(JSON.parse(action), { index: { _index: 'test' } }) + t.deepEqual(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + refreshOnCompletion: true, + onDocument (doc) { + return { + index: { _index: 'test' } + } + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + t.test('Should perform a bulk request (custom action)', async t => { let count = 0 const MockConnection = connection.buildMockConnection({ @@ -262,6 +307,55 @@ test('bulk index', t => { server.stop() }) + t.test('Should perform a bulk request (retry a single document from batch)', async t => { + function handler (req, res) { + res.setHeader('content-type', 'application/json') + res.end(JSON.stringify({ + took: 0, + errors: true, + items: [ + { index: { status: 200 } }, + { index: { status: 429 } }, + { index: { status: 200 } } + ] + })) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ node: `http://localhost:${port}` }) + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + concurrency: 1, + wait: 10, + retries: 0, + onDocument (doc) { + return { + index: { _index: 'test' } + } + }, + onDrop (doc) { + t.deepEqual(doc, { + status: 429, + error: null, + operation: { index: { _index: 'test' } }, + document: { user: 'arya', age: 18 }, + retried: false + }) + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 2, + retry: 0, + failed: 1, + aborted: false + }) + server.stop() + }) + t.test('Should perform a bulk request (failure)', async t => { if (semver.lt(process.versions.node, '10.0.0')) { t.skip('This test will not pass on Node v8') @@ -475,6 +569,35 @@ test('bulk index', t => { server.stop() }) + t.test('Invalid operation', t => { + t.plan(2) + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + client.helpers + .bulk({ + datasource: dataset.slice(), + flushBytes: 1, + concurrency: 1, + onDocument (doc) { + return { + foo: { _index: 'test' } + } + } + }) + .catch(err => { + t.true(err instanceof errors.ConfigurationError) + t.is(err.message, `Bulk helper invalid action: 'foo'`) + }) + }) + t.end() }) diff --git a/test/unit/helpers/scroll.test.js b/test/unit/helpers/scroll.test.js index 660ff5b17..1ef1a6cb1 100644 --- a/test/unit/helpers/scroll.test.js +++ b/test/unit/helpers/scroll.test.js @@ -164,7 +164,8 @@ test('Scroll search (retry throws and maxRetries)', async t => { index: 'test', body: { foo: 'bar' } }, { - wait: 10 + wait: 10, + ignore: [404] }) try { @@ -178,6 +179,55 @@ test('Scroll search (retry throws and maxRetries)', async t => { } }) +test('Scroll search (retry throws later)', async t => { + var count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + if (count > 1) { + count += 1 + return { body: {}, statusCode: 429 } + } + return { + statusCode: 200, + body: { + _scroll_id: count === 4 ? undefined : 'id', + count, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const scrollSearch = client.helpers.scrollSearch({ + index: 'test', + body: { foo: 'bar' } + }, { + wait: 10 + }) + + try { + for await (const result of scrollSearch) { // eslint-disable-line + t.strictEqual(result.body.count, count) + count += 1 + } + } catch (err) { + t.true(err instanceof errors.ResponseError) + t.strictEqual(err.statusCode, 429) + t.strictEqual(count, 5) + } +}) + test('Scroll search documents', async t => { var count = 0 const MockConnection = connection.buildMockConnection({ diff --git a/test/unit/helpers/search.test.js b/test/unit/helpers/search.test.js index 14473b56d..920f353dc 100644 --- a/test/unit/helpers/search.test.js +++ b/test/unit/helpers/search.test.js @@ -40,3 +40,22 @@ test('Search should have an additional documents property', async t => { { three: 'three' } ]) }) + +test('kGetHits fallback', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { body: {} } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.search({ + index: 'test', + body: { foo: 'bar' } + }) + t.deepEqual(result, []) +})