[CON-437] Add env var to disable cluster mode (#3990)
theoilie authored and SidSethi committed Oct 7, 2022
1 parent 27d477f commit 399c068
Showing 5 changed files with 114 additions and 75 deletions.
34 changes: 17 additions & 17 deletions creator-node/package-lock.json

Some generated files are not rendered by default.

8 changes: 7 additions & 1 deletion creator-node/src/config.js
@@ -265,11 +265,17 @@ const config = convict({
default: false
},
expressAppConcurrency: {
doc: 'Number of processes to spawn, where each process runs its own Content Node. Default 0 to run one process per core (auto-detected)',
doc: 'Number of processes to spawn, where each process runs its own Content Node. Default 0 to run one process per core (auto-detected). Note that clusterModeEnabled must also be true for this to take effect',
format: 'nat',
env: 'expressAppConcurrency',
default: 0
},
clusterModeEnabled: {
doc: 'Whether or not cluster logic should be enabled (running multiple instances of the app to better utilize multiple logical cores)',
format: Boolean,
env: 'clusterModeEnabled',
default: true
},

// Transcoding settings
transcodingMaxConcurrency: {
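For context, here is a minimal standalone sketch of how the new flag resolves (illustrative only; it reuses the clusterModeEnabled property and env-var names from the diff above, and omits the rest of the schema):

import convict from 'convict'

// Standalone sketch, not the actual creator-node/src/config.js
const config = convict({
  clusterModeEnabled: {
    doc: 'Whether or not cluster logic should be enabled',
    format: Boolean,
    env: 'clusterModeEnabled',
    default: true
  }
})

// Example: run with `clusterModeEnabled=false node app.js` to opt out of cluster mode
console.log(config.get('clusterModeEnabled'))

Because the default is true, existing deployments keep cluster mode unless the env var is explicitly set to false, and expressAppConcurrency only takes effect while the flag remains true.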
115 changes: 65 additions & 50 deletions creator-node/src/index.ts
@@ -85,26 +85,26 @@ const verifyConfigAndDb = async () => {
}
}

/**
* Setting a different port is necessary for OpenResty to work. If OpenResty
* is enabled, have the app run on port 3000. Else, run on its configured port.
* @returns the port number to configure the Content Node app
*/
const getPort = () => {
const contentCacheLayerEnabled = config.get('contentCacheLayerEnabled')

if (contentCacheLayerEnabled) {
return 3000
}

return config.get('port')
}

// The primary process performs one-time validation and spawns worker processes that each run the Express app
const startAppForPrimary = async () => {
logger.info(`Primary process with pid=${process.pid} is running`)

await verifyConfigAndDb()
await clearRunningQueries()
try {
logger.info('Executing database migrations...')
await runMigrations()
logger.info('Migrations completed successfully')
} catch (migrationError) {
exitWithError('Error in migrations:', migrationError)
}

// Clear all redis locks
try {
await redisClient.WalletWriteLock.clearWriteLocks()
} catch (e: any) {
logger.warn(`Could not clear write locks. Skipping..: ${e.message}`)
}
await setupDbAndRedis()

const numWorkers = clusterUtils.getNumWorkers()
logger.info(`Spawning ${numWorkers} processes to run the Express app...`)
@@ -161,27 +161,58 @@ const startAppForPrimary = async () => {

// Workers don't share memory, so each one is its own Express instance with its own version of objects like serviceRegistry
const startAppForWorker = async () => {
/**
* Setting a different port is necessary for OpenResty to work. If OpenResty
* is enabled, have the app run on port 3000. Else, run on its configured port.
* @returns the port number to configure the Content Node app
*/
const getPort = () => {
const contentCacheLayerEnabled = config.get('contentCacheLayerEnabled')

if (contentCacheLayerEnabled) {
return 3000
logger.info(
`Worker process with pid=${process.pid} and worker ID=${cluster.worker?.id} is running`
)
await verifyConfigAndDb()
await startApp()

cluster.worker!.on('message', (msg) => {
if (msg?.cmd === 'setSpecialWorkerId') {
clusterUtils.specialWorkerId = msg?.val
} else if (msg?.cmd === 'receiveAggregatePrometheusMetrics') {
try {
const { prometheusRegistry } = serviceRegistry
prometheusRegistry.resolvePromiseToGetAggregatedMetrics(msg?.val)
} catch (error: any) {
logger.error(
`Failed to send aggregated metrics data back to worker: ${error}`
)
}
}
})

return config.get('port')
if (clusterUtils.isThisWorkerInit() && process.send) {
process.send({ cmd: 'initComplete' })
}
}

logger.info(
`Worker process with pid=${process.pid} and worker ID=${cluster.worker?.id} is running`
)
const startAppWithoutCluster = async () => {
logger.info(`Starting app with cluster mode disabled`)
await setupDbAndRedis()
await startApp()
}

const setupDbAndRedis = async () => {
await verifyConfigAndDb()
await clearRunningQueries()
try {
logger.info('Executing database migrations...')
await runMigrations()
logger.info('Migrations completed successfully')
} catch (migrationError) {
exitWithError('Error in migrations:', migrationError)
}

// Clear all redis locks
try {
await redisClient.WalletWriteLock.clearWriteLocks()
} catch (e: any) {
logger.warn(`Could not clear write locks. Skipping..: ${e.message}`)
}
}

const startApp = async () => {
// When app terminates, close down any open DB connections gracefully
ON_DEATH((signal: any, error: any) => {
// NOTE: log messages emitted here may be swallowed up if using the bunyan CLI (used by
@@ -201,28 +232,12 @@ const startAppForWorker = async () => {
const appInfo = initializeApp(getPort(), serviceRegistry)
logger.info('Initialized app and server')
await serviceRegistry.initServicesThatRequireServer(appInfo.app)

cluster.worker!.on('message', (msg) => {
if (msg?.cmd === 'setSpecialWorkerId') {
clusterUtils.specialWorkerId = msg?.val
} else if (msg?.cmd === 'receiveAggregatePrometheusMetrics') {
try {
const { prometheusRegistry } = serviceRegistry
prometheusRegistry.resolvePromiseToGetAggregatedMetrics(msg?.val)
} catch (error: any) {
logger.error(
`Failed to send aggregated metrics data back to worker: ${error}`
)
}
}
})

if (clusterUtils.isThisWorkerInit() && process.send) {
process.send({ cmd: 'initComplete' })
}
}

if (cluster.isMaster) {
if (!clusterUtils.isClusterEnabled()) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
startAppWithoutCluster()
} else if (cluster.isMaster) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
startAppForPrimary()
} else if (cluster.isWorker) {
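To make the new control flow easier to follow, here is a stripped-down sketch of the primary/worker split using only Node's built-in cluster API (the message names mirror the diff; everything else is illustrative, not the actual Content Node code):

import cluster from 'cluster'

if (cluster.isMaster) {
  // Primary: fork a worker and tell it which worker id is the "special" one
  const worker = cluster.fork()
  worker.send({ cmd: 'setSpecialWorkerId', val: worker.id })
} else {
  // Worker: handle commands from the primary by name
  process.on('message', (msg: any) => {
    if (msg?.cmd === 'setSpecialWorkerId') {
      console.log(`Special worker id is ${msg.val}`)
    }
  })
  // The init worker reports back once its one-time setup is done
  if (process.send) process.send({ cmd: 'initComplete' })
}

With clusterModeEnabled set to false, neither branch runs: startAppWithoutCluster performs the primary's one-time setup (setupDbAndRedis) and then startApp, all in a single process.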
18 changes: 13 additions & 5 deletions creator-node/src/routes/prometheusMetricsRoutes.js
@@ -1,24 +1,32 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
const express = require('express')
const { clusterUtils } = require('../utils')

const router = express.Router()

/**
* Exposes Prometheus metrics for the worker (not aggregated) at `GET /prometheus_metrics_worker`
*/

router.get('/prometheus_metrics_worker', async (req, res) => {
const returnMetricsForSingleProcess = async (req, res) => {
const prometheusRegistry = req.app.get('serviceRegistry').prometheusRegistry
const metricData = await prometheusRegistry.getAllMetricData()

res.setHeader('Content-Type', prometheusRegistry.registry.contentType)
return res.end(metricData)
}

/**
* Exposes Prometheus metrics for the worker (not aggregated) at `GET /prometheus_metrics_worker`
*/
router.get('/prometheus_metrics_worker', async (req, res) => {
return returnMetricsForSingleProcess(req, res)
})

/**
* Exposes Prometheus metrics aggregated across all workers at `GET /prometheus_metrics`
*/
router.get('/prometheus_metrics', async (req, res) => {
if (!clusterUtils.isClusterEnabled()) {
return returnMetricsForSingleProcess(req, res)
}

try {
const prometheusRegistry = req.app.get('serviceRegistry').prometheusRegistry
const { metricsData, contentType } =
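A rough sketch of the fallback the aggregated endpoint now takes (illustrative, using prom-client directly rather than the node's prometheusRegistry wrapper): when cluster mode is disabled there are no sibling workers to aggregate across, so the single local registry can be served directly.

import express from 'express'
import * as client from 'prom-client'

const app = express()
client.collectDefaultMetrics()

// Single-process fallback: serve this process's own registry
app.get('/prometheus_metrics', async (_req, res) => {
  res.setHeader('Content-Type', client.register.contentType)
  res.end(await client.register.metrics())
})

app.listen(3000)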
14 changes: 12 additions & 2 deletions creator-node/src/utils/clusterUtils.ts
@@ -20,16 +20,26 @@ class ClusterUtils {
this._specialWorkerId = specialWorkerId
}

/**
* Returns true if cluster mode is enabled. If it's disabled, then
* everything runs on one process with no primary or workers.
*/
isClusterEnabled() {
return config.get('clusterModeEnabled')
}

/**
* Returns true if this current worker process is the first worker, which performs
* some special initialization logic that other workers don't need to duplicate.
*/
isThisWorkerInit() {
return cluster.worker?.id === 1
return !this.isClusterEnabled() || cluster.worker?.id === 1
}

isThisWorkerSpecial() {
return cluster.worker?.id === this._specialWorkerId
return (
!this.isClusterEnabled() || cluster.worker?.id === this._specialWorkerId
)
}

getNumWorkers() {
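To illustrate the short-circuit that isThisWorkerInit and isThisWorkerSpecial gained (a hypothetical standalone sketch, not the ClusterUtils class itself): with cluster mode disabled there is no cluster.worker, so the single process must always answer yes to both checks.

import cluster from 'cluster'

const clusterModeEnabled = process.env.clusterModeEnabled !== 'false'

// Single-process mode: this process is the only one, so it always does the
// one-time init work itself; with cluster mode on, only worker 1 does it
function isThisWorkerInit(): boolean {
  return !clusterModeEnabled || cluster.worker?.id === 1
}

// Logs true when run with clusterModeEnabled=false and no forked workers
console.log(isThisWorkerInit())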
