From b2014384db3eb3dcf21f1ed022f1a061e3875ff2 Mon Sep 17 00:00:00 2001 From: "vicky :)" <60366641+vicky-g@users.noreply.github.com> Date: Mon, 19 Sep 2022 23:50:03 -0400 Subject: [PATCH] More grafana granularity and track sync failure per wallet (#3886) * add fetching replica set err * update err response * remove trailing period * throw not just return err * add err obj --- creator-node/src/middlewares.js | 2 +- .../prometheus.constants.ts | 2 + .../services/sync/primarySyncFromSecondary.js | 69 ++++++---- .../services/sync/secondarySyncFromPrimary.js | 128 ++++++++++++------ 4 files changed, 136 insertions(+), 65 deletions(-) diff --git a/creator-node/src/middlewares.js b/creator-node/src/middlewares.js index 56c947031ed..c715c997b4b 100644 --- a/creator-node/src/middlewares.js +++ b/creator-node/src/middlewares.js @@ -697,7 +697,7 @@ async function getUserReplicaSetEndpointsFromDiscovery({ !user[0].hasOwnProperty('creator_node_endpoint') ) { throw new Error( - `Invalid return data from discovery provider for user with wallet ${wallet}.` + `Invalid return data from discovery provider for user with wallet ${wallet}` ) } diff --git a/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts b/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts index 31e38ebddbc..a223e5be519 100644 --- a/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts +++ b/creator-node/src/services/prometheusMonitoring/prometheus.constants.ts @@ -89,6 +89,7 @@ export const METRIC_LABELS = Object.freeze({ 'abort_current_node_is_not_user_secondary', 'abort_sync_in_progress', 'abort_force_wipe_disabled', + 'failure_fetching_user_replica_set', 'failure_force_resync_check', 'failure_fetching_user_gateway', 'failure_delete_db_data', @@ -121,6 +122,7 @@ export const METRIC_LABELS = Object.freeze({ 'abort_multiple_users_returned_from_export', 'abort_missing_user_export_key_fields', 'abort_mismatched_export_wallet', + 'failure_fetching_user_replica_set', 'failure_content_node_endpoint_not_initialized', 'failure_audius_libs_not_initialized', 'failure_export_wallet', diff --git a/creator-node/src/services/sync/primarySyncFromSecondary.js b/creator-node/src/services/sync/primarySyncFromSecondary.js index 7256b5e8b38..f33e4b883fd 100644 --- a/creator-node/src/services/sync/primarySyncFromSecondary.js +++ b/creator-node/src/services/sync/primarySyncFromSecondary.js @@ -47,19 +47,19 @@ async function _primarySyncFromSecondary({ }) decisionTree.recordStage({ name: 'Begin', log: true }) - let result + let errorResult, error try { const selfEndpoint = config.get('creatorNodeEndpoint') if (!selfEndpoint) { decisionTree.recordStage({ name: 'selfEndpoint missing', log: false }) - result = { + errorResult = { error: 'Content node endpoint not set on node', result: 'failure_content_node_endpoint_not_initialized' } - throw new Error(result.error) + throw new Error(errorResult.error) } let libs @@ -74,12 +74,12 @@ async function _primarySyncFromSecondary({ data: { errorMsg: e.message } }) - result = { + errorResult = { error: `Could not initialize audiusLibs: ${e.message}`, result: 'failure_audius_libs_not_initialized' } - throw new Error(result.error) + throw new Error(errorResult.error) } await WalletWriteLock.acquire( @@ -88,14 +88,29 @@ async function _primarySyncFromSecondary({ ) // TODO should be able to pass this through from StateMachine / caller - const userReplicaSet = await getUserReplicaSetEndpointsFromDiscovery({ - libs, - logger, - wallet, - blockNumber: null, - ensurePrimary: false + let userReplicaSet + try { + userReplicaSet = await getUserReplicaSetEndpointsFromDiscovery({ + libs, + logger, + wallet, + blockNumber: null, + ensurePrimary: false + }) + } catch (e) { + error = `Error fetching user replica set: ${e.message}` + errorResult = { + error, + result: 'failure_fetching_user_replica_set' + } + + throw new Error(error) + } + + decisionTree.recordStage({ + name: 'getUserReplicaSetEndpointsFromDiscovery() success', + log: true }) - decisionTree.recordStage({ name: 'getUserReplicaSet() success', log: true }) // Abort if this node is not primary for user if (userReplicaSet.primary !== selfEndpoint) { @@ -126,7 +141,11 @@ async function _primarySyncFromSecondary({ log: true }) - const { fetchedCNodeUser, error, abort } = await fetchExportFromNode({ + const { + fetchedCNodeUser, + error: fetchExportFromNodeError, + abort + } = await fetchExportFromNode({ nodeEndpointToFetchFrom: secondary, wallet, clockRangeMin: exportClockRangeMin, @@ -135,18 +154,18 @@ async function _primarySyncFromSecondary({ forceExport: true }) - if (error) { + if (fetchExportFromNodeError) { decisionTree.recordStage({ name: 'fetchExportFromSecondary() Error', - data: { error: error.message } + data: { error: fetchExportFromNodeError.message } }) - result = { - error: error.message, - result: error.code + errorResult = { + error: fetchExportFromNodeError.message, + result: fetchExportFromNodeError.code } - throw new Error(result.error) + throw new Error(errorResult.error) } if (abort) { @@ -204,12 +223,12 @@ async function _primarySyncFromSecondary({ data: { errorMsg: e.message } }) - result = { + errorResult = { error: `Error - Failed to save files to disk: ${e.message}`, result: 'failure_save_files_to_disk' } - throw new Error(result.error) + throw new Error(errorResult.error) } // Save all entries from export to DB @@ -230,12 +249,12 @@ async function _primarySyncFromSecondary({ data: { errorMsg: e.message } }) - result = { + errorResult = { error: `Error - Failed to save entries to DB: ${e.message}`, result: 'failure_save_entries_to_db' } - throw new Error(result.error) + throw new Error(errorResult.error) } /** @@ -274,8 +293,8 @@ async function _primarySyncFromSecondary({ tracing.recordException(e) await SyncHistoryAggregator.recordSyncFail(wallet) - if (result) { - return result + if (errorResult) { + return errorResult } // If no error was caught above, then return generic error diff --git a/creator-node/src/services/sync/secondarySyncFromPrimary.js b/creator-node/src/services/sync/secondarySyncFromPrimary.js index 7acc335ef3d..96c700c7046 100644 --- a/creator-node/src/services/sync/secondarySyncFromPrimary.js +++ b/creator-node/src/services/sync/secondarySyncFromPrimary.js @@ -32,6 +32,7 @@ const handleSyncFromPrimary = async ({ const logger = secondarySyncFromPrimaryLogger + let errorResponse, error try { try { await redis.WalletWriteLock.acquire( @@ -47,13 +48,25 @@ const handleSyncFromPrimary = async ({ } // Ensure this node is syncing from the user's primary - const userReplicaSet = await getUserReplicaSetEndpointsFromDiscovery({ - libs, - logger, - wallet, - blockNumber: null, - ensurePrimary: false - }) + let userReplicaSet + try { + userReplicaSet = await getUserReplicaSetEndpointsFromDiscovery({ + libs, + logger, + wallet, + blockNumber: null, + ensurePrimary: false + }) + } catch (e) { + error = new Error(`Error fetching user replica set: ${e.message}`) + errorResponse = { + error, + result: 'failure_fetching_user_replica_set' + } + + throw error + } + if (userReplicaSet.primary !== creatorNodeEndpoint) { return { abort: `Node being synced from is not primary. Node being synced from: ${creatorNodeEndpoint} Primary: ${userReplicaSet.primary}`, @@ -70,12 +83,15 @@ const handleSyncFromPrimary = async ({ ) const forceResyncQueryParam = forceResyncConfig?.forceResync if (forceResyncQueryParam && !forceResync) { - return { - error: new Error( - `Cannot issue sync for wallet ${wallet} due to shouldForceResync() rejection` - ), + error = new Error( + `Cannot issue sync for wallet ${wallet} due to shouldForceResync() rejection` + ) + errorResponse = { + error, result: 'failure_force_resync_check' } + + throw error } let localMaxClockVal @@ -116,10 +132,13 @@ const handleSyncFromPrimary = async ({ } if (deleteError) { - return { - error: deleteError, + error = deleteError + errorResponse = { + error, result: 'failure_delete_db_data' } + + throw error } if (forceWipe) { @@ -153,7 +172,11 @@ const handleSyncFromPrimary = async ({ * Secondary requests export of new data by passing its current max clock value in the request. * Primary builds an export object of all data beginning from the next clock value. */ - const { fetchedCNodeUser, error, abort } = await fetchExportFromNode({ + const { + fetchedCNodeUser, + error: fetchExportFromNodeErrorMessage, + abort + } = await fetchExportFromNode({ nodeEndpointToFetchFrom: creatorNodeEndpoint, wallet, clockRangeMin: localMaxClockVal + 1, @@ -161,11 +184,14 @@ const handleSyncFromPrimary = async ({ logger }) - if (error) { - return { - error: new Error(error.message), + if (fetchExportFromNodeErrorMessage) { + error = new Error(fetchExportFromNodeErrorMessage) + errorResponse = { + error, result: error.code } + + throw error } if (abort) { @@ -214,12 +240,15 @@ const handleSyncFromPrimary = async ({ `Couldn't filter out own endpoint from user's replica set to use as cnode gateways in saveFileForMultihashToFS - ${e.message}` ) - return { - error: new Error( - `Couldn't filter out own endpoint from user's replica set to use as cnode gateways in saveFileForMultihashToFS - ${e.message}` - ), + error = new Error( + `Couldn't filter out own endpoint from user's replica set to use as cnode gateways in saveFileForMultihashToFS - ${e.message}` + ) + errorResponse = { + error, result: 'failure_fetching_user_gateway' } + + throw error } /** @@ -233,12 +262,16 @@ const handleSyncFromPrimary = async ({ // Error if returned data is not within requested range if (fetchedLatestClockVal < localMaxClockVal) { - return { - error: new Error( - `Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}` - ), + error = new Error( + `Cannot sync for localMaxClockVal ${localMaxClockVal} - imported data has max clock val ${fetchedLatestClockVal}` + ) + + errorResponse = { + error, result: 'failure_inconsistent_clock' } + + throw error } if ( @@ -246,24 +279,30 @@ const handleSyncFromPrimary = async ({ fetchedClockRecords[0] && fetchedClockRecords[0].clock !== localMaxClockVal + 1 ) { - return { - error: new Error( - `Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}` - ), + error = new Error( + `Cannot sync - imported data is not contiguous. Local max clock val = ${localMaxClockVal} and imported min clock val ${fetchedClockRecords[0].clock}` + ) + errorResponse = { + error, result: 'failure_import_not_contiguous' } + + throw error } if ( !_.isEmpty(fetchedClockRecords) && maxClockRecordId !== fetchedLatestClockVal ) { - return { - error: new Error( - `Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}` - ), + error = new Error( + `Cannot sync - imported data is not consistent. Imported max clock val = ${fetchedLatestClockVal} and imported max ClockRecord val ${maxClockRecordId}` + ) + errorResponse = { + error, result: 'failure_import_not_consistent' } + + throw error } // All DB updates must happen in single atomic tx - partial state updates will lead to data loss @@ -323,12 +362,15 @@ const handleSyncFromPrimary = async ({ // Error if update failed if (numRowsUpdated !== 1 || respObj.length !== 1) { - return { - error: new Error( - `Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}` - ), + error = new Error( + `Failed to update cnodeUser row for cnodeUser wallet ${fetchedWalletPublicKey}` + ) + errorResponse = { + error, result: 'failure_db_transaction' } + + throw error } cnodeUser = respObj[0] @@ -560,10 +602,13 @@ const handleSyncFromPrimary = async ({ ) } - return { - error: e, + error = e + errorResponse = { + error, result: 'failure_db_transaction' } + + throw e } } catch (e) { tracing.recordException(e) @@ -571,6 +616,11 @@ const handleSyncFromPrimary = async ({ // for final log check the _secondarySyncFromPrimary function + if (errorResponse) { + return errorResponse + } + + // If unexpected errors occur, default to general failure response return { error: e, result: 'failure_sync_secondary_from_primary'