Skip to content

Commit

Permalink
Add transcode queue insights into health check (#1758)
Browse files Browse the repository at this point in the history
* Add transcode queue insights into health check

* Add verbose health check

* Fix test

* Update to length

* Only verbose health check

* Update content node service selection

* Update selection to use transcode queue size

* unused var

* Update comment

* Add checks for property existence and add default ratio

* Bug fixes

* Remove service selection changes

* More stuff

* Remove logs
  • Loading branch information
dmanjunath committed Aug 13, 2021
1 parent bad9491 commit 355bc0f
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 7 deletions.
17 changes: 17 additions & 0 deletions creator-node/src/TranscodingQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class TranscodingQueue {
this.logStatus = this.logStatus.bind(this)
this.segment = this.segment.bind(this)
this.transcode320 = this.transcode320.bind(this)
this.getTranscodeQueueJobs = this.getTranscodeQueueJobs.bind(this)
}

/**
Expand Down Expand Up @@ -120,6 +121,22 @@ class TranscodingQueue {
const result = await job.finished()
return result
}

async getTranscodeQueueJobs () {
const queue = this.queue
const [
waiting,
active
] = await Promise.all([
queue.getJobs(['waiting']),
queue.getJobs(['active'])
])

return {
waiting: waiting.length,
active: active.length
}
}
}

module.exports = new TranscodingQueue()
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const healthCheck = async ({ libs } = {}, logger, sequelize, getMonitors, number
* @param {function} getAggregateSyncData fn to get the latest daily sync count (success, fail, triggered)
* @param {function} getLatestSyncData fn to get the timestamps of the most recent sync (success, fail)
*/
const healthCheckVerbose = async ({ libs, snapbackSM } = {}, logger, sequelize, getMonitors, numberOfCPUs, getAggregateSyncData, getLatestSyncData) => {
const healthCheckVerbose = async ({ libs, snapbackSM } = {}, logger, sequelize, getMonitors, numberOfCPUs, getTranscodeQueueJobs, getAggregateSyncData, getLatestSyncData) => {
const basicHealthCheck = await healthCheck({ libs }, logger, sequelize, getMonitors, numberOfCPUs)

// Location information
Expand Down Expand Up @@ -143,6 +143,8 @@ const healthCheckVerbose = async ({ libs, snapbackSM } = {}, logger, sequelize,
currentSnapbackReconfigMode = snapbackSM.highestEnabledReconfigMode
}

const { active: transcodeActive, waiting: transcodeWaiting } = await getTranscodeQueueJobs()

const response = {
...basicHealthCheck,
country,
Expand Down Expand Up @@ -171,7 +173,9 @@ const healthCheckVerbose = async ({ libs, snapbackSM } = {}, logger, sequelize,
currentSnapbackReconfigMode,
manualSyncsDisabled,
snapbackModuloBase,
snapbackJobInterval
snapbackJobInterval,
transcodeActive,
transcodeWaiting
}

return response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,15 @@ describe('Test Health Check Verbose', function () {
highestEnabledReconfigMode: 'RECONFIG_DISABLED'
}
}
const res = await healthCheckVerbose(serviceRegistryMock, mockLogger, sequelizeMock, getMonitorsMock, 2)
const TranscodingQueueMock = (active = 0, waiting = 0) => {
return {
getTranscodeQueueJobs: async () => {
return { active, waiting }
}
}
}

const res = await healthCheckVerbose(serviceRegistryMock, mockLogger, sequelizeMock, getMonitorsMock, 2, TranscodingQueueMock(4, 0).getTranscodeQueueJobs)

assert.deepStrictEqual(res, {
...version,
Expand Down Expand Up @@ -187,7 +195,9 @@ describe('Test Health Check Verbose', function () {
currentSnapbackReconfigMode: 'RECONFIG_DISABLED',
manualSyncsDisabled: false,
snapbackModuloBase: 18,
snapbackJobInterval: 1000
snapbackJobInterval: 1000,
transcodeActive: 4,
transcodeWaiting: 0
})
})
})
11 changes: 10 additions & 1 deletion creator-node/src/components/healthCheck/healthCheckController.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const { serviceRegistry } = require('../../serviceRegistry')
const { sequelize } = require('../../models')
const { getMonitors } = require('../../monitors/monitors')
const { getAggregateSyncData, getLatestSyncData } = require('../../snapbackSM/syncHistoryAggregator')
const TranscodingQueue = require('../../TranscodingQueue')

const { recoverWallet } = require('../../apiSigning')
const { handleTrackContentUpload, removeTrackFolder } = require('../../fileManager')
Expand Down Expand Up @@ -57,7 +58,14 @@ const healthCheckController = async (req) => {
let { randomBytesToSign } = req.query

const logger = req.logger
const response = await healthCheck(serviceRegistry, logger, sequelize, getMonitors, numberOfCPUs, randomBytesToSign)
const response = await healthCheck(
serviceRegistry,
logger,
sequelize,
getMonitors,
numberOfCPUs,
randomBytesToSign
)
return successResponse(response)
}

Expand Down Expand Up @@ -98,6 +106,7 @@ const healthCheckVerboseController = async (req) => {
sequelize,
getMonitors,
numberOfCPUs,
TranscodingQueue.getTranscodeQueueJobs,
getAggregateSyncData,
getLatestSyncData
)
Expand Down
4 changes: 3 additions & 1 deletion libs/src/services/creatorNode/CreatorNodeSelection.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ class CreatorNodeSelection extends ServiceSelection {
maxFileDescriptors: data.maxFileDescriptors,
allocatedFileDescriptors: data.allocatedFileDescriptors,
receivedBytesPerSec: data.receivedBytesPerSec,
transferredBytesPerSec: data.transferredBytesPerSec
transferredBytesPerSec: data.transferredBytesPerSec,
transcodeWaiting: data.transcodeWaiting,
transcodeActive: data.transcodeActive
})
} catch (e) {
// Swallow errors -- this method should not throw generally
Expand Down
4 changes: 3 additions & 1 deletion libs/src/utils/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ async function timeRequestsAndSortByVersion (requests, timeout = null) {
// If health check failed, send to back of timings
if (!a.response) return 1
if (!b.response) return -1

// Sort by highest version
if (semver.gt(a.response.data.data.version, b.response.data.data.version)) return -1
if (semver.lt(a.response.data.data.version, b.response.data.data.version)) return 1
// If same version, do a tie breaker on the response time

// If same version and transcode queue load, do a tie breaker on the response time
return a.millis - b.millis
})
}
Expand Down

0 comments on commit 355bc0f

Please sign in to comment.