diff --git a/creator-node/src/services/stateMachineManager/stateReconciliation/recoverOrphanedData.jobProcessor.ts b/creator-node/src/services/stateMachineManager/stateReconciliation/recoverOrphanedData.jobProcessor.ts index 2bf5a3e3c34..51584a049b7 100644 --- a/creator-node/src/services/stateMachineManager/stateReconciliation/recoverOrphanedData.jobProcessor.ts +++ b/creator-node/src/services/stateMachineManager/stateReconciliation/recoverOrphanedData.jobProcessor.ts @@ -236,7 +236,7 @@ const _batchIssueReqsToRecoverOrphanedData = async ( i < numWalletsWithOrphanedData; i += ORPHANED_DATA_NUM_USERS_TO_RECOVER_PER_BATCH ) { - const walletsWithOrphanedData = await redisClient.spop( + const walletsWithOrphanedData = await redisClient.srandmember( WALLETS_ORPHANED_KEY, ORPHANED_DATA_NUM_USERS_TO_RECOVER_PER_BATCH ) @@ -262,6 +262,7 @@ const _batchIssueReqsToRecoverOrphanedData = async ( } }) requestsIssued++ + await redisClient.srem(WALLETS_ORPHANED_KEY, wallet) } catch (e: any) { logger.error( `Error issuing request to recover orphaned data: ${e.message}` @@ -271,9 +272,7 @@ const _batchIssueReqsToRecoverOrphanedData = async ( const elapsedMs = Date.now() - start logger.info( - `Issued /merge_primary_and_secondary requests for ${ - i + numWalletsWithOrphanedData - }/${numWalletsWithOrphanedData} wallets. + `Issued /merge_primary_and_secondary requests for ${requestsIssued}/${numWalletsWithOrphanedData} wallets. Time elapsed: ${elapsedMs}/${MAX_MS_TO_ISSUE_RECOVER_ORPHANED_DATA_REQUESTS}` ) if (elapsedMs >= MAX_MS_TO_ISSUE_RECOVER_ORPHANED_DATA_REQUESTS) { diff --git a/creator-node/test/recoverOrphanedData.jobProcessor.test.ts b/creator-node/test/recoverOrphanedData.jobProcessor.test.ts index dcdb26f22c2..87dec398b86 100644 --- a/creator-node/test/recoverOrphanedData.jobProcessor.test.ts +++ b/creator-node/test/recoverOrphanedData.jobProcessor.test.ts @@ -107,7 +107,7 @@ describe('test recoverOrphanedData job processor', function () { } } - orphanedUsers.forEach((orphanedUser) => { + for (const orphanedUser of orphanedUsers) { // Mock fetching the primary endpoint for each orphaned user nock(DISCOVERY_NODE_ENDPOINT) .get('/users') @@ -129,7 +129,7 @@ describe('test recoverOrphanedData job processor', function () { forceWipe: true }) .reply(200) - }) + } return proxyquire( '../src/services/stateMachineManager/stateReconciliation/recoverOrphanedData.jobProcessor.ts',