[ASI-976] [REPLICA] Fix StateMachineQueue unreliability + expose stateMachineQueue status via health check (#2861)
SidSethi committed Apr 6, 2022
1 parent 8799cc6 commit 0605556
Showing 10 changed files with 275 additions and 177 deletions.
2 changes: 1 addition & 1 deletion creator-node/compose/env/base.env
@@ -38,7 +38,7 @@ spOwnerWalletIndex=


# Sync / SnapbackSM configs
snapbackJobInterval=10000 # ms
snapbackJobInterval=10000 # 10000ms = 10sec
snapbackModuloBase=3
minimumDailySyncCount=5
minimumRollingSyncCount=10
@@ -63,7 +63,9 @@ const healthCheck = async (
dailySyncSuccessCount,
dailySyncFailCount,
latestSyncSuccessTimestamp,
latestSyncFailTimestamp
latestSyncFailTimestamp,
stateMachineQueueLatestJobSuccess,
stateMachineQueueLatestJobStart
] = await getMonitors([
MONITORS.DATABASE_CONNECTIONS,
MONITORS.DATABASE_SIZE,
@@ -81,7 +83,9 @@
MONITORS.DAILY_SYNC_SUCCESS_COUNT,
MONITORS.DAILY_SYNC_FAIL_COUNT,
MONITORS.LATEST_SYNC_SUCCESS_TIMESTAMP,
MONITORS.LATEST_SYNC_FAIL_TIMESTAMP
MONITORS.LATEST_SYNC_FAIL_TIMESTAMP,
MONITORS.LATEST_STATE_MACHINE_QUEUE_SUCCESS,
MONITORS.LATEST_STATE_MACHINE_QUEUE_START
])

let currentSnapbackReconfigMode
@@ -134,7 +138,13 @@ const healthCheck = async (
transcodeActive,
transcodeWaiting,
fileProcessingActive,
fileProcessingWaiting
fileProcessingWaiting,
stateMachineQueueLatestJobSuccess: stateMachineQueueLatestJobSuccess
? new Date(parseInt(stateMachineQueueLatestJobSuccess)).toISOString()
: null,
stateMachineQueueLatestJobStart: stateMachineQueueLatestJobStart
? new Date(parseInt(stateMachineQueueLatestJobStart)).toISOString()
: null
}

// If optional `randomBytesToSign` query param provided, node will include string in signed object
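The two new health-check fields read from redis as epoch-millisecond strings, so the service converts them to ISO-8601 (or null) before returning them. A minimal sketch of that conversion as a standalone helper — the helper name is hypothetical, not part of this commit:

// Hypothetical helper mirroring the conversion above: epoch-ms string -> ISO-8601 or null
const toIsoOrNull = (epochMsString) =>
  epochMsString ? new Date(parseInt(epochMsString)).toISOString() : null

// e.g. toIsoOrNull('1649203200000') === '2022-04-06T00:00:00.000Z'
// e.g. toIsoOrNull(null) === null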
@@ -154,7 +154,9 @@ describe('Test Health Check', function () {
transcodeActive: 4,
transcodeWaiting: 0,
fileProcessingActive: 0,
fileProcessingWaiting: 2
fileProcessingWaiting: 2,
stateMachineQueueLatestJobSuccess: null,
stateMachineQueueLatestJobStart: null
})
})

@@ -218,7 +220,9 @@ describe('Test Health Check', function () {
transcodeActive: 4,
transcodeWaiting: 0,
fileProcessingActive: 0,
fileProcessingWaiting: 2
fileProcessingWaiting: 2,
stateMachineQueueLatestJobSuccess: null,
stateMachineQueueLatestJobStart: null
})
})

@@ -274,7 +278,9 @@ describe('Test Health Check', function () {
transcodeActive: 4,
transcodeWaiting: 0,
fileProcessingActive: 0,
fileProcessingWaiting: 2
fileProcessingWaiting: 2,
stateMachineQueueLatestJobSuccess: null,
stateMachineQueueLatestJobStart: null
})

assert.deepStrictEqual(res.meetsMinRequirements, false)
@@ -343,7 +349,9 @@ describe('Test Health Check Verbose', function () {
transcodeActive: 4,
transcodeWaiting: 0,
fileProcessingActive: 0,
fileProcessingWaiting: 2
fileProcessingWaiting: 2,
stateMachineQueueLatestJobSuccess: null,
stateMachineQueueLatestJobStart: null
})
})

15 changes: 14 additions & 1 deletion creator-node/src/components/healthCheck/healthCheckController.js
@@ -89,7 +89,7 @@ const healthCheckController = async (req) => {
return errorResponseServerError()
}

const { randomBytesToSign } = req.query
const { randomBytesToSign, enforceStateMachineQueueHealth } = req.query

const AsyncProcessingQueue = serviceRegistry.asyncProcessingQueue

Expand All @@ -104,6 +104,19 @@ const healthCheckController = async (req) => {
numberOfCPUs,
randomBytesToSign
)

const { stateMachineQueueLatestJobSuccess } = response
if (enforceStateMachineQueueHealth && stateMachineQueueLatestJobSuccess) {
const healthyThresholdMs = 5 * config.get('snapbackJobInterval')

const delta = Date.now() - stateMachineQueueLatestJobSuccess.getTime()
if (delta > healthyThresholdMs) {
return errorResponseServerError(
`StateMachineQueue not healthy - last successful run ${delta}ms ago not within healthy threshold of ${healthyThresholdMs}ms`
)
}
}

return successResponse(response)
}

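Callers opt into the stricter behaviour with the new query param: when the last successful StateMachineQueue run is older than 5 × snapbackJobInterval, the endpoint returns a server error instead of a 200. A hedged usage sketch — the /health_check route path and the axios client are assumptions, since the route mounting is not shown in this diff:

const axios = require('axios')

// Assumed route path; the controller above is mounted elsewhere in the app
async function checkStateMachineQueueHealth(endpoint) {
  try {
    const resp = await axios.get(`${endpoint}/health_check`, {
      params: { enforceStateMachineQueueHealth: true }
    })
    // ISO-8601 string or null, per the component service above
    return resp.data.data.stateMachineQueueLatestJobSuccess
  } catch (e) {
    // The node responds with a server error when the last successful run
    // exceeds the healthy threshold
    return null
  }
}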
6 changes: 3 additions & 3 deletions creator-node/src/config.js
@@ -499,13 +499,13 @@ const config = convict({
doc: 'The modulo base to segment users by on snapback. Will process `1/snapbackModuloBase` users at some snapback interval',
format: 'nat',
env: 'snapbackModuloBase',
default: 24
default: 48
},
snapbackJobInterval: {
doc: 'Interval [ms] that snapbackSM jobs are fired; 1 hour',
doc: 'Interval [ms] that snapbackSM jobs are fired',
format: 'nat',
env: 'snapbackJobInterval',
default: 3600000
default: 1800000 // 30min
},
maxManualRequestSyncJobConcurrency: {
doc: 'Max bull queue concurrency for manual sync request jobs',
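With the new default of 1800000 ms (30 min), the controller's threshold of 5 × snapbackJobInterval works out to 2.5 hours, unless an env var overrides the interval. A short sketch of that arithmetic, assuming the convict default is in effect:

// 5 * snapbackJobInterval with the new default
const snapbackJobInterval = 1800000 // 30 min, from the convict default above
const healthyThresholdMs = 5 * snapbackJobInterval // 9000000 ms = 150 min = 2.5 h
// Note: compose/env/base.env sets snapbackJobInterval=10000 (10 s) for local dev,
// which would make the threshold 50000 ms (50 s) instead.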
24 changes: 21 additions & 3 deletions creator-node/src/monitors/monitors.js
@@ -33,8 +33,10 @@ const {
getDailySyncSuccessCount,
getDailySyncFailCount,
getLatestSyncSuccessTimestamp,
getLatestSyncFailTimestamp
} = require('./syncHistory')
getLatestSyncFailTimestamp,
getStateMachineQueueLatestJobSuccess,
getStateMachineQueueLatestJobStart
} = require('./stateMachine')
const redis = require('../redis')

// Prefix used to key each monitored value in redis
@@ -218,6 +220,20 @@ const LATEST_SYNC_FAIL_TIMESTAMP = {
type: 'string'
}

const LATEST_STATE_MACHINE_QUEUE_SUCCESS = {
name: 'stateMachineQueueLatestJobSuccess',
func: getStateMachineQueueLatestJobSuccess,
ttl: 5, // 5 /* mins */ * 60 /* s */,
type: 'string'
}

const LATEST_STATE_MACHINE_QUEUE_START = {
name: 'stateMachineQueueLatestJobStart',
func: getStateMachineQueueLatestJobStart,
ttl: 5, // 5 /* mins */ * 60 /* s */,
type: 'string'
}

const MONITORS = {
DATABASE_LIVENESS,
DATABASE_SIZE,
@@ -243,7 +259,9 @@
DAILY_SYNC_SUCCESS_COUNT,
DAILY_SYNC_FAIL_COUNT,
LATEST_SYNC_SUCCESS_TIMESTAMP,
LATEST_SYNC_FAIL_TIMESTAMP
LATEST_SYNC_FAIL_TIMESTAMP,
LATEST_STATE_MACHINE_QUEUE_SUCCESS,
LATEST_STATE_MACHINE_QUEUE_START
}

const getMonitorRedisKey = (monitor) =>
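Each MONITORS entry pairs a redis-cached value with the function that refreshes it, keyed via getMonitorRedisKey and cached for `ttl` seconds. A hedged sketch of how such an entry could be consumed — this is an illustration only, not the actual getMonitors/MonitoringQueue code, and it assumes `redis` here is the ioredis-style client exported by src/redis.js:

// Hypothetical consumer of a MONITORS entry (sketch only)
const getMonitorValue = async (monitor) => {
  const key = getMonitorRedisKey(monitor) // prefix + monitor.name
  let value = await redis.get(key)
  if (value === null) {
    // Cache miss: recompute via the monitor's func and cache it for `ttl` seconds
    value = await monitor.func()
    if (value !== null) await redis.set(key, value, 'EX', monitor.ttl)
  }
  return value
}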
@@ -1,3 +1,5 @@
const redis = require('../redis')

const SyncHistoryAggregator = require('../snapbackSM/syncHistoryAggregator')
const { SYNC_STATES } = require('../snapbackSM/syncHistoryAggregator')

@@ -65,11 +67,21 @@ const getLatestSyncFailTimestamp = async () => {
return fail
}

const getStateMachineQueueLatestJobSuccess = async () => {
return redis.get('stateMachineQueueLatestJobSuccess')
}

const getStateMachineQueueLatestJobStart = async () => {
return redis.get('stateMachineQueueLatestJobStart')
}

module.exports = {
get30DayRollingSyncSuccessCount,
get30DayRollingSyncFailCount,
getDailySyncSuccessCount,
getDailySyncFailCount,
getLatestSyncSuccessTimestamp,
getLatestSyncFailTimestamp
getLatestSyncFailTimestamp,
getStateMachineQueueLatestJobSuccess,
getStateMachineQueueLatestJobStart
}
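These getters only read the two keys; the StateMachineQueue itself is expected to write them when a job starts and when it completes successfully (those writes are part of this commit's snapbackSM changes, which are not shown in the excerpt above). A hedged sketch of the writing side, with key names taken from the getters and hypothetical function names:

// Hypothetical producer side (sketch, not the actual SnapbackSM code)
const recordStateMachineQueueJobStart = async () => {
  await redis.set('stateMachineQueueLatestJobStart', Date.now())
}

const recordStateMachineQueueJobSuccess = async () => {
  await redis.set('stateMachineQueueLatestJobSuccess', Date.now())
}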
6 changes: 6 additions & 0 deletions creator-node/src/redis.js
@@ -16,6 +16,12 @@ class RedisLock {
return redisClient.get(key)
}

static async acquireLock(key, expiration = EXPIRATION) {
console.log(`SETTING LOCK IF NOT EXISTS ${key}`)
const response = await redisClient.set(key, true, 'NX', 'EX', expiration)
return !!response
}

static async removeLock(key) {
console.log(`DELETING LOCK ${key}`)
return redisClient.del(key)
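acquireLock relies on redis SET ... NX EX, so it returns true only for the first caller until the key expires or removeLock is called. A short usage sketch — the key name and job wrapper are illustrative, and it assumes RedisLock is importable from this module:

// Illustrative guard around a recurring job using the new lock helper
const LOCK_KEY = 'stateMachineQueueJobLock' // hypothetical key name
const LOCK_EXPIRATION_SECONDS = 60

async function runIfNotAlreadyRunning(job) {
  const acquired = await RedisLock.acquireLock(LOCK_KEY, LOCK_EXPIRATION_SECONDS)
  if (!acquired) return // another worker holds the lock
  try {
    await job()
  } finally {
    await RedisLock.removeLock(LOCK_KEY)
  }
}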
13 changes: 8 additions & 5 deletions creator-node/src/snapbackSM/peerSetManager.js
@@ -3,8 +3,7 @@ const axios = require('axios')
const config = require('../config')
const { logger } = require('../logging')

// Used in determining peer health
const PEER_HEALTH_CHECK_REQUEST_TIMEOUT = config.get(
const PEER_HEALTH_CHECK_REQUEST_TIMEOUT_MS = config.get(
'peerHealthCheckRequestTimeout'
)
const MINIMUM_STORAGE_PATH_SIZE = config.get('minimumStoragePathSize')
@@ -21,6 +20,8 @@ const MAX_NUMBER_SECONDS_PRIMARY_REMAINS_UNHEALTHY = config.get(
'maxNumberSecondsPrimaryRemainsUnhealthy'
)

const DEFAULT_AXIOS_TIMEOUT_MS = 5000 // 5s

class PeerSetManager {
constructor({
discoveryProviderEndpoint,
@@ -183,7 +184,8 @@
url: `v1/full/users/content_node/all`,
params: {
creator_node_endpoint: this.creatorNodeEndpoint
}
},
timeout: DEFAULT_AXIOS_TIMEOUT_MS
}

// Will throw error on non-200 response
@@ -217,7 +219,8 @@
url: `users/creator_node`,
params: {
creator_node_endpoint: this.creatorNodeEndpoint
}
},
timeout: DEFAULT_AXIOS_TIMEOUT_MS
}

// Will throw error on non-200 response
@@ -265,7 +268,7 @@
baseURL: endpoint,
url: '/health_check/verbose',
method: 'get',
timeout: PEER_HEALTH_CHECK_REQUEST_TIMEOUT
timeout: PEER_HEALTH_CHECK_REQUEST_TIMEOUT_MS
})

return resp.data.data
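Adding an explicit timeout means a hung discovery provider now rejects the request instead of blocking the peer-set computation indefinitely. A hedged sketch of how a caller in this module could distinguish a timeout from other failures, assuming the `logger` and `DEFAULT_AXIOS_TIMEOUT_MS` bindings shown above (classic axios versions report client-side timeouts with code ECONNABORTED):

// Sketch: surfacing a timed-out discovery request distinctly (illustrative helper)
async function fetchUsersWithTimeout(discoveryProviderEndpoint, creatorNodeEndpoint) {
  try {
    const resp = await axios({
      method: 'get',
      baseURL: discoveryProviderEndpoint,
      url: 'users/creator_node',
      params: { creator_node_endpoint: creatorNodeEndpoint },
      timeout: DEFAULT_AXIOS_TIMEOUT_MS
    })
    return resp.data
  } catch (e) {
    // axios marks client-side timeouts with code ECONNABORTED
    if (e.code === 'ECONNABORTED') {
      logger.warn(`Discovery request timed out after ${DEFAULT_AXIOS_TIMEOUT_MS}ms`)
    }
    throw e
  }
}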
