Commit

Run Content Node on multiple cluster processes and migrate Bull->BullMQ (#3881)
theoilie committed Sep 26, 2022
1 parent a39471f commit 56340ff
Showing 42 changed files with 1,500 additions and 1,708 deletions.
3 changes: 2 additions & 1 deletion creator-node/.eslintrc.js
@@ -3,7 +3,8 @@ module.exports = {
parserOptions: {
tsconfigRootDir: __dirname,
ecmaVersion: 2020,
project: ['./tsconfig.json']
project: ['./tsconfig.json'],
tsconfigRootDir: __dirname
},
extends: [
'standard',
3 changes: 3 additions & 0 deletions creator-node/compose/env/base.env
@@ -13,6 +13,9 @@ redisPort=6379
# Can be overriden.
creatorNodeIsDebug=true

# Locally we run 4 CNs so we don't want to use too many processes for each
expressAppConcurrency=2

WAIT_HOSTS=

# Rate limiting
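The `expressAppConcurrency` setting above caps how many cluster processes each local Content Node runs. Several diffs below size their queue workers with `clusterUtils.getConcurrencyPerWorker(...)`, whose implementation is not shown in this commit; the following is only a sketch of how such a helper could split a queue's total concurrency across the configured workers (all names and paths here are assumptions, not the repo's actual code).

```js
// Hypothetical sketch, not the actual clusterUtils from this commit: divide a
// queue's total concurrency across the cluster workers configured by
// expressAppConcurrency, giving each worker at least one job slot.
const os = require('os')
const config = require('./config') // assumed to expose expressAppConcurrency

function getNumWorkers() {
  // Fall back to the CPU count if expressAppConcurrency isn't set
  return config.get('expressAppConcurrency') || os.cpus().length
}

function getConcurrencyPerWorker(globalConcurrency) {
  return Math.max(1, Math.floor(globalConcurrency / getNumWorkers()))
}

module.exports = { getNumWorkers, getConcurrencyPerWorker }
```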
518 changes: 348 additions & 170 deletions creator-node/package-lock.json

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions creator-node/package.json
@@ -17,7 +17,8 @@
"test:coverage:ci": "nyc --reporter=lcov --reporter=text npm run test:ci && nyc report --reporter=text-lcov | coveralls",
"lint:fix": "eslint --fix --ext=js,ts src",
"lint": "eslint --ext=js,ts src",
"build": "./node_modules/.bin/tsc --project tsconfig.build.json"
"build": "./node_modules/.bin/tsc --project tsconfig.build.json",
"postinstall": "patch-package"
},
"keywords": [],
"author": "",
@@ -48,7 +49,7 @@
"bl": "^4.1.0",
"body-parser": "^1.18.3",
"buffer": "5.4.2",
"bull": "4.8.2",
"bullmq": "^1.91.1",
"bunyan": "^1.8.15",
"cids": "0.8.0",
"commander": "^6.2.1",
@@ -77,6 +78,7 @@
"jimp": "^0.6.1",
"lodash": "4.17.21",
"multer": "^1.4.0",
"patch-package": "^6.4.7",
"pg": "^8.0.3",
"prettier-config-standard": "^4.0.0",
"prom-client": "^14.0.1",
@@ -92,12 +94,12 @@
"web3": "1.2.8"
},
"devDependencies": {
"@types/bull": "^3.15.8",
"@types/bunyan": "^1.8.8",
"@types/chai": "^4.3.3",
"@types/eth-sig-util": "^2.1.1",
"@types/express": "4.17.12",
"@types/fs-extra": "^9.0.13",
"@types/ioredis": "^4.28.10",
"@types/lodash": "^4.14.182",
"@types/mocha": "^9.1.1",
"@types/node": "^18.7.9",
@@ -133,6 +135,7 @@
"lodash": "Vuln in < 4.17.13, fixed by https://github.com/lodash/lodash/pull/4336"
},
"scriptsComments": {
"start": "Runs multiple processes using cluster. Starts as primary since process.env.NODE_UNIQUE_ID=undefined",
"coverage": "Runs nyc on tests/ dir and outputs results in ./nyc_output. Can be used for vscode extensions.",
"report": "Generates static html files representing code coverage per test file and outputs them into /coverage."
}
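The new `start` comment above describes `index.js` launching as the cluster primary (because `process.env.NODE_UNIQUE_ID` is undefined) and forking worker processes. The actual `index.js` is not part of this excerpt; below is a minimal sketch of that startup pattern, with an assumed worker entry point and config key.

```js
// Hedged sketch of the cluster startup the "start" comment describes; the real
// index.js in this commit may differ. Node sets NODE_UNIQUE_ID only in the env
// of forked workers, so the first process sees it undefined and runs as primary
// (cluster.isPrimary; isMaster on older Node versions).
const cluster = require('cluster')
const os = require('os')
const config = require('./config') // assumed to expose expressAppConcurrency

if (cluster.isPrimary) {
  const numWorkers = config.get('expressAppConcurrency') || os.cpus().length
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork()
  }
  cluster.on('exit', () => {
    // Keep the pool at full size if a worker dies
    cluster.fork()
  })
} else {
  // Each worker runs its own Express app and BullMQ workers
  require('./app') // hypothetical entry point for a single worker process
}
```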
2 changes: 2 additions & 0 deletions creator-node/patches/README.md
@@ -0,0 +1,2 @@
Folder contains patch files auto-generated by [patch-package](https://www.npmjs.com/package/patch-package).
These are applied during `npm run postinstall` to modify dependencies in node_modules.
30 changes: 30 additions & 0 deletions creator-node/patches/bullmq+1.91.1.patch
@@ -0,0 +1,30 @@
diff --git a/node_modules/bullmq/dist/cjs/classes/child-pool.js b/node_modules/bullmq/dist/cjs/classes/child-pool.js
index 2e47bee..0369e22 100644
--- a/node_modules/bullmq/dist/cjs/classes/child-pool.js
+++ b/node_modules/bullmq/dist/cjs/classes/child-pool.js
@@ -8,6 +8,16 @@ const process_utils_1 = require("./process-utils");
const interfaces_1 = require("../interfaces");
const utils_1 = require("../utils");
const CHILD_KILL_TIMEOUT = 30000;
+
+const getFreePort = async () => {
+ return new Promise((res) => {
+ const srv = require("net").createServer();
+ srv.listen(0, () => {
+ const port = srv.address().port;
+ srv.close((err) => res(port));
+ });
+ });
+};
const convertExecArgv = async (execArgv) => {
const standard = [];
const convertedArgs = [];
@@ -18,7 +28,7 @@ const convertExecArgv = async (execArgv) => {
}
else {
const argName = arg.split('=')[0];
- const port = await (await Promise.resolve().then(() => require('get-port'))).default();
+ const port = await getFreePort();
convertedArgs.push(`${argName}=${port}`);
}
}
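The patch replaces BullMQ's dynamic `require('get-port')` inside `convertExecArgv` (the source of the bug referenced in taskforcesh/bullmq#1424; newer `get-port` releases are ESM-only, which can break that require) with an inlined helper that asks the OS for a free port by listening on port 0. A standalone version of that helper, for reference:

```js
// Standalone version of the helper the patch inlines: listening on port 0 asks
// the OS for any free port, which we read back and then release.
const net = require('net')

const getFreePort = () =>
  new Promise((resolve, reject) => {
    const srv = net.createServer()
    srv.once('error', reject)
    srv.listen(0, () => {
      const { port } = srv.address()
      srv.close(() => resolve(port))
    })
  })

// Example: grab a port for a child process debugger flag such as `--inspect=${port}`
getFreePort().then((port) => console.log('free port:', port))
```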
4 changes: 4 additions & 0 deletions creator-node/scripts/start.sh
@@ -65,6 +65,10 @@ if [[ "$contentCacheLayerEnabled" == "true" ]]; then
openresty -p /usr/local/openresty -c /usr/local/openresty/conf/nginx.conf
fi

# Postinstall applies patches in the patches/ folder via patch-package. This fixes a bug in BullMQ:
# https://github.com/taskforcesh/bullmq/issues/1424
npm run postinstall
# index.js runs multiple processes using cluster. Starts as primary since process.env.NODE_UNIQUE_ID=undefined
if [[ "$devMode" == "true" ]]; then
if [ "$link_libs" = true ]; then
cd ../audius-libs
86 changes: 47 additions & 39 deletions creator-node/src/AsyncProcessingQueue.js
@@ -1,7 +1,8 @@
const Bull = require('bull')
const { Queue, Worker } = require('bullmq')
const { logger: genericLogger } = require('./logging')
const config = require('./config')
const redisClient = require('./redis')
const { clusterUtils } = require('./utils')

// Processing fns
const {
@@ -27,6 +28,7 @@ const PROCESS_STATES = Object.freeze({
DONE: 'DONE',
FAILED: 'FAILED'
})
const QUEUE_NAME = 'async-processing'

const ASYNC_PROCESSING_QUEUE_HISTORY = 500

@@ -39,48 +41,55 @@ const ASYNC_PROCESSING_QUEUE_HISTORY = 500

class AsyncProcessingQueue {
constructor(libs, prometheusRegistry) {
this.queue = new Bull('asyncProcessing', {
redis: {
host: config.get('redisHost'),
port: config.get('redisPort')
},
const connection = {
host: config.get('redisHost'),
port: config.get('redisPort')
}
this.queue = new Queue(QUEUE_NAME, {
connection,
defaultJobOptions: {
removeOnComplete: ASYNC_PROCESSING_QUEUE_HISTORY,
removeOnFail: ASYNC_PROCESSING_QUEUE_HISTORY
}
})

prometheusRegistry.startQueueMetrics(this.queue)

this.libs = libs

const untracedProcessTask = this.processTask
this.queue.process(MAX_CONCURRENCY, async (job, done) => {
const { logContext, parentSpanContext, task } = job.data
const processTask = instrumentTracing({
name: `AsyncProcessingQueue.process ${task}`,
fn: untracedProcessTask,
context: this,
options: {
// if a parentSpanContext is provided
// reference it so the async queue job can remember
// who enqueued it
links: parentSpanContext
? [
{
context: parentSpanContext
}
]
: [],
attributes: {
requestID: logContext.requestID,
[tracing.CODE_FILEPATH]: __filename
const worker = new Worker(
QUEUE_NAME,
async (job) => {
const { logContext, parentSpanContext, task } = job.data
const processTask = instrumentTracing({
name: `AsyncProcessingQueue.process ${task}`,
fn: untracedProcessTask,
context: this,
options: {
// if a parentSpanContext is provided
// reference it so the async queue job can remember
// who enqueued it
links: parentSpanContext
? [
{
context: parentSpanContext
}
]
: [],
attributes: {
requestID: logContext.requestID,
[tracing.CODE_FILEPATH]: __filename
}
}
}
})
})

await processTask(job, done)
})
await processTask(job)
},
{
connection,
concurrency: clusterUtils.getConcurrencyPerWorker(MAX_CONCURRENCY)
}
)
prometheusRegistry.startQueueMetrics(this.queue, worker)

this.PROCESS_NAMES = PROCESS_NAMES
this.PROCESS_STATES = PROCESS_STATES
@@ -90,7 +99,7 @@ class AsyncProcessingQueue {
this.constructProcessKey = this.constructAsyncProcessingKey.bind(this)
}

async processTask(job, done) {
async processTask(job) {
const { logContext, task } = job.data

const func = this.getFn(task)
@@ -107,7 +116,6 @@
logContext,
req: job.data.req
})
done(null, {})
} else {
this.logStatus(
`Succesfully handed off transcoding and segmenting to sp=${sp}. Wrapping up remainder of track association..`
@@ -117,12 +125,12 @@
logContext,
req: { ...job.data.req, transcodeFilePath, segmentFileNames }
})
done(null, { response: { transcodeFilePath, segmentFileNames } })
return { response: { transcodeFilePath, segmentFileNames } }
}
} else {
try {
const response = await this.monitorProgress(task, func, job.data)
done(null, { response })
return { response }
} catch (e) {
tracing.recordException(e)
this.logError(
Expand All @@ -131,7 +139,7 @@ class AsyncProcessingQueue {
}: ${e.toString()}`,
logContext
)
done(e.toString())
return e.toString()
}
}
}
@@ -185,12 +193,12 @@
async addTask(params) {
const { logContext, task } = params

this.logStatus(
await this.logStatus(
`Adding ${task} task! uuid=${logContext.requestID}}`,
logContext
)

const job = await this.queue.add(params)
const job = await this.queue.add(QUEUE_NAME, params)

return job
}
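The change above is the core of the Bull -> BullMQ migration: Bull's single queue object with `queue.process(concurrency, handler)` and a `done` callback is split into a BullMQ `Queue` for enqueueing plus a separate `Worker` whose processor simply returns its result (and signals failure by throwing, or, as in this file, by returning an error payload). A minimal sketch of the pattern with placeholder names and Redis settings:

```js
// Minimal sketch of the Bull -> BullMQ pattern used above (illustrative names).
const { Queue, Worker } = require('bullmq')

const connection = { host: 'localhost', port: 6379 } // assumed Redis location
const QUEUE_NAME = 'example-queue'

// Producer side: BullMQ jobs take an explicit job name as the first argument
const queue = new Queue(QUEUE_NAME, {
  connection,
  defaultJobOptions: { removeOnComplete: 100, removeOnFail: 100 }
})

// Consumer side: a Worker replaces queue.process(); the processor returns a
// value instead of calling done(null, result)
const worker = new Worker(
  QUEUE_NAME,
  async (job) => {
    const { task } = job.data
    return { response: `processed ${task}` }
  },
  { connection, concurrency: 2 }
)

async function enqueue(task) {
  return queue.add(QUEUE_NAME, { task })
}
```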
43 changes: 24 additions & 19 deletions creator-node/src/ImageProcessingQueue.js
@@ -1,7 +1,11 @@
const Bull = require('bull')
const { Queue, QueueEvents, Worker } = require('bullmq')
const path = require('path')
const os = require('os')

const config = require('./config')
const { logger: genericLogger } = require('./logging')
const { clusterUtils } = require('./utils')
const resizeImage = require('./resizeImage')

const imageProcessingMaxConcurrency = config.get(
'imageProcessingMaxConcurrency'
@@ -22,34 +26,34 @@ const IMAGE_PROCESSING_QUEUE_HISTORY = 500

class ImageProcessingQueue {
constructor(prometheusRegistry = null) {
this.queue = new Bull('image-processing-queue', {
redis: {
port: config.get('redisPort'),
host: config.get('redisHost')
},
const connection = {
host: config.get('redisHost'),
port: config.get('redisPort')
}
this.queue = new Queue('image-processing-queue', {
connection,
defaultJobOptions: {
removeOnComplete: IMAGE_PROCESSING_QUEUE_HISTORY,
removeOnFail: IMAGE_PROCESSING_QUEUE_HISTORY
}
})

// Process jobs sandboxed - https://docs.bullmq.io/guide/workers/sandboxed-processors
const processorFile = path.join(__dirname, 'resizeImage.js')
const worker = new Worker('image-processing-queue', processorFile, {
connection,
concurrency: clusterUtils.getConcurrencyPerWorker(MAX_CONCURRENCY)
})
if (prometheusRegistry !== null && prometheusRegistry !== undefined) {
prometheusRegistry.startQueueMetrics(this.queue)
prometheusRegistry.startQueueMetrics(this.queue, worker)
}

/**
* Queue will process tasks concurrently if provided a concurrency number and a
* path to file containing job processor function
* https://github.com/OptimalBits/bull/tree/013c51942e559517c57a117c27a550a0fb583aa8#separate-processes
*/
this.queue.process(
PROCESS_NAMES.resizeImage /** job processor name */,
MAX_CONCURRENCY /** job processor concurrency */,
`${__dirname}/resizeImage.js` /** path to job processor function */
)

this.logStatus = this.logStatus.bind(this)
this.resizeImage = this.resizeImage.bind(this)

this.queueEvents = new QueueEvents('image-processing-queue', {
connection
})
}

/**
@@ -96,7 +100,8 @@ class ImageProcessingQueue {
square,
logContext
})
const result = await job.finished()

const result = await job.waitUntilFinished(this.queueEvents)
return result
}
}
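Two BullMQ specifics show up in this file: passing a file path to `Worker` runs the processor in a sandboxed child process, and Bull's `job.finished()` becomes `job.waitUntilFinished(queueEvents)`, which requires a `QueueEvents` instance on the same queue. A condensed sketch of that wiring, with assumed Redis settings:

```js
// Sketch of the sandboxed-processor + QueueEvents pattern used above.
const path = require('path')
const { Queue, QueueEvents, Worker } = require('bullmq')

const connection = { host: 'localhost', port: 6379 } // assumed Redis location
const queue = new Queue('image-processing-queue', { connection })
const queueEvents = new QueueEvents('image-processing-queue', { connection })

// Passing a file path (instead of a function) makes BullMQ run the processor
// in a separate child process; resizeImage.js must export the job handler.
const processorFile = path.join(__dirname, 'resizeImage.js')
const worker = new Worker('image-processing-queue', processorFile, {
  connection,
  concurrency: 2
})

async function resize(jobData) {
  const job = await queue.add('resize', jobData)
  // waitUntilFinished resolves with the processor's return value, or rejects
  // if the job fails; it relies on the QueueEvents stream created above.
  return job.waitUntilFinished(queueEvents)
}
```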
