diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts b/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts index ab8638bc557..d588ebe5cfc 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/updateReplicaSet.jobProcessor.ts @@ -695,7 +695,26 @@ const _issueUpdateReplicaSetOp = async ( return response } - if (config.get('entityManagerReplicaSetEnabled')) { + // First try updateReplicaSet via URSM + // Fallback to EntityManager when relay errors + try { + if (!config.get('entityManagerReplicaSetEnabled')) { + throw new Error( + 'Fallback to URSM writes because EntityManager is disabled' + ) + } + await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet( + userId, + newReplicaSetSPIds[0], // new primary + newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2] + // This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain + oldPrimarySpId || chainPrimarySpId, + [ + oldSecondary1SpId || chainSecondarySpIds?.[0], + oldSecondary2SpId || chainSecondarySpIds?.[1] + ] + ) + } catch (err) { logger.info( `[_issueUpdateReplicaSetOp] updating replica set now ${ Date.now() - startTimeMs @@ -729,18 +748,6 @@ const _issueUpdateReplicaSetOp = async ( `[_issueUpdateReplicaSetOp] waitForReplicaSetDiscovery Indexing Unable to confirm updated replica set for user ${userId}` ) } - } else { - await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet( - userId, - newReplicaSetSPIds[0], // new primary - newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2] - // This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain - oldPrimarySpId || chainPrimarySpId, - [ - oldSecondary1SpId || chainSecondarySpIds?.[0], - oldSecondary2SpId || chainSecondarySpIds?.[1] - ] - ) } response.issuedReconfig = true @@ -844,17 +851,22 @@ const _canReconfig = async ({ let error try { let chainPrimarySpId, chainSecondarySpIds - if (config.get('entityManagerReplicaSetEnabled')) { + // Attempt to get replica set from DN when entity manager is enabled + // Fallback to URSM + try { const encodedUserId = libs.Utils.encodeHashId(userId) const spResponse = await libs.discoveryProvider.getUserReplicaSet({ encodedUserId }) + if (!spResponse) { + throw new Error('User replica set is not on discovery') + } chainPrimarySpId = spResponse?.primarySpID chainSecondarySpIds = [ spResponse?.secondary1SpID, spResponse?.secondary2SpID ] - } else { + } catch (err) { const response = await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet( userId diff --git a/identity-service/src/config.js b/identity-service/src/config.js index 99bb5309803..7eee75ee36c 100644 --- a/identity-service/src/config.js +++ b/identity-service/src/config.js @@ -821,6 +821,12 @@ const config = convict({ format: Boolean, env: 'skipAbuseCheck', default: false + }, + entityManagerReplicaSetEnabled: { + doc: 'Enable replica set updates with Entity Manager', + format: Boolean, + env: 'entityManagerReplicaSetEnabled', + default: false } }) diff --git a/identity-service/src/routes/relay.js b/identity-service/src/routes/relay.js index 93fecb83a36..765f58cf143 100644 --- a/identity-service/src/routes/relay.js +++ b/identity-service/src/routes/relay.js @@ -7,6 +7,8 @@ const { detectAbuse } = require('../utils/antiAbuse') const { getFeatureFlag, FEATURE_FLAGS } = require('../featureFlag') const models = require('../models') const { getIP } = require('../utils/antiAbuse') +const { libs } = require('@audius/sdk') +const config = require('../config.js') module.exports = function (app) { // TODO(roneilr): authenticate that user controls senderAddress somehow, potentially validate that @@ -66,6 +68,15 @@ module.exports = function (app) { senderAddress: body.senderAddress, gasLimit: body.gasLimit || null } + + // When EntityManager is enabled for replica sets, throw error for URSM + // Fallback to EntityManager + if (config.get('entityManagerReplicaSetEnabled') && txProps.contractRegistryKey === 'UserReplicaSetManager') { + const decodedABI = libs.AudiusABIDecoder.decodeMethod(txProps.contractRegistryKey, txProps.encodedABI) + if (decodedABI.name === 'updateReplicaSet') { + throw new Error('Cannot relay UserReplicaSetManager transactions when EntityManager is enabled') + } + } receipt = await txRelay.sendTransaction( req, false, // resetNonce diff --git a/libs/src/api/Users.ts b/libs/src/api/Users.ts index 71b316ba1f6..ab55820088f 100644 --- a/libs/src/api/Users.ts +++ b/libs/src/api/Users.ts @@ -633,37 +633,16 @@ export class Users extends Base { const oldMetadata = { ...user } // Update user creator_node_endpoint on chain if applicable - let updateEndpointTxBlockNumber = null + const updateEndpointTxBlockNumber = null if ( newMetadata.creator_node_endpoint !== oldMetadata.creator_node_endpoint ) { // Perform update to new contract startMs = Date.now() - const { txReceipt: updateEndpointTxReceipt, replicaSetSPIDs } = - await this._updateReplicaSetOnChain( - userId, - newMetadata.creator_node_endpoint, - useEntityManager - ) - updateEndpointTxBlockNumber = updateEndpointTxReceipt?.blockNumber - console.log( - `${logPrefix} _updateReplicaSetOnChain() completed in ${ - Date.now() - startMs - }ms` + await this._updateReplicaSetOnChain( + userId, + newMetadata.creator_node_endpoint ) - startMs = Date.now() - if (useEntityManager) { - await this.waitForReplicaSetDiscoveryIndexing( - userId, - replicaSetSPIDs, - updateEndpointTxBlockNumber - ) - } else { - await this._waitForURSMCreatorNodeEndpointIndexing( - userId, - replicaSetSPIDs - ) - } console.log( `${logPrefix} _waitForURSMCreatorNodeEndpointIndexing() completed in ${ Date.now() - startMs @@ -831,8 +810,7 @@ export class Users extends Base { const { txReceipt, replicaSetSPIDs } = await this._updateReplicaSetOnChain( userId, - newMetadata.creator_node_endpoint, - useEntityManager + newMetadata.creator_node_endpoint ) console.log( `${logPrefix} [phase: ${phase}] _updateReplicaSetOnChain() completed in ${ @@ -1238,11 +1216,7 @@ export class Users extends Base { // Perform replica set update // Conditionally write to UserFactory contract, else write to UserReplicaSetManager // This behavior is to ensure backwards compatibility prior to contract deploy - async _updateReplicaSetOnChain( - userId: number, - creatorNodeEndpoint: string, - useEntityManager: boolean - ) { + async _updateReplicaSetOnChain(userId: number, creatorNodeEndpoint: string) { // Attempt to update through UserReplicaSetManagerClient if present if (!this.contracts.UserReplicaSetManagerClient) { await this.contracts.initUserReplicaSetManagerClient() @@ -1266,8 +1240,25 @@ export class Users extends Base { const currentUser = this.userStateManager.getCurrentUser() if (!currentUser) throw new Error('Current user missing') - // Update in new contract - if (useEntityManager) { + // First try to update with URSM + // Fallback to EntityManager when relay errors + let updateEndpointTxBlockNumber + let replicaSetSPIDs + try { + txReceipt = + await this.contracts.UserReplicaSetManagerClient?.updateReplicaSet( + userId, + primarySpID, + [secondary1SpID, secondary2SpID] + ) + replicaSetSPIDs = [primarySpID, secondary1SpID, secondary2SpID] + updateEndpointTxBlockNumber = txReceipt?.blockNumber + + await this._waitForURSMCreatorNodeEndpointIndexing( + userId, + replicaSetSPIDs + ) + } catch { const currentPrimaryEndpoint = CreatorNode.getPrimary( currentUser.creator_node_endpoint ) @@ -1295,18 +1286,18 @@ export class Users extends Base { oldPrimary: oldPrimary, oldSecondaries: [oldSecondary1SpID, oldSecondary2SpID] }) - } else { - txReceipt = - await this.contracts.UserReplicaSetManagerClient?.updateReplicaSet( - userId, - primarySpID, - [secondary1SpID, secondary2SpID] - ) + replicaSetSPIDs = [primarySpID, secondary1SpID, secondary2SpID] + updateEndpointTxBlockNumber = txReceipt?.blockNumber + + await this.waitForReplicaSetDiscoveryIndexing( + userId, + replicaSetSPIDs, + updateEndpointTxBlockNumber + ) } if (!txReceipt) { throw new Error('Unable to update replica set on chain') } - const replicaSetSPIDs = [primarySpID, secondary1SpID, secondary2SpID] return { txReceipt, replicaSetSPIDs