Skip to content

Commit

Permalink
Merge pull request #175 from bookbrainz/improve-search-indexing
Browse files Browse the repository at this point in the history
Bump the bulk operation size to try to reduce the chances of silent indexing failure. Only index the master revision of entities for which the master revision includes entity data.
  • Loading branch information
LordSputnik committed Dec 31, 2017
2 parents e613c4c + 71d7966 commit 3c9bf86
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 21 deletions.
8 changes: 4 additions & 4 deletions config/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
}
},
"search": {
"host": "localhost:9200",
"apiVersion": "5.5",
"maxRetries": -1,
"deadTimeout": 2000
"host": "localhost:9200",
"apiVersion": "5.5",
"maxRetries": -1,
"deadTimeout": 2000
}
}
82 changes: 65 additions & 17 deletions src/server/helpers/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ import * as utils from '../helpers/utils';
import ElasticSearch from 'elasticsearch';
import Promise from 'bluebird';
import _ from 'lodash';
import httpStatus from 'http-status';


const _index = 'bookbrainz';
const _bulkIndexSize = 40;
const _bulkIndexSize = 128;

// Retry timing for bulk indexing, in milliseconds: each retry waits
// _retryDelay plus a random jitter of up to _maxJitter.
const _retryDelay = 10;
const _maxJitter = 75;

let _client = null;

Expand Down Expand Up @@ -68,22 +73,60 @@ async function _bulkIndexEntities(entities) {
return;
}

const bulkOperations = entities.reduce((accumulator, entity) => {
accumulator.push({
index: {
_id: entity.bbid,
_index,
_type: entity.type.toLowerCase()
}
// Proxy the list of entities to index in case we need to retry
let entitiesToIndex = entities;

let operationSucceeded = false;
while (!operationSucceeded) {
const bulkOperations = entitiesToIndex.reduce((accumulator, entity) => {
accumulator.push({
index: {
_id: entity.bbid,
_index,
_type: entity.type.toLowerCase()
}
});
accumulator.push(entity);

return accumulator;
}, []);

operationSucceeded = true;

// eslint-disable-next-line no-await-in-loop
const response = await _client.bulk({
body: bulkOperations
});
accumulator.push(entity);

return accumulator;
}, []);
/*
* In case of failed index operations, the promise won't be rejected;
* instead, we have to inspect the response and respond to any failures
* individually.
*/
if (response.errors === true) {
entitiesToIndex = response.items.reduce((accumulator, item) => {
// We currently only retry items rejected with HTTP 429 (queue overrun); other failed items are not retried
if (item.index.status === httpStatus.TOO_MANY_REQUESTS) {
const failedEntity = entities.find(
(element) => element.bbid === item.index._id
);

accumulator.push(failedEntity);
}

await _client.bulk({
body: bulkOperations
});
return accumulator;
}, []);


if (entitiesToIndex.length > 0) {
operationSucceeded = false;

const jitter = Math.random() * _maxJitter;
// eslint-disable-next-line no-await-in-loop
await Promise.delay(_retryDelay + jitter);
}
}
}
}

async function _processEntityListForBulk(entityList) {
Expand Down Expand Up @@ -260,9 +303,14 @@ export async function generateIndex(orm) {

// Update the indexed entries for each entity type
const behaviorPromise = entityBehaviors.map(
(behavior) => behavior.model.forge().fetchAll({
withRelated: baseRelations.concat(behavior.relations)
})
(behavior) => behavior.model.forge()
.query((qb) => {
qb.where('master', true);
qb.whereNotNull('data_id');
})
.fetchAll({
withRelated: baseRelations.concat(behavior.relations)
})
);
const entityLists = await Promise.all(behaviorPromise);

Expand Down

0 comments on commit 3c9bf86

Please sign in to comment.