Run Content Node on multiple cluster processes and migrate Bull->BullMQ #3881

Merged Sep 26, 2022

Commits (45)
9c2e4a7
Run CN on multiple cluster processes
theoilie Sep 18, 2022
b809722
Run Bull and BlacklistManager on single worker
theoilie Sep 19, 2022
07393a1
Whoops flip logic
theoilie Sep 19, 2022
be87881
Gate extra job adds
theoilie Sep 19, 2022
3b28ea7
Update test
theoilie Sep 19, 2022
f3bedb6
Let the bulls run
theoilie Sep 19, 2022
097d8de
Bull -> BullMQ
theoilie Sep 19, 2022
baec77b
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 19, 2022
7f1b134
Can't forget to push package.json
theoilie Sep 20, 2022
45a615d
Add missing dep
theoilie Sep 20, 2022
4cb298b
Fix eslint
theoilie Sep 20, 2022
3d1245f
job.finished -> job.waitForFinished
theoilie Sep 20, 2022
1d59aca
Migrate events to BullMQ
theoilie Sep 20, 2022
fd2809d
Fix migration issues to get mad-dog working
theoilie Sep 20, 2022
7c4ff94
One more migration fix
theoilie Sep 20, 2022
9ff4265
Fix unit test
theoilie Sep 20, 2022
082e3e4
Fix other part of unit test
theoilie Sep 20, 2022
540ecb2
Move SyncRequestDeDuplicator to redis
theoilie Sep 20, 2022
711e522
Update tests
theoilie Sep 20, 2022
180a5fb
Seed monitoring queue values only once
theoilie Sep 20, 2022
552ea8a
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 20, 2022
5e57c99
Add missing sync deduplicator await
theoilie Sep 21, 2022
398f651
Merge branch 'master' into theo-cluster-content-node
theoilie Sep 21, 2022
b48ea12
Respawn dead workers
theoilie Sep 21, 2022
acb1b0e
Duplicate init checks
theoilie Sep 21, 2022
17f566d
Add missing awaits + other feedback
theoilie Sep 21, 2022
57a5edb
Fix type
theoilie Sep 21, 2022
f36bab5
Make note about NODE_UNIQUE_ID
theoilie Sep 21, 2022
6048211
Make first worker await
theoilie Sep 21, 2022
eec7feb
Special logic for restarting worker ID=1
theoilie Sep 21, 2022
5de5d19
Merge branch 'master' into theo-cluster-content-node
theoilie Sep 21, 2022
3f39717
Forgot some utils
theoilie Sep 21, 2022
e7e35de
Prevent duplicate process initial job enqueuing
theoilie Sep 22, 2022
78590e8
Divide concurrency by worker count
theoilie Sep 22, 2022
cd76c23
Update test
theoilie Sep 22, 2022
b8e4eb6
Fix Bull import issue
theoilie Sep 22, 2022
e29d216
Gate adding to session expiration queue
theoilie Sep 22, 2022
96f214b
Address feedback
theoilie Sep 23, 2022
df175e6
Update test
theoilie Sep 23, 2022
a872878
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 23, 2022
37b8e84
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 26, 2022
bb81242
Document primary dying
theoilie Sep 26, 2022
2fa17e5
Clean up clusterUtils
theoilie Sep 26, 2022
639a4d8
Document patch-package
theoilie Sep 26, 2022
974eed9
Update test again
theoilie Sep 26, 2022
3 changes: 3 additions & 0 deletions creator-node/compose/env/base.env
@@ -13,6 +13,9 @@ redisPort=6379
# Can be overriden.
creatorNodeIsDebug=true

# Locally we run 4 CNs so we don't want to use multiple processes for each
expressAppConcurrency=1

WAIT_HOSTS=

# Rate limiting
19 changes: 12 additions & 7 deletions creator-node/src/blacklistManager.js
@@ -1,3 +1,5 @@
const cluster = require('cluster')

const { logger } = require('./logging')
const models = require('./models')
const redis = require('./redis')
@@ -29,13 +31,16 @@ class BlacklistManager {
try {
this.log('Initializing BlacklistManager...')

const { trackIdsToBlacklist, userIdsToBlacklist, segmentsToBlacklist } =
await this.getDataToBlacklist()
await this.fetchCIDsAndAddToRedis({
trackIdsToBlacklist,
userIdsToBlacklist,
segmentsToBlacklist
})
// Adding to redis only needs to be done once, but multiple workers might all run the app with their own BlacklistManager
if (cluster.worker?.id === 1) {
const { trackIdsToBlacklist, userIdsToBlacklist, segmentsToBlacklist } =
await this.getDataToBlacklist()
await this.fetchCIDsAndAddToRedis({
trackIdsToBlacklist,
userIdsToBlacklist,
segmentsToBlacklist
})
}

this.initialized = true

6 changes: 6 additions & 0 deletions creator-node/src/config.js
@@ -264,6 +264,12 @@ const config = convict({
env: 'printSequelizeLogs',
default: false
},
expressAppConcurrency: {
doc: 'Number of processes to spawn, where each process runs its own Content Node. Default 0 to run one process per core (auto-detected)',
format: 'nat',
env: 'expressAppConcurrency',
default: 0
},

// Transcoding settings
transcodingMaxConcurrency: {
172 changes: 116 additions & 56 deletions creator-node/src/index.ts
@@ -1,5 +1,13 @@
'use strict'

import type { Cluster } from 'cluster'
import type { CpuInfo } from 'os'
import type { LoDashStatic } from 'lodash'
const cluster: Cluster = require('cluster')
const { cpus }: { cpus: () => CpuInfo[] } = require('os')
const _: LoDashStatic = require('lodash')
const Bull = require('bull')

const { setupTracing } = require('./tracer')
setupTracing('content-node')

@@ -15,58 +23,64 @@ const { logger } = require('./logging')
const { serviceRegistry } = require('./serviceRegistry')
const redisClient = require('./redis')

const exitWithError = (...msg: any[]) => {
logger.error('ERROR: ', ...msg)
process.exit(1)
}
// Make sure some logic is only performed by 1 worker not all
if (cluster.worker?.id !== 1) {
logger.info(
`Disabling queues from running on worker ID=${cluster.worker?.id}`
)

const verifyDBConnection = async () => {
try {
logger.info('Verifying DB connection...')
await sequelize.authenticate() // runs SELECT 1+1 AS result to check db connection
logger.info('DB connected successfully!')
} catch (connectionError) {
exitWithError('Error connecting to DB:', connectionError)
}
// See https://github.com/OptimalBits/bull/blob/develop/lib/queue.js for queue functions to override

// Obliterating queues is one-time logic on startup
Bull.prototype.obliterate = async function (opts: any) {}
Bull.prototype.empty = function () {}
// Bull can't reliably keep multiple workers from taking the same jobs
Bull.prototype.process = function (
name: any,
concurrency: any,
handler: any
) {}
Bull.prototype.start = function (concurrency: any, name: any) {}
Bull.prototype.run = function (concurrency: any, handlerName: any) {}
// We also don't want to see log spam from events or run onComplete callbacks
Bull.prototype.on = function (eventName: any) {}
Bull.prototype.constructor = Bull
}

const runDBMigrations = async () => {
try {
logger.info('Executing database migrations...')
await runMigrations()
logger.info('Migrations completed successfully')
} catch (migrationError) {
exitWithError('Error in migrations:', migrationError)
// The primary process performs one-time validation and spawns worker processes that each run the Express app
const startAppForPrimary = async () => {
const exitWithError = (...msg: any[]) => {
logger.error('ERROR: ', ...msg)
process.exit(1)
}
}

const connectToDBAndRunMigrations = async () => {
await verifyDBConnection()
await clearRunningQueries()
await runDBMigrations()
}

/**
* Setting a different port is necessary for OpenResty to work. If OpenResty
* is enabled, have the app run on port 3000. Else, run on its configured port.
* @returns the port number to configure the Content Node app
*/
const getPort = () => {
const contentCacheLayerEnabled = config.get('contentCacheLayerEnabled')

if (contentCacheLayerEnabled) {
return 3000
const verifyDBConnection = async () => {
try {
logger.info('Verifying DB connection...')
await sequelize.authenticate() // runs SELECT 1+1 AS result to check db connection
logger.info('DB connected successfully!')
} catch (connectionError) {
exitWithError('Error connecting to DB:', connectionError)
}
}

return config.get('port')
}
const runDBMigrations = async () => {
try {
logger.info('Executing database migrations...')
await runMigrations()
logger.info('Migrations completed successfully')
} catch (migrationError) {
exitWithError('Error in migrations:', migrationError)
}
}

const startApp = async () => {
logger.info('Configuring service...')
const connectToDBAndRunMigrations = async () => {
await verifyDBConnection()
await clearRunningQueries()
await runDBMigrations()
}

await config.asyncConfig()

// fail if delegateOwnerWallet & delegatePrivateKey not present
const delegateOwnerWallet = config.get('delegateOwnerWallet')
const delegatePrivateKey = config.get('delegatePrivateKey')
const creatorNodeEndpoint = config.get('creatorNodeEndpoint')
@@ -77,7 +91,9 @@ const startApp = async () => {
)
}

// fail if delegateOwnerWallet doesn't derive from delegatePrivateKey
logger.info(`Primary process with pid=${process.pid} is running`)

// Fail if delegateOwnerWallet doesn't derive from delegatePrivateKey
const privateKeyBuffer = Buffer.from(
config.get('delegatePrivateKey').replace('0x', ''),
'hex'
@@ -88,15 +104,61 @@ const startApp = async () => {
throw new Error('Invalid delegatePrivateKey/delegateOwnerWallet pair')
}

// Fail if Trusted Notifier isn't configured properly
const trustedNotifierEnabled = !!config.get('trustedNotifierID')
const nodeOperatorEmailAddress = config.get('nodeOperatorEmailAddress')

if (!trustedNotifierEnabled && !nodeOperatorEmailAddress) {
exitWithError(
'Cannot startup without a trustedNotifierID or nodeOperatorEmailAddress'
)
}

await connectToDBAndRunMigrations()

// Clear all redis locks
try {
await redisClient.WalletWriteLock.clearWriteLocks()
} catch (e: any) {
logger.warn(`Could not clear write locks. Skipping..: ${e.message}`)
}

// This is called `cpus()` but it actually returns the # of logical cores, which is possibly higher than # of physical cores if there's hyperthreading
const logicalCores = cpus().length
const numWorkers = config.get('expressAppConcurrency') || logicalCores
logger.info(`Spawning ${numWorkers} processes to run the Express app...`)
_.times(numWorkers, () => cluster.fork())

cluster.on('exit', (worker, code, signal) => {
logger.info(`Worker process with pid=${worker.process.pid} died`)
})
}

// Workers don't share memory, so each one is its own Express instance with its own version of objects like serviceRegistry
const startAppForWorker = async () => {
/**
* Setting a different port is necessary for OpenResty to work. If OpenResty
* is enabled, have the app run on port 3000. Else, run on its configured port.
* @returns the port number to configure the Content Node app
*/
const getPort = () => {
const contentCacheLayerEnabled = config.get('contentCacheLayerEnabled')

if (contentCacheLayerEnabled) {
return 3000
}

return config.get('port')
}

logger.info(
`Worker process with pid=${process.pid} and worker ID=${cluster.worker?.id} is running`
)

await config.asyncConfig()
const privateKeyBuffer = Buffer.from(
config.get('delegatePrivateKey').replace('0x', ''),
'hex'
)
try {
const solDelegateKeypair = Keypair.fromSeed(privateKeyBuffer)
const solDelegatePrivateKey = solDelegateKeypair.secretKey
@@ -110,30 +172,21 @@ const startApp = async () => {
)
}

await connectToDBAndRunMigrations()

const nodeMode = config.get('devMode') ? 'Dev Mode' : 'Production Mode'

await serviceRegistry.initServices()
logger.info(`Initialized services (Node running in ${nodeMode})`)

const appInfo = initializeApp(getPort(), serviceRegistry)
logger.info('Initialized app and server')

// Clear all redis locks
try {
await redisClient.WalletWriteLock.clearWriteLocks()
} catch (e: any) {
logger.warn(`Could not clear write locks. Skipping..: ${e.message}`)
}

// Initialize services that do not require the server, but do not need to be awaited.
serviceRegistry.initServicesAsynchronously()

// Some Services cannot start until server is up. Start them now
// No need to await on this as this process can take a while and can run in the background
serviceRegistry.initServicesThatRequireServer(appInfo.app)

// when app terminates, close down any open DB connections gracefully
// When app terminates, close down any open DB connections gracefully
ON_DEATH((signal: any, error: any) => {
// NOTE: log messages emitted here may be swallowed up if using the bunyan CLI (used by
// default in `npm start` command). To see messages emitted after a kill signal, do not
Expand All @@ -145,4 +198,11 @@ const startApp = async () => {
}
})
}
startApp()

if (cluster.isMaster) {
startAppForPrimary()
} else if (cluster.isWorker) {
startAppForWorker()
} else {
throw new Error("Can't determine if process is primary or worker in cluster")
}
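The index.ts entry-point split boils down to a small dispatch on the cluster role. A condensed sketch (the start callbacks are stand-ins for `startAppForPrimary`/`startAppForWorker`):

```javascript
// Decide which role this process plays. `isMaster` is the older
// alias used in the PR; Node >= 16 also exposes `isPrimary`.
function chooseRole(c) {
  if (c.isMaster) return 'primary'
  if (c.isWorker) return 'worker'
  throw new Error("Can't determine if process is primary or worker in cluster")
}

// Dispatch to the appropriate startup path exactly once per process.
function start(c, startPrimary, startWorker) {
  return chooseRole(c) === 'primary' ? startPrimary() : startWorker()
}
```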
5 changes: 1 addition & 4 deletions creator-node/src/monitors/MonitoringQueue.js
@@ -42,7 +42,7 @@ class MonitoringQueue {
this.queue.process(
PROCESS_NAMES.monitor,
/* concurrency */ 1,
async (job, done) => {
async (_) => {
try {
this.logStatus('Starting')

@@ -55,11 +55,8 @@
this.logStatus(`Error on ${monitor.name} ${e}`)
}
})

done(null, {})
} catch (e) {
this.logStatus(`Error ${e}`)
done(e)
}
}
)
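The MonitoringQueue change reflects the core Bull→BullMQ migration in this PR: BullMQ processors are plain async functions, so Bull's `done(err, result)` callback disappears. Resolving completes the job (the resolved value becomes the job result) and throwing fails it. A standalone sketch of the migrated processor body (the monitor objects here are illustrative):

```javascript
// BullMQ-style processor: no `done` callback. Returning resolves
// (and stores) the job result; a thrown error fails the job.
async function monitorProcessor(monitors, logStatus) {
  try {
    logStatus('Starting')
    for (const monitor of monitors) {
      try {
        await monitor.func()
      } catch (e) {
        // A single failing monitor is logged but doesn't fail the job
        logStatus(`Error on ${monitor.name} ${e}`)
      }
    }
    return {} // replaces done(null, {})
  } catch (e) {
    logStatus(`Error ${e}`)
    throw e // replaces done(e)
  }
}
```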
@@ -1,5 +1,6 @@
const BullQueue = require('bull')
const _ = require('lodash')
const cluster = require('cluster')

const config = require('../../../config')
const {
@@ -50,10 +51,12 @@ class StateMonitoringManager {
duration: config.get('fetchCNodeEndpointToSpIdMapIntervalMs')
}
})
await this.startEndpointToSpIdMapQueue(
cNodeEndpointToSpIdMapQueue,
prometheusRegistry
)
if (cluster.worker?.id === 1) {
await this.startEndpointToSpIdMapQueue(
cNodeEndpointToSpIdMapQueue,
prometheusRegistry
)
}

// Create queue to slice through batches of users and gather data to be passed to find-sync and find-replica-set-update jobs
const monitorStateQueue = makeQueue({
@@ -107,7 +110,12 @@ class StateMonitoringManager {
await findReplicaSetUpdatesQueue.obliterate({ force: true })

// Enqueue first monitor-state job
await this.startMonitorStateQueue(monitorStateQueue, discoveryNodeEndpoint)
if (cluster.worker?.id === 1) {
await this.startMonitorStateQueue(
monitorStateQueue,
discoveryNodeEndpoint
)
}

return {
monitorStateQueue,
@@ -1,3 +1,5 @@
const cluster = require('cluster')

const config = require('../../../config')
const {
QUEUE_HISTORY,
@@ -79,10 +81,12 @@ class StateReconciliationManager {
await recoverOrphanedDataQueue.obliterate({ force: true })

// Queue the first recoverOrphanedData job, which will re-enqueue itself
await this.startRecoverOrphanedDataQueue(
recoverOrphanedDataQueue,
discoveryNodeEndpoint
)
if (cluster.worker?.id === 1) {
await this.startRecoverOrphanedDataQueue(
recoverOrphanedDataQueue,
discoveryNodeEndpoint
)
}

this.registerQueueEventHandlersAndJobProcessors({
manualSyncQueue,
10 changes: 10 additions & 0 deletions creator-node/test/StateMonitoringManager.test.js
@@ -144,6 +144,11 @@ describe('test StateMonitoringManager initialization, events, and re-enqueuing',
const MockStateMonitoringManager = proxyquire(
'../src/services/stateMachineManager/stateMonitoring/index.js',
{
cluster: {
worker: {
id: 1
}
},
'../../../config': config
}
)
@@ -174,6 +179,11 @@ describe('test StateMonitoringManager initialization, events, and re-enqueuing',
const MockStateMonitoringManager = proxyquire(
'../src/services/stateMachineManager/stateMonitoring/index.js',
{
cluster: {
worker: {
id: 1
}
},
'../../../config': config
}
)
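The test changes stub `cluster` via proxyquire so the `cluster.worker?.id === 1` gates pass inside the module under test. The same idea expressed with plain dependency injection (the factory and method names here are illustrative):

```javascript
// The gated code only runs when the injected cluster module reports
// worker ID 1, so tests can open or close the gate directly.
function makeQueueManager(clusterModule) {
  return {
    async init(startQueueFn) {
      let started = false
      if (clusterModule.worker?.id === 1) {
        await startQueueFn() // e.g. enqueue the first monitor-state job
        started = true
      }
      return started
    }
  }
}
```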