Skip to content

Commit

Permalink
user and replica set entity manager (#3897)
Browse files Browse the repository at this point in the history
* add user entity manager

* Update discovery user replica route and entity manager

* WIP CN check discovery vs chain

* Update libs bindings for entity manager

* lint discovery

* Fix lint

* Turn on dev features

* removediscovery logs

* Add wait for discovery indexing to cn replica set update

* Fix libs type

* Fix lint

* fix api unit test

* Update ns docs for api change

* Update gen types

* FIx cn tests

* Update sdk api

* Update user replica endpoint to return empty when not found

* Address comments on cn middleware ensure primary

* Update libs to return err on discovery blocknumber makereq

* Put content entity manager and discovery validation behind env var

* Fix discovery lint

* Update creator node

* Fix discovery indexing

* fix libs

* lint fix discovery

* add more entity manager flags and fix bugs w updates

* update entityManagerAddress in CN config and add handle_lc

* fix types

* Update local cn entity manager addr

* Add timeout to get user replica set

* Update local env vs prod config for entity manager

* Address comments

* Fix merge err

* Fix libs bug

* fix libs lint

* add comment on wait for repica set indexing

* lint fix discovery after merge

* Fix metadata change test

Co-authored-by: Ubuntu <ubuntu@joe.us-central1-a.c.dev-audius-infra.internal>
Co-authored-by: isaac <isaac@audius.co>
  • Loading branch information
3 people committed Sep 28, 2022
1 parent 0e1d8bc commit e81f849
Show file tree
Hide file tree
Showing 44 changed files with 2,666 additions and 703 deletions.
3 changes: 3 additions & 0 deletions creator-node/compose/env/base.env
Expand Up @@ -74,3 +74,6 @@ discoveryNodeUnhealthyBlockDiff=500
maxBatchClockStatusBatchSize=5

reconfigSPIdBlacklistString=

entityManagerAddress=0x5b9b42d6e4B2e4Bf8d42Eba32D46918e10899B66
entityManagerReplicaSetEnabled=true
12 changes: 12 additions & 0 deletions creator-node/src/config.js
Expand Up @@ -392,6 +392,12 @@ const config = convict({
env: 'dataRegistryAddress',
default: null
},
entityManagerAddress: {
doc: 'entity manager registry address',
format: String,
env: 'entityManagerAddress',
default: '0x2F99338637F027CFB7494E46B49987457beCC6E3'
},
dataProviderUrl: {
doc: 'data contracts web3 provider url',
format: String,
Expand Down Expand Up @@ -470,6 +476,12 @@ const config = convict({
env: 'considerNodeUnhealthy',
default: false
},
entityManagerReplicaSetEnabled: {
doc: 'whether or not to use entity manager to update the replica set',
format: Boolean,
env: 'entityManagerReplicaSetEnabled',
default: false
},

/** sync / snapback configs */

Expand Down
108 changes: 78 additions & 30 deletions creator-node/src/middlewares.js
Expand Up @@ -752,7 +752,6 @@ async function getReplicaSetSpIDs({
const RETRY_TIMEOUT_MS = 1000 // 1 seconds

let errorMsg = null
let blockNumberIndexed = false
for (let retry = 1; retry <= MAX_RETRIES; retry++) {
logger.info(
`${logPrefix} retry #${retry}/${MAX_RETRIES} || time from start: ${
Expand All @@ -761,15 +760,40 @@ async function getReplicaSetSpIDs({
)

try {
// will throw error if blocknumber not found
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSetAtBlockNumber(
userId,
if (config.get('entityManagerReplicaSetEnabled')) {
// will throw error if blocknumber not found
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId,
blockNumber
)
errorMsg = null
blockNumberIndexed = true
break
})

if (spResponse) {
if (spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [
spResponse.secondary1SpID,
spResponse.secondary2SpID
]
}
errorMsg = null
break
} else {
// The blocknumber was indexed by discovery, but there's still no user replica set returned
errorMsg = `User replica not found in discovery`
break
}
}
} else {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSetAtBlockNumber(
userId,
blockNumber
)
errorMsg = null
break
}
} catch (e) {
errorMsg = e.message
} // Ignore all errors until MAX_RETRIES exceeded
Expand All @@ -779,22 +803,14 @@ async function getReplicaSetSpIDs({

// Error if indexed blockNumber but didn't find any replicaSet for user
if (
blockNumberIndexed &&
(!replicaSet ||
!replicaSet.hasOwnProperty('primaryId') ||
!replicaSet.primaryId)
!replicaSet ||
!replicaSet.hasOwnProperty('primaryId') ||
!replicaSet.primaryId
) {
throw new Error(
`${logPrefix} ERROR || Failed to retrieve user from UserReplicaSetManager after ${MAX_RETRIES} retries. Aborting.`
)
}

// Error if failed to index target blockNumber
if (!blockNumberIndexed) {
throw new Error(
`${logPrefix} ERROR || Web3 provider failed to index target blockNumber ${blockNumber} after ${MAX_RETRIES} retries. Aborting. Error ${errorMsg}`
)
}
} else if (ensurePrimary && selfSpID) {
/**
* If ensurePrimary required but no blockNumber provided, poll URSM until returned primary = selfSpID
Expand All @@ -814,12 +830,30 @@ async function getReplicaSetSpIDs({
)

try {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)

errorMsg = null
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})

if (spResponse && spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [
spResponse.secondary1SpID,
spResponse.secondary2SpID
]
}
errorMsg = null
} else {
errorMsg = `User replica not found in discovery`
}
} else {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
}

if (
replicaSet &&
Expand Down Expand Up @@ -863,10 +897,24 @@ async function getReplicaSetSpIDs({

let errorMsg = null
try {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})

if (spResponse && spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [spResponse.secondary1SpID, spResponse.secondary2SpID]
}
}
} else {
replicaSet =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
}
} catch (e) {
errorMsg = e.message
}
Expand Down
35 changes: 26 additions & 9 deletions creator-node/src/routes/audiusUsers.js
@@ -1,6 +1,7 @@
const express = require('express')
const fs = require('fs-extra')

const config = require('../config')
const models = require('../models')
const { saveFileFromBufferToDisk } = require('../fileManager')
const {
Expand Down Expand Up @@ -113,16 +114,32 @@ router.post(
// Verify that wallet of the user on the blockchain for the given ID matches the user attempting to update
const serviceRegistry = req.app.get('serviceRegistry')
const { libs } = serviceRegistry
const userResp = await libs.contracts.UserFactoryClient.getUser(
blockchainUserId
)
if (
!userResp?.wallet ||
userResp.wallet.toLowerCase() !== req.session.wallet.toLowerCase()
) {
throw new Error(
`Owner wallet ${userResp.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}`
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(blockchainUserId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId,
blockNumber
})
if (
(spResponse?.wallet ?? '').toLowerCase() !==
req.session.wallet.toLowerCase()
) {
throw new Error(
`Owner wallet ${spResponse?.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}`
)
}
} else {
const userResp = await libs.contracts.UserFactoryClient.getUser(
blockchainUserId
)
if (
!userResp?.wallet ||
userResp.wallet.toLowerCase() !== req.session.wallet.toLowerCase()
) {
throw new Error(
`Owner wallet ${userResp.wallet} of blockchainUserId ${blockchainUserId} does not match the wallet of the user attempting to write this data: ${req.session.wallet}`
)
}
}

const cnodeUserUUID = req.session.cnodeUserUUID
Expand Down
4 changes: 3 additions & 1 deletion creator-node/src/services/initAudiusLibs.js
Expand Up @@ -31,6 +31,7 @@ module.exports = async ({
const ethRegistryAddress = config.get('ethRegistryAddress')
const ethOwnerWallet = config.get('ethOwnerWallet')
const dataRegistryAddress = config.get('dataRegistryAddress')
const entityManagerAddress = config.get('entityManagerAddress')
const dataProviderUrl = config.get('dataProviderUrl')
const delegatePrivateKey = config.get('delegatePrivateKey')
const oldDelegatePrivateKey = config.get('oldDelegatePrivateKey')
Expand Down Expand Up @@ -73,7 +74,8 @@ module.exports = async ({
// pass as array
[dataProviderUrl],
// TODO - formatting this private key here is not ideal
(oldDelegatePrivateKey || delegatePrivateKey).replace('0x', '')
(oldDelegatePrivateKey || delegatePrivateKey).replace('0x', ''),
entityManagerAddress
)
: null,
discoveryProviderConfig: enableDiscovery
Expand Down
Expand Up @@ -97,7 +97,7 @@ const updateReplicaSetJobProcessor = async function ({
audiusLibs = await initAudiusLibs({
enableEthContracts: true,
enableContracts: true,
enableDiscovery: false,
enableDiscovery: true,
enableIdentity: true,
logger
})
Expand Down Expand Up @@ -695,17 +695,53 @@ const _issueUpdateReplicaSetOp = async (
return response
}

await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet(
userId,
newReplicaSetSPIds[0], // new primary
newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimarySpId || chainPrimarySpId,
[
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
)
if (config.get('entityManagerReplicaSetEnabled')) {
logger.info(
`[_issueUpdateReplicaSetOp] updating replica set now ${
Date.now() - startTimeMs
}ms for userId=${userId} wallet=${wallet}`
)

const { blockNumber } =
await audiusLibs.User.updateEntityManagerReplicaSet({
userId,
primary: newReplicaSetSPIds[0], // new primary
secondaries: newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimary: oldPrimarySpId || chainPrimarySpId,
oldSecondaries: [
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
})
logger.info(
`[_issueUpdateReplicaSetOp] did call audiusLibs.User.updateEntityManagerReplicaSet waiting for ${blockNumber}`
)
// Wait for blockhash/blockNumber to be indexed
try {
await audiusLibs.User.waitForReplicaSetDiscoveryIndexing(
userId,
newReplicaSetSPIds,
blockNumber
)
} catch (err) {
throw new Error(
`[_issueUpdateReplicaSetOp] waitForReplicaSetDiscovery Indexing Unable to confirm updated replica set for user ${userId}`
)
}
} else {
await audiusLibs.contracts.UserReplicaSetManagerClient._updateReplicaSet(
userId,
newReplicaSetSPIds[0], // new primary
newReplicaSetSPIds.slice(1), // [new secondary1, new secondary2]
// This defaulting logic is for the edge case when an SP deregistered and can't be fetched from our mapping, so we use the SP ID from the user's old replica set queried from the chain
oldPrimarySpId || chainPrimarySpId,
[
oldSecondary1SpId || chainSecondarySpIds?.[0],
oldSecondary2SpId || chainSecondarySpIds?.[1]
]
)
}

response.issuedReconfig = true
logger.info(
Expand Down Expand Up @@ -807,8 +843,25 @@ const _canReconfig = async ({
}: CanReconfigParams): Promise<CanReconfigReturnValue> => {
let error
try {
const { primaryId: chainPrimarySpId, secondaryIds: chainSecondarySpIds } =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(userId)
let chainPrimarySpId, chainSecondarySpIds
if (config.get('entityManagerReplicaSetEnabled')) {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})
chainPrimarySpId = spResponse?.primarySpID
chainSecondarySpIds = [
spResponse?.secondary1SpID,
spResponse?.secondary2SpID
]
} else {
const response =
await libs.contracts.UserReplicaSetManagerClient.getUserReplicaSet(
userId
)
chainPrimarySpId = response.primaryId
chainSecondarySpIds = response.secondaryIds
}

if (
!chainPrimarySpId ||
Expand Down
26 changes: 25 additions & 1 deletion creator-node/test/lib/libsMock.js
@@ -1,4 +1,5 @@
const sinon = require('sinon')
const { encode, decode } = require('../../src/hashids')

function getLibsMock() {
const libsMock = {
Expand Down Expand Up @@ -76,14 +77,37 @@ function getLibsMock() {
Playlist: {
getPlaylists: sinon.mock().atLeast(1)
},
Utils: {
encodeHashId: sinon.mock().atLeast(1)
},
discoveryProvider: {
discoveryProviderEndpoint: 'http://docker.for.mac.localhost:5000'
discoveryProviderEndpoint: 'http://docker.for.mac.localhost:5000',
getUserReplicaSet: sinon.mock().atLeast(1)
},
web3Manager: {
verifySignature: () => '0x7c95A677106218A296EcEF1F577c3aE27f0340cd'
}
}

libsMock.Utils.encodeHashId.callsFake((id) => {
return encode(id)
})

libsMock.discoveryProvider.getUserReplicaSet.callsFake(({ encodedUserId }) => {
const user_id = decode(encodedUserId)
return {
user_id,
"wallet": '0xadd36bad12002f1097cdb7ee24085c28e960fc32',
"primary": 'http://mock-cn1.audius.co',
"secondary1": 'http://mock-cn2.audius.co',
"secondary2": 'http://mock-cn3.audius.co',
"primarySpID": 1,
"secondary1SpID": 2,
"secondary2SpID": 3
}
})


libsMock.contracts.UserReplicaSetManagerClient.getUserReplicaSet.returns({
primaryId: 1,
secondaryIds: [2, 3]
Expand Down

0 comments on commit e81f849

Please sign in to comment.