From ca66c9cee79a64a143aeeea00dfb6c9b7bd8d495 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Fri, 20 Oct 2023 12:03:54 -0400 Subject: [PATCH] fix: improved organization sync --- .jshintrc | 3 + src/datalayer/persistance.js | 18 +-- src/datalayer/syncService.js | 78 ++++++++---- .../organizations/organizations.model.js | 118 ++++++++++-------- 4 files changed, 129 insertions(+), 88 deletions(-) create mode 100644 .jshintrc diff --git a/.jshintrc b/.jshintrc new file mode 100644 index 00000000..711f4c4f --- /dev/null +++ b/.jshintrc @@ -0,0 +1,3 @@ +{ + "esversion": 11 +} diff --git a/src/datalayer/persistance.js b/src/datalayer/persistance.js index 5e25fcab..99272bb9 100644 --- a/src/datalayer/persistance.js +++ b/src/datalayer/persistance.js @@ -302,12 +302,17 @@ const getStoreData = async (storeId, rootHash) => { } return data; } + + logger.error( + `FAILED GETTING STORE DATA FOR ${storeId}: ${JSON.stringify(data)}`, + ); } catch (error) { logger.info( `Unable to find store data for ${storeId} at root ${ rootHash || 'latest' }`, ); + logger.error(error.message); return false; } } @@ -330,20 +335,15 @@ const getRoot = async (storeId, ignoreEmptyStore = false) => { .timeout(timeout) .send({ id: storeId }); - const data = response.body; + const { confirmed, hash } = response.body; - if ( - (data.confirmed && !ignoreEmptyStore) || - (data.confirmed && - ignoreEmptyStore && - !data.hash.includes('0x00000000000')) - ) { - return data; + if (confirmed && (!ignoreEmptyStore || !hash.includes('0x00000000000'))) { + return response.body; } return false; } catch (error) { - logger.error(error); + logger.error(error.message); return false; } }; diff --git a/src/datalayer/syncService.js b/src/datalayer/syncService.js index 42081bcd..9c9f49dc 100644 --- a/src/datalayer/syncService.js +++ b/src/datalayer/syncService.js @@ -287,23 +287,39 @@ const getRootDiff = (storeId, root1, root2) => { } }; -const getStoreData = async (storeId, callback, onFail, retry = 0) => { +/** + * Fetches store data and invokes either a callback or an error handler. + * + * @param {string} storeId - The ID of the store to fetch data for. + * @param {Function} callback - Function to call on successful data retrieval. + * @param {Function} onFail - Function to call when data retrieval fails. + * @param {number} retry - Number of retry attempts. + */ +const getStoreData = async (storeId, callback, onFail, rootHash, retry = 0) => { + const MAX_RETRIES = 50; + const RETRY_DELAY = 120000; + try { logger.info(`Getting store data, retry: ${retry}`); - if (retry <= 10) { - const encodedData = await dataLayer.getStoreData(storeId); - if (_.isEmpty(encodedData?.keys_values)) { - await new Promise((resolve) => setTimeout(() => resolve(), 120000)); - return getStoreData(storeId, callback, onFail, retry + 1); - } else { - callback(decodeDataLayerResponse(encodedData)); - } - } else { - onFail(); + + if (retry > MAX_RETRIES) { + return onFail(`Max retries exceeded for store ${storeId}`); } + + const encodedData = await dataLayer.getStoreData(storeId, rootHash); + + if (!encodedData || _.isEmpty(encodedData?.keys_values)) { + logger.debug(`No data found for store ${storeId}, retrying...`); + await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY)); + return getStoreData(storeId, callback, onFail, rootHash, retry + 1); + } + + const decodedData = decodeDataLayerResponse(encodedData); + + callback(decodedData); } catch (error) { logger.error(error.message); - onFail(); + onFail(error.message); } }; @@ -320,18 +336,32 @@ const getCurrentStoreData = async (storeId) => { } }; -const getStoreIfUpdated = async ( - storeId, - lastRootHash, - onUpdate, - callback, - onFail, -) => { - const rootResponse = await dataLayer.getRoot(storeId); - if (rootResponse.confirmed && rootResponse.hash !== lastRootHash) { - logger.debug(`Updating orgUid ${storeId} with hash ${rootResponse.hash}`); - onUpdate(rootResponse.hash); - await getStoreData(storeId, callback, onFail); +/** + * Checks if the store data has been updated and triggers the appropriate callbacks. + * + * @param {string} storeId - The ID of the store to check. + * @param {string} lastRootHash - The last known root hash for comparison. + * @param {function} callback - Callback to invoke to process the store data. + * @param {function} onFail - Callback to invoke if an operation fails. + */ +const getStoreIfUpdated = async (storeId, lastRootHash, callback, onFail) => { + try { + const rootResponse = await dataLayer.getRoot(storeId); + + if (rootResponse.confirmed && rootResponse.hash !== lastRootHash) { + const curriedCallback = (data) => callback(rootResponse.hash, data); + + await getStoreData( + storeId, + curriedCallback, + onFail, + rootResponse.hash, + 0, + ); + } + } catch (error) { + logger.error(error.message); + onFail(error.message); } }; diff --git a/src/models/organizations/organizations.model.js b/src/models/organizations/organizations.model.js index 79b0a3f7..88d7f02e 100644 --- a/src/models/organizations/organizations.model.js +++ b/src/models/organizations/organizations.model.js @@ -256,8 +256,7 @@ class Organization extends Model { }); } - // eslint-disable-next-line - static importOrganization = async (orgUid) => { + static async importOrganization(orgUid) { try { console.log('Importing organization ' + orgUid); const orgData = await datalayer.getSubscribedStoreData(orgUid); @@ -310,10 +309,9 @@ class Organization extends Model { } catch (error) { logger.info(error.message); } - }; + } - // eslint-disable-next-line - static subscribeToOrganization = async (orgUid) => { + static async subscribeToOrganization(orgUid) { const exists = await Organization.findOne({ where: { orgUid } }); if (exists) { await Organization.update({ subscribed: true }, { where: { orgUid } }); @@ -322,65 +320,75 @@ class Organization extends Model { 'Can not subscribe, please import this organization first', ); } - }; + } - // eslint-disable-next-line - static unsubscribeToOrganization = async (orgUid) => { + static async unsubscribeToOrganization(orgUid) { await Organization.update({ subscribed: false }, { orgUid }); - }; + } - static syncOrganizationMeta = async () => { + /** + * Synchronizes metadata for all subscribed organizations. + */ + static async syncOrganizationMeta() { try { const allSubscribedOrganizations = await Organization.findAll({ subscribed: true, }); await Promise.all( - allSubscribedOrganizations.map((organization) => { - const onResult = (data) => { - const updateData = data - .filter((pair) => !pair.key.includes('meta_')) - .reduce((update, current) => { - update[current.key] = current.value; - return update; - }, {}); - - // will return metadata fields. i.e.: { meta_key1: 'value1', meta_key2: 'value2' } - const metadata = data - .filter((pair) => pair.key.includes('meta_')) - .reduce((update, current) => { - update[current.key] = current.value; - return update; - }, {}); - - Organization.update( - { - ..._.omit(updateData, ['registryId']), - metadata: JSON.stringify(metadata), - }, - { - where: { orgUid: organization.orgUid }, - }, - ); - }; - - const onUpdate = (updateHash) => { + allSubscribedOrganizations.map(async (organization) => { + const processData = (data, keyFilter) => + data + .filter(({ key }) => keyFilter(key)) + .reduce( + (update, { key, value }) => ({ ...update, [key]: value }), + {}, + ); + + const onFail = (message) => { + logger.info(`Unable to sync metadata from ${organization.orgUid}`); + logger.error(`ORGANIZATION DATA SYNC ERROR: ${message}`); Organization.update( - { orgHash: updateHash }, - { - where: { orgUid: organization.orgUid }, - }, + { orgHash: '0' }, + { where: { orgUid: organization.orgUid } }, ); }; - const onFail = () => { - logger.info(`Unable to sync metadata from ${organization.orgUid}`); + const onResult = async (updateHash, data) => { + try { + const updateData = processData( + data, + (key) => !key.includes('meta_'), + ); + const metadata = processData(data, (key) => + key.includes('meta_'), + ); + + await Organization.update( + { + ..._.omit(updateData, ['registryId']), + prefix: updateData.prefix || '0', + metadata: JSON.stringify(metadata), + }, + { where: { orgUid: organization.orgUid } }, + ); + + logger.debug( + `Updating orgUid ${organization.orgUid} with hash ${updateHash}`, + ); + await Organization.update( + { orgHash: updateHash }, + { where: { orgUid: organization.orgUid } }, + ); + } catch (error) { + logger.info(error.message); + onFail(error.message); + } }; datalayer.getStoreIfUpdated( organization.orgUid, organization.orgHash, - onUpdate, onResult, onFail, ); @@ -389,9 +397,9 @@ class Organization extends Model { } catch (error) { logger.info(error.message); } - }; + } - static subscribeToDefaultOrganizations = async () => { + static async subscribeToDefaultOrganizations() { try { const defaultOrgs = await getDefaultOrganizationList(); if (!Array.isArray(defaultOrgs)) { @@ -414,9 +422,9 @@ class Organization extends Model { } catch (error) { logger.info(error); } - }; + } - static editOrgMeta = async ({ name, icon }) => { + static async editOrgMeta({ name, icon }) { const myOrganization = await Organization.getHomeOrg(); const payload = {}; @@ -430,20 +438,20 @@ class Organization extends Model { } await datalayer.upsertDataLayer(myOrganization.orgUid, payload); - }; + } - static addMetadata = async (payload) => { + static async addMetadata(payload) { const myOrganization = await Organization.getHomeOrg(); // Prefix keys with "meta_" const metadata = _.mapKeys(payload, (_value, key) => `meta_${key}`); await datalayer.upsertDataLayer(myOrganization.orgUid, metadata); - }; + } - static removeMirror = async (storeId, coinId) => { + static async removeMirror(storeId, coinId) { datalayer.removeMirror(storeId, coinId); - }; + } } Organization.init(ModelTypes, {