Skip to content

Commit

Permalink
Gating entity manager replica set updates with identity service (#3965)
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacsolo committed Sep 30, 2022
1 parent aa66242 commit 13bd5ff
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 57 deletions.
Expand Up @@ -695,7 +695,26 @@ const _issueUpdateReplicaSetOp = async (
return response
}

if (config.get('entityManagerReplicaSetEnabled')) {
// First try updateReplicaSet via URSM
// Fallback to EntityManager when relay errors
try {
if (!config.get('entityManagerReplicaSetEnabled')) {
throw new Error(
'Fallback to URSM writes because EntityManager is disabled'
)
}
await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet(
userId,
newReplicaSetSPIds[0], // new primary
newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimarySpId || chainPrimarySpId,
[
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
)
} catch (err) {
logger.info(
`[_issueUpdateReplicaSetOp] updating replica set now ${
Date.now() - startTimeMs
Expand Down Expand Up @@ -729,18 +748,6 @@ const _issueUpdateReplicaSetOp = async (
`[_issueUpdateReplicaSetOp] waitForReplicaSetDiscovery Indexing Unable to confirm updated replica set for user ${userId}`
)
}
} else {
await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet(
userId,
newReplicaSetSPIds[0], // new primary
newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimarySpId || chainPrimarySpId,
[
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
)
}

response.issuedReconfig = true
Expand Down Expand Up @@ -844,17 +851,22 @@ const _canReconfig = async ({
let error
try {
let chainPrimarySpId, chainSecondarySpIds
if (config.get('entityManagerReplicaSetEnabled')) {
// Attempt to get replica set from DN when entity manager is enabled
// Fallback to URSM
try {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})
if (!spResponse) {
throw new Error('User replica set is not on discovery')
}
chainPrimarySpId = spResponse?.primarySpID
chainSecondarySpIds = [
spResponse?.secondary1SpID,
spResponse?.secondary2SpID
]
} else {
} catch (err) {
const response =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
Expand Down
6 changes: 6 additions & 0 deletions identity-service/src/config.js
Expand Up @@ -821,6 +821,12 @@ const config = convict({
format: Boolean,
env: 'skipAbuseCheck',
default: false
},
entityManagerReplicaSetEnabled: {
doc: 'Enable replica set updates with Entity Manager',
format: Boolean,
env: 'entityManagerReplicaSetEnabled',
default: false
}
})

Expand Down
11 changes: 11 additions & 0 deletions identity-service/src/routes/relay.js
Expand Up @@ -7,6 +7,8 @@ const { detectAbuse } = require('../utils/antiAbuse')
const { getFeatureFlag, FEATURE_FLAGS } = require('../featureFlag')
const models = require('../models')
const { getIP } = require('../utils/antiAbuse')
const { libs } = require('@audius/sdk')
const config = require('../config.js')

module.exports = function (app) {
// TODO(roneilr): authenticate that user controls senderAddress somehow, potentially validate that
Expand Down Expand Up @@ -66,6 +68,15 @@ module.exports = function (app) {
senderAddress: body.senderAddress,
gasLimit: body.gasLimit || null
}

// When EntityManager is enabled for replica sets, throw error for URSM
// Fallback to EntityManager
if (config.get('entityManagerReplicaSetEnabled') && txProps.contractRegistryKey === 'UserReplicaSetManager') {
const decodedABI = libs.AudiusABIDecoder.decodeMethod(txProps.contractRegistryKey, txProps.encodedABI)
if (decodedABI.name === 'updateReplicaSet') {
throw new Error('Cannot relay UserReplicaSetManager transactions when EntityManager is enabled')
}
}
receipt = await txRelay.sendTransaction(
req,
false, // resetNonce
Expand Down
75 changes: 33 additions & 42 deletions libs/src/api/Users.ts
Expand Up @@ -633,37 +633,16 @@ export class Users extends Base {
const oldMetadata = { ...user }

// Update user creator_node_endpoint on chain if applicable
let updateEndpointTxBlockNumber = null
const updateEndpointTxBlockNumber = null
if (
newMetadata.creator_node_endpoint !== oldMetadata.creator_node_endpoint
) {
// Perform update to new contract
startMs = Date.now()
const { txReceipt: updateEndpointTxReceipt, replicaSetSPIDs } =
await this._updateReplicaSetOnChain(
userId,
newMetadata.creator_node_endpoint,
useEntityManager
)
updateEndpointTxBlockNumber = updateEndpointTxReceipt?.blockNumber
console.log(
`${logPrefix} _updateReplicaSetOnChain() completed in ${
Date.now() - startMs
}ms`
await this._updateReplicaSetOnChain(
userId,
newMetadata.creator_node_endpoint
)
startMs = Date.now()
if (useEntityManager) {
await this.waitForReplicaSetDiscoveryIndexing(
userId,
replicaSetSPIDs,
updateEndpointTxBlockNumber
)
} else {
await this._waitForURSMCreatorNodeEndpointIndexing(
userId,
replicaSetSPIDs
)
}
console.log(
`${logPrefix} _waitForURSMCreatorNodeEndpointIndexing() completed in ${
Date.now() - startMs
Expand Down Expand Up @@ -831,8 +810,7 @@ export class Users extends Base {
const { txReceipt, replicaSetSPIDs } =
await this._updateReplicaSetOnChain(
userId,
newMetadata.creator_node_endpoint,
useEntityManager
newMetadata.creator_node_endpoint
)
console.log(
`${logPrefix} [phase: ${phase}] _updateReplicaSetOnChain() completed in ${
Expand Down Expand Up @@ -1238,11 +1216,7 @@ export class Users extends Base {
// Perform replica set update
// Conditionally write to UserFactory contract, else write to UserReplicaSetManager
// This behavior is to ensure backwards compatibility prior to contract deploy
async _updateReplicaSetOnChain(
userId: number,
creatorNodeEndpoint: string,
useEntityManager: boolean
) {
async _updateReplicaSetOnChain(userId: number, creatorNodeEndpoint: string) {
// Attempt to update through UserReplicaSetManagerClient if present
if (!this.contracts.UserReplicaSetManagerClient) {
await this.contracts.initUserReplicaSetManagerClient()
Expand All @@ -1266,8 +1240,25 @@ export class Users extends Base {
const currentUser = this.userStateManager.getCurrentUser()
if (!currentUser) throw new Error('Current user missing')

// Update in new contract
if (useEntityManager) {
// First try to update with URSM
// Fallback to EntityManager when relay errors
let updateEndpointTxBlockNumber
let replicaSetSPIDs
try {
txReceipt =
await this.contracts.UserReplicaSetManagerClient?.updateReplicaSet(
userId,
primarySpID,
[secondary1SpID, secondary2SpID]
)
replicaSetSPIDs = [primarySpID, secondary1SpID, secondary2SpID]
updateEndpointTxBlockNumber = txReceipt?.blockNumber

await this._waitForURSMCreatorNodeEndpointIndexing(
userId,
replicaSetSPIDs
)
} catch {
const currentPrimaryEndpoint = CreatorNode.getPrimary(
currentUser.creator_node_endpoint
)
Expand Down Expand Up @@ -1295,18 +1286,18 @@ export class Users extends Base {
oldPrimary: oldPrimary,
oldSecondaries: [oldSecondary1SpID, oldSecondary2SpID]
})
} else {
txReceipt =
await this.contracts.UserReplicaSetManagerClient?.updateReplicaSet(
userId,
primarySpID,
[secondary1SpID, secondary2SpID]
)
replicaSetSPIDs = [primarySpID, secondary1SpID, secondary2SpID]
updateEndpointTxBlockNumber = txReceipt?.blockNumber

await this.waitForReplicaSetDiscoveryIndexing(
userId,
replicaSetSPIDs,
updateEndpointTxBlockNumber
)
}
if (!txReceipt) {
throw new Error('Unable to update replica set on chain')
}
const replicaSetSPIDs = [primarySpID, secondary1SpID, secondary2SpID]
return {
txReceipt,
replicaSetSPIDs
Expand Down

0 comments on commit 13bd5ff

Please sign in to comment.