Skip to content

Commit

Permalink
[CON-457] add retry and exponential backoff to `ensurePrimaryMiddlwar…
Browse files Browse the repository at this point in the history
…e` (#4171)
  • Loading branch information
jonaylor89 committed Oct 26, 2022
1 parent f8e8339 commit 0b3c754
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 69 deletions.
132 changes: 64 additions & 68 deletions creator-node/src/services/ContentNodeInfoManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import initAudiusLibs from './initAudiusLibs'
import { createChildLogger } from '../logging'
import defaultRedisClient from '../redis'
import { timeout } from '../utils'
import { asyncRetry } from '../utils/asyncRetry'
import config from '../config'

const SP_ID_TO_CHAIN_INFO_MAP_KEY = 'contentNodeInfoManagerSpIdMap'
Expand Down Expand Up @@ -168,45 +169,43 @@ export async function getReplicaSetSpIdsByUserId({
* If `blockNumber` provided, poll contract until it has indexed that blocknumber (for up to 200 seconds)
*/
if (blockNumber) {
// In total, will try for 200 seconds
const MAX_RETRIES = 201
const RETRY_TIMEOUT_MS = 1000 // 1 seconds
const MAX_RETRIES = 10

let errorMsg = null
for (let retry = 1; retry <= MAX_RETRIES; retry++) {
logger.info(
`retry #${retry}/${MAX_RETRIES} || time from start: ${
Date.now() - start
}. Polling until blockNumber ${blockNumber}.`
)

try {
// Sill throw error if blocknumber not found
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId,
blockNumber
})

if (!spResponse) continue
if (spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [spResponse.secondary1SpID, spResponse.secondary2SpID]
await asyncRetry({
options: {
retries: MAX_RETRIES
},
asyncFn: async () => {
try {
// Sill throw error if blocknumber not found
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId,
blockNumber
})

if (!spResponse) throw new Error('no spResponse')
if (spResponse.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [
spResponse.secondary1SpID,
spResponse.secondary2SpID
]
}
errorMsg = null
return
} else {
// The blocknumber was indexed by discovery, but there's still no user replica set returned
errorMsg = 'User replica not found in discovery'
return
}
errorMsg = null
break
} else {
// The blocknumber was indexed by discovery, but there's still no user replica set returned
errorMsg = 'User replica not found in discovery'
break
}
} catch (e: any) {
errorMsg = e.message
} // Ignore all errors until MAX_RETRIES exceeded

await timeout(RETRY_TIMEOUT_MS)
}
} catch (e: any) {
errorMsg = e.message
} // Ignore all errors until MAX_RETRIES exceeded
}
})

// Error if indexed blockNumber but didn't find any replicaSet for user
if (!replicaSet.primaryId) {
Expand All @@ -220,40 +219,37 @@ export async function getReplicaSetSpIdsByUserId({
* Error if still mismatched after specified timeout
*/

// Will poll every second for 60 sec
const MAX_RETRIES = 61
const RETRY_TIMEOUT_MS = 1000 // 1 sec

const MAX_RETRIES = 10
let errorMsg = null
for (let retry = 1; retry <= MAX_RETRIES; retry++) {
logger.info(
`retry #${retry}/${MAX_RETRIES} || time from start: ${
Date.now() - start
}. Polling until primaryEnsured.`
)

try {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})

if (spResponse?.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [spResponse.secondary1SpID, spResponse.secondary2SpID]
await asyncRetry({
options: {
retries: MAX_RETRIES
},
asyncFn: async () => {
try {
const encodedUserId = libs.Utils.encodeHashId(userId)
const spResponse = await libs.discoveryProvider.getUserReplicaSet({
encodedUserId
})

if (spResponse?.primarySpID) {
replicaSet = {
primaryId: spResponse.primarySpID,
secondaryIds: [
spResponse.secondary1SpID,
spResponse.secondary2SpID
]
}
errorMsg = null
if (replicaSet.primaryId === selfSpId) return
} else {
errorMsg = 'User replica not found in discovery'
}
errorMsg = null
if (replicaSet.primaryId === selfSpId) break
} else {
errorMsg = 'User replica not found in discovery'
}
} catch (e: any) {
errorMsg = e.message
} // Ignore all errors until MAX_RETRIES exceeded

await timeout(RETRY_TIMEOUT_MS)
}
} catch (e: any) {
errorMsg = e.message
} // Ignore all errors until MAX_RETRIES exceeded
}
})

// Error if failed to retrieve replicaSet
if (!replicaSet.primaryId) {
Expand Down
3 changes: 2 additions & 1 deletion creator-node/src/utils/asyncRetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ export function asyncRetry({
onRetry: (err, i) => {
if (err && log) {
const logPrefix =
(logLabel ? `[${logLabel}] ` : '') + `[asyncRetry] [attempt #${i}]`
(logLabel ? `[${logLabel}] ` : '') +
`[asyncRetry:${asyncFn.name}] [attempt #${i}]`
logger.warn(`${logPrefix}: `, err.message || err)
}
},
Expand Down

0 comments on commit 0b3c754

Please sign in to comment.