Skip to content

Commit 85616b0

Browse files
authored
Improve code coverage (#1124)
* Helpers: Fixed stats counting * Improve code coverage
1 parent f99fe71 commit 85616b0

File tree

5 files changed

+200
-4
lines changed

5 files changed

+200
-4
lines changed

lib/Helpers.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,12 @@ class Helpers {
229229
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
230230
bulkBody.push(actionBody)
231231
bulkBody.push(payloadBody)
232-
} else { // delete
232+
} else if (operation === 'delete') {
233233
actionBody = serialize(action)
234234
chunkBytes += Buffer.byteLength(actionBody)
235235
bulkBody.push(actionBody)
236+
} else {
237+
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
236238
}
237239

238240
if (chunkBytes >= flushBytes) {
@@ -315,6 +317,7 @@ class Helpers {
315317
}
316318

317319
function send (bulkBody) {
320+
/* istanbul ignore if */
318321
if (running >= concurrency) {
319322
throw new Error('Max concurrency reached')
320323
}
@@ -351,11 +354,12 @@ class Helpers {
351354
function retryDocuments (err, bulkBody) {
352355
if (err) return callback(err)
353356
if (shouldAbort === true) return callback()
354-
isRetrying = true
355357

356358
if (bulkBody.length > 0) {
357359
if (retryCount > 0) {
360+
isRetrying = true
358361
retryCount -= 1
362+
stats.retry += bulkBody.length
359363
setTimeout(tryBulk, wait, bulkBody, retryDocuments)
360364
return
361365
}
@@ -397,7 +401,6 @@ class Helpers {
397401
// a document, because it was not an error in the document itself,
398402
// but the ES node were handling too many operations.
399403
if (status === 429) {
400-
stats.retry += 1
401404
retry.push(bulkBody[indexSlice])
402405
if (operation !== 'delete') {
403406
retry.push(bulkBody[indexSlice + 1])

lib/Transport.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class Transport {
4949
} else if (opts.nodeSelector === 'round-robin') {
5050
this.nodeSelector = roundRobinSelector()
5151
} else if (opts.nodeSelector === 'random') {
52+
/* istanbul ignore next */
5253
this.nodeSelector = randomSelector
5354
} else {
5455
this.nodeSelector = roundRobinSelector()

test/unit/helpers/bulk.test.js

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,51 @@ test('bulk index', t => {
143143
})
144144
})
145145

146+
t.test('refreshOnCompletion', async t => {
147+
let count = 0
148+
const MockConnection = connection.buildMockConnection({
149+
onRequest (params) {
150+
if (params.method === 'GET') {
151+
t.strictEqual(params.path, '/_all/_refresh')
152+
return { body: { acknowledged: true } }
153+
} else {
154+
t.strictEqual(params.path, '/_bulk')
155+
t.match(params.headers, { 'Content-Type': 'application/x-ndjson' })
156+
const [action, payload] = params.body.split('\n')
157+
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
158+
t.deepEqual(JSON.parse(payload), dataset[count++])
159+
return { body: { errors: false, items: [{}] } }
160+
}
161+
}
162+
})
163+
164+
const client = new Client({
165+
node: 'http://localhost:9200',
166+
Connection: MockConnection
167+
})
168+
const result = await client.helpers.bulk({
169+
datasource: dataset.slice(),
170+
flushBytes: 1,
171+
concurrency: 1,
172+
refreshOnCompletion: true,
173+
onDocument (doc) {
174+
return {
175+
index: { _index: 'test' }
176+
}
177+
}
178+
})
179+
180+
t.type(result.time, 'number')
181+
t.type(result.bytes, 'number')
182+
t.match(result, {
183+
total: 3,
184+
successful: 3,
185+
retry: 0,
186+
failed: 0,
187+
aborted: false
188+
})
189+
})
190+
146191
t.test('Should perform a bulk request (custom action)', async t => {
147192
let count = 0
148193
const MockConnection = connection.buildMockConnection({
@@ -262,6 +307,55 @@ test('bulk index', t => {
262307
server.stop()
263308
})
264309

310+
t.test('Should perform a bulk request (retry a single document from batch)', async t => {
311+
function handler (req, res) {
312+
res.setHeader('content-type', 'application/json')
313+
res.end(JSON.stringify({
314+
took: 0,
315+
errors: true,
316+
items: [
317+
{ index: { status: 200 } },
318+
{ index: { status: 429 } },
319+
{ index: { status: 200 } }
320+
]
321+
}))
322+
}
323+
324+
const [{ port }, server] = await buildServer(handler)
325+
const client = new Client({ node: `http://localhost:${port}` })
326+
const result = await client.helpers.bulk({
327+
datasource: dataset.slice(),
328+
concurrency: 1,
329+
wait: 10,
330+
retries: 0,
331+
onDocument (doc) {
332+
return {
333+
index: { _index: 'test' }
334+
}
335+
},
336+
onDrop (doc) {
337+
t.deepEqual(doc, {
338+
status: 429,
339+
error: null,
340+
operation: { index: { _index: 'test' } },
341+
document: { user: 'arya', age: 18 },
342+
retried: false
343+
})
344+
}
345+
})
346+
347+
t.type(result.time, 'number')
348+
t.type(result.bytes, 'number')
349+
t.match(result, {
350+
total: 3,
351+
successful: 2,
352+
retry: 0,
353+
failed: 1,
354+
aborted: false
355+
})
356+
server.stop()
357+
})
358+
265359
t.test('Should perform a bulk request (failure)', async t => {
266360
if (semver.lt(process.versions.node, '10.0.0')) {
267361
t.skip('This test will not pass on Node v8')
@@ -475,6 +569,35 @@ test('bulk index', t => {
475569
server.stop()
476570
})
477571

572+
t.test('Invalid operation', t => {
573+
t.plan(2)
574+
const MockConnection = connection.buildMockConnection({
575+
onRequest (params) {
576+
return { body: { errors: false, items: [{}] } }
577+
}
578+
})
579+
580+
const client = new Client({
581+
node: 'http://localhost:9200',
582+
Connection: MockConnection
583+
})
584+
client.helpers
585+
.bulk({
586+
datasource: dataset.slice(),
587+
flushBytes: 1,
588+
concurrency: 1,
589+
onDocument (doc) {
590+
return {
591+
foo: { _index: 'test' }
592+
}
593+
}
594+
})
595+
.catch(err => {
596+
t.true(err instanceof errors.ConfigurationError)
597+
t.is(err.message, `Bulk helper invalid action: 'foo'`)
598+
})
599+
})
600+
478601
t.end()
479602
})
480603

test/unit/helpers/scroll.test.js

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ test('Scroll search (retry throws and maxRetries)', async t => {
164164
index: 'test',
165165
body: { foo: 'bar' }
166166
}, {
167-
wait: 10
167+
wait: 10,
168+
ignore: [404]
168169
})
169170

170171
try {
@@ -178,6 +179,55 @@ test('Scroll search (retry throws and maxRetries)', async t => {
178179
}
179180
})
180181

182+
test('Scroll search (retry throws later)', async t => {
183+
var count = 0
184+
const MockConnection = connection.buildMockConnection({
185+
onRequest (params) {
186+
if (count > 1) {
187+
count += 1
188+
return { body: {}, statusCode: 429 }
189+
}
190+
return {
191+
statusCode: 200,
192+
body: {
193+
_scroll_id: count === 4 ? undefined : 'id',
194+
count,
195+
hits: {
196+
hits: [
197+
{ _source: { one: 'one' } },
198+
{ _source: { two: 'two' } },
199+
{ _source: { three: 'three' } }
200+
]
201+
}
202+
}
203+
}
204+
}
205+
})
206+
207+
const client = new Client({
208+
node: 'http://localhost:9200',
209+
Connection: MockConnection
210+
})
211+
212+
const scrollSearch = client.helpers.scrollSearch({
213+
index: 'test',
214+
body: { foo: 'bar' }
215+
}, {
216+
wait: 10
217+
})
218+
219+
try {
220+
for await (const result of scrollSearch) { // eslint-disable-line
221+
t.strictEqual(result.body.count, count)
222+
count += 1
223+
}
224+
} catch (err) {
225+
t.true(err instanceof errors.ResponseError)
226+
t.strictEqual(err.statusCode, 429)
227+
t.strictEqual(count, 5)
228+
}
229+
})
230+
181231
test('Scroll search documents', async t => {
182232
var count = 0
183233
const MockConnection = connection.buildMockConnection({

test/unit/helpers/search.test.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,22 @@ test('Search should have an additional documents property', async t => {
4040
{ three: 'three' }
4141
])
4242
})
43+
44+
test('kGetHits fallback', async t => {
45+
const MockConnection = connection.buildMockConnection({
46+
onRequest (params) {
47+
return { body: {} }
48+
}
49+
})
50+
51+
const client = new Client({
52+
node: 'http://localhost:9200',
53+
Connection: MockConnection
54+
})
55+
56+
const result = await client.helpers.search({
57+
index: 'test',
58+
body: { foo: 'bar' }
59+
})
60+
t.deepEqual(result, [])
61+
})

0 commit comments

Comments
 (0)