Permalink
Browse files

Merge pull request #175 from bookbrainz/improve-search-indexing

Bump the bulk operation size to try to reduce the chances of silent indexing failure. Only index the master revision of entities for which the master revision includes entity data.
  • Loading branch information...
LordSputnik committed Dec 31, 2017
2 parents e613c4c + 71d7966 commit 3c9bf863142faaf70387e236bf390478297e050c
Showing with 69 additions and 21 deletions.
  1. +4 −4 config/config.json.example
  2. +65 −17 src/server/helpers/search.js
@@ -27,9 +27,9 @@
}
},
"search": {
"host": "localhost:9200",
"apiVersion": "5.5",
"maxRetries": -1,
"deadTimeout": 2000
"host": "localhost:9200",
"apiVersion": "5.5",
"maxRetries": -1,
"deadTimeout": 2000
}
}
@@ -20,10 +20,15 @@ import * as utils from '../helpers/utils';
import ElasticSearch from 'elasticsearch';
import Promise from 'bluebird';
import _ from 'lodash';
import httpStatus from 'http-status';
const _index = 'bookbrainz';
const _bulkIndexSize = 40;
const _bulkIndexSize = 128;
// In milliseconds
const _retryDelay = 10;
const _maxJitter = 75;
let _client = null;
@@ -68,22 +73,60 @@ async function _bulkIndexEntities(entities) {
return;
}
const bulkOperations = entities.reduce((accumulator, entity) => {
accumulator.push({
index: {
_id: entity.bbid,
_index,
_type: entity.type.toLowerCase()
}
// Proxy the list of entities to index in case we need to retry
let entitiesToIndex = entities;
let operationSucceeded = false;
while (!operationSucceeded) {
const bulkOperations = entitiesToIndex.reduce((accumulator, entity) => {
accumulator.push({
index: {
_id: entity.bbid,
_index,
_type: entity.type.toLowerCase()
}
});
accumulator.push(entity);
return accumulator;
}, []);
operationSucceeded = true;
// eslint-disable-next-line no-await-in-loop
const response = await _client.bulk({
body: bulkOperations
});
accumulator.push(entity);
return accumulator;
}, []);
/*
* In case of failed index operations, the promise won't be rejected;
* instead, we have to inspect the response and respond to any failures
* individually.
*/
if (response.errors === true) {
entitiesToIndex = response.items.reduce((accumulator, item) => {
// We currently only handle queue overrun
if (item.index.status === httpStatus.TOO_MANY_REQUESTS) {
const failedEntity = entities.find(
(element) => element.bbid === item.index._id
);
accumulator.push(failedEntity);
}
await _client.bulk({
body: bulkOperations
});
return accumulator;
}, []);
if (entitiesToIndex.length > 0) {
operationSucceeded = false;
const jitter = Math.random() * _maxJitter;
// eslint-disable-next-line no-await-in-loop
await Promise.delay(_retryDelay + jitter);
}
}
}
}
async function _processEntityListForBulk(entityList) {
@@ -260,9 +303,14 @@ export async function generateIndex(orm) {
// Update the indexed entries for each entity type
const behaviorPromise = entityBehaviors.map(
(behavior) => behavior.model.forge().fetchAll({
withRelated: baseRelations.concat(behavior.relations)
})
(behavior) => behavior.model.forge()
.query((qb) => {
qb.where('master', true);
qb.whereNotNull('data_id');
})
.fetchAll({
withRelated: baseRelations.concat(behavior.relations)
})
);
const entityLists = await Promise.all(behaviorPromise);

0 comments on commit 3c9bf86

Please sign in to comment.