From 3a0ca23b390367d129c220dbdcc4b53fefbd667e Mon Sep 17 00:00:00 2001 From: Johnson Liang Date: Thu, 18 Nov 2021 13:59:29 +0800 Subject: [PATCH] [util/getAllDocs] extract getAllDocs to util --- src/scripts/migrations/createBackendUsers.js | 32 ++++---------- src/util/getAllDocs.js | 45 ++++++++++++++++++++ 2 files changed, 54 insertions(+), 23 deletions(-) create mode 100644 src/util/getAllDocs.js diff --git a/src/scripts/migrations/createBackendUsers.js b/src/scripts/migrations/createBackendUsers.js index ad09c226..ff431f35 100644 --- a/src/scripts/migrations/createBackendUsers.js +++ b/src/scripts/migrations/createBackendUsers.js @@ -34,6 +34,7 @@ import 'dotenv/config'; import client from 'util/client'; +import getAllDocs from 'util/getAllDocs'; import Bulk from 'util/bulk'; import rollbar from 'rollbarInstance'; import { @@ -295,30 +296,15 @@ export default class CreateBackendUsers { * @yields {Object} the document */ async *getAllDocs(indexName, hasAdditionalUserFields = false) { - let resp = await client.search({ - index: indexName, - scroll: SCROLL_TIMEOUT, - size: this.batchSize, - body: { - query: hasAdditionalUserFields ? matchAllQuery : backendUserQuery, - }, - }); - - while (true) { - const docs = resp.body.hits.hits; - if (docs.length === 0) break; - for (const doc of docs) { - yield doc; - } - - if (!resp.body._scroll_id) { - break; - } - - resp = await client.scroll({ + for await (const doc of getAllDocs( + indexName, + hasAdditionalUserFields ? matchAllQuery : backendUserQuery, + { scroll: SCROLL_TIMEOUT, - scrollId: resp.body._scroll_id, - }); + size: this.batchSize, + } + )) { + yield doc; } } diff --git a/src/util/getAllDocs.js b/src/util/getAllDocs.js new file mode 100644 index 00000000..dcf731c6 --- /dev/null +++ b/src/util/getAllDocs.js @@ -0,0 +1,45 @@ +import client from 'util/client'; + +/** + * A generator that fetches all docs in the specified index and yield 1 doc at a time. 
+ * + * @param {String} index The name of the index to fetch + * @param {Object} query The Elasticsearch query. Fetches all docs in the specified index if not given. + * @param {Object} settings Query param settings in ES. + * @param {number} settings.size Number of docs per batch. Defaults to 1000. + * @param {string} settings.scroll The scroll timeout setting. Defaults to 30s. + * @yields {Object} the document + */ +async function* getAllDocs( + index, + query = { match_all: {} }, + { scroll = '30s', size = 1000 } = {} +) { + let resp = await client.search({ + index, + scroll, + size, + body: { + query, + }, + }); + + while (true) { + const docs = resp.body.hits.hits; + if (docs.length === 0) break; + for (const doc of docs) { + yield doc; + } + + if (!resp.body._scroll_id) { + break; + } + + resp = await client.scroll({ + scroll, + scrollId: resp.body._scroll_id, + }); + } +} + +export default getAllDocs;