Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Content Node on multiple cluster processes and migrate Bull->BullMQ #3881

Merged
merged 45 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
9c2e4a7
Run CN on multiple cluster processes
theoilie Sep 18, 2022
b809722
Run Bull and BlacklistManager on single worker
theoilie Sep 19, 2022
07393a1
Whoops flip logic
theoilie Sep 19, 2022
be87881
Gate extra job adds
theoilie Sep 19, 2022
3b28ea7
Update test
theoilie Sep 19, 2022
f3bedb6
Let the bulls run
theoilie Sep 19, 2022
097d8de
Bull -> BullMQ
theoilie Sep 19, 2022
baec77b
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 19, 2022
7f1b134
Can't forget to push package.json
theoilie Sep 20, 2022
45a615d
Add missing dep
theoilie Sep 20, 2022
4cb298b
Fix eslint
theoilie Sep 20, 2022
3d1245f
job.finished -> job.waitForFinished
theoilie Sep 20, 2022
1d59aca
Migrate events to BullMQ
theoilie Sep 20, 2022
fd2809d
Fix migration issues to get mad-dog working
theoilie Sep 20, 2022
7c4ff94
One more migration fix
theoilie Sep 20, 2022
9ff4265
Fix unit test
theoilie Sep 20, 2022
082e3e4
Fix other part of unit test
theoilie Sep 20, 2022
540ecb2
Move SyncRequestDeDuplicator to redis
theoilie Sep 20, 2022
711e522
Update tests
theoilie Sep 20, 2022
180a5fb
Seed monitoring queue values only once
theoilie Sep 20, 2022
552ea8a
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 20, 2022
5e57c99
Add missing sync deduplicator await
theoilie Sep 21, 2022
398f651
Merge branch 'master' into theo-cluster-content-node
theoilie Sep 21, 2022
b48ea12
Respawn dead workers
theoilie Sep 21, 2022
acb1b0e
Duplicate init checks
theoilie Sep 21, 2022
17f566d
Add missing awaits + other feedback
theoilie Sep 21, 2022
57a5edb
Fix type
theoilie Sep 21, 2022
f36bab5
Make note about NODE_UNIQUE_ID
theoilie Sep 21, 2022
6048211
Make first worker await
theoilie Sep 21, 2022
eec7feb
Special logic for restarting worker ID=1
theoilie Sep 21, 2022
5de5d19
Merge branch 'master' into theo-cluster-content-node
theoilie Sep 21, 2022
3f39717
Forgot some utils
theoilie Sep 21, 2022
e7e35de
Prevent duplicate process initial job enqueuing
theoilie Sep 22, 2022
78590e8
Divide concurrency by worker count
theoilie Sep 22, 2022
cd76c23
Update test
theoilie Sep 22, 2022
b8e4eb6
Fix Bull import issue
theoilie Sep 22, 2022
e29d216
Gate adding to session expiration queue
theoilie Sep 22, 2022
96f214b
Address feedback
theoilie Sep 23, 2022
df175e6
Update test
theoilie Sep 23, 2022
a872878
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 23, 2022
37b8e84
Merge remote-tracking branch 'origin' into theo-cluster-content-node
theoilie Sep 26, 2022
bb81242
Document primary dying
theoilie Sep 26, 2022
2fa17e5
Clean up clusterUtils
theoilie Sep 26, 2022
639a4d8
Document patch-package
theoilie Sep 26, 2022
974eed9
Update test again
theoilie Sep 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion creator-node/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module.exports = {
parserOptions: {
tsconfigRootDir: __dirname,
ecmaVersion: 2020,
project: ['./tsconfig.json']
project: ['./tsconfig.json'],
tsconfigRootDir: __dirname
},
extends: [
'standard',
Expand Down
3 changes: 3 additions & 0 deletions creator-node/compose/env/base.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ redisPort=6379
# Can be overriden.
creatorNodeIsDebug=true

# Locally we run 4 CNs so we don't want to use too many processes for each
expressAppConcurrency=2

WAIT_HOSTS=

# Rate limiting
Expand Down
518 changes: 348 additions & 170 deletions creator-node/package-lock.json

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions creator-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"test:coverage:ci": "nyc --reporter=lcov --reporter=text npm run test:ci && nyc report --reporter=text-lcov | coveralls",
"lint:fix": "eslint --fix --ext=js,ts src",
"lint": "eslint --ext=js,ts src",
"build": "./node_modules/.bin/tsc --project tsconfig.build.json"
"build": "./node_modules/.bin/tsc --project tsconfig.build.json",
"postinstall": "patch-package"
},
"keywords": [],
"author": "",
Expand Down Expand Up @@ -48,7 +49,7 @@
"bl": "^4.1.0",
"body-parser": "^1.18.3",
"buffer": "5.4.2",
"bull": "4.8.2",
"bullmq": "^1.91.1",
"bunyan": "^1.8.15",
"cids": "0.8.0",
"commander": "^6.2.1",
Expand Down Expand Up @@ -77,6 +78,7 @@
"jimp": "^0.6.1",
"lodash": "4.17.21",
"multer": "^1.4.0",
"patch-package": "^6.4.7",
"pg": "^8.0.3",
"prettier-config-standard": "^4.0.0",
"prom-client": "^14.0.1",
Expand All @@ -92,12 +94,12 @@
"web3": "1.2.8"
},
"devDependencies": {
"@types/bull": "^3.15.8",
"@types/bunyan": "^1.8.8",
"@types/chai": "^4.3.3",
"@types/eth-sig-util": "^2.1.1",
"@types/express": "4.17.12",
"@types/fs-extra": "^9.0.13",
"@types/ioredis": "^4.28.10",
"@types/lodash": "^4.14.182",
"@types/mocha": "^9.1.1",
"@types/node": "^18.7.9",
Expand Down Expand Up @@ -133,6 +135,7 @@
"lodash": "Vuln in < 4.17.13, fixed by https://github.com/lodash/lodash/pull/4336"
},
"scriptsComments": {
"start": "Runs multiple processes using cluster. Starts as primary since process.env.NODE_UNIQUE_ID=undefined",
"coverage": "Runs nyc on tests/ dir and outputs results in ./nyc_output. Can be used for vscode extensions.",
"report": "Generates static html files representing code coverage per test file and outputs them into /coverage."
}
Expand Down
2 changes: 2 additions & 0 deletions creator-node/patches/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Folder contains patch files auto-generated by [patch-package](https://www.npmjs.com/package/patch-package).
These are applied during `npm run postinstall` to modify dependencies in node_modules.
30 changes: 30 additions & 0 deletions creator-node/patches/bullmq+1.91.1.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
diff --git a/node_modules/bullmq/dist/cjs/classes/child-pool.js b/node_modules/bullmq/dist/cjs/classes/child-pool.js
theoilie marked this conversation as resolved.
Show resolved Hide resolved
index 2e47bee..0369e22 100644
--- a/node_modules/bullmq/dist/cjs/classes/child-pool.js
+++ b/node_modules/bullmq/dist/cjs/classes/child-pool.js
@@ -8,6 +8,16 @@ const process_utils_1 = require("./process-utils");
const interfaces_1 = require("../interfaces");
const utils_1 = require("../utils");
const CHILD_KILL_TIMEOUT = 30000;
+
+const getFreePort = async () => {
+ return new Promise((res) => {
+ const srv = require("net").createServer();
+ srv.listen(0, () => {
+ const port = srv.address().port;
+ srv.close((err) => res(port));
+ });
+ });
+};
const convertExecArgv = async (execArgv) => {
const standard = [];
const convertedArgs = [];
@@ -18,7 +28,7 @@ const convertExecArgv = async (execArgv) => {
}
else {
const argName = arg.split('=')[0];
- const port = await (await Promise.resolve().then(() => require('get-port'))).default();
+ const port = await getFreePort();
convertedArgs.push(`${argName}=${port}`);
}
}
4 changes: 4 additions & 0 deletions creator-node/scripts/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ if [[ "$contentCacheLayerEnabled" == "true" ]]; then
openresty -p /usr/local/openresty -c /usr/local/openresty/conf/nginx.conf
fi

# Postinstall applies patches in the patches/ folder via patch-package. This fixes a bug in BullMQ:
# https://github.com/taskforcesh/bullmq/issues/1424
npm run postinstall
theoilie marked this conversation as resolved.
Show resolved Hide resolved
# index.js runs multiple processes using cluster. Starts as primary since process.env.NODE_UNIQUE_ID=undefined
if [[ "$devMode" == "true" ]]; then
if [ "$link_libs" = true ]; then
cd ../audius-libs
Expand Down
86 changes: 47 additions & 39 deletions creator-node/src/AsyncProcessingQueue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const Bull = require('bull')
const { Queue, Worker } = require('bullmq')
const { logger: genericLogger } = require('./logging')
const config = require('./config')
const redisClient = require('./redis')
const { clusterUtils } = require('./utils')

// Processing fns
const {
Expand All @@ -27,6 +28,7 @@ const PROCESS_STATES = Object.freeze({
DONE: 'DONE',
FAILED: 'FAILED'
})
const QUEUE_NAME = 'async-processing'

const ASYNC_PROCESSING_QUEUE_HISTORY = 500

Expand All @@ -39,48 +41,55 @@ const ASYNC_PROCESSING_QUEUE_HISTORY = 500

class AsyncProcessingQueue {
constructor(libs, prometheusRegistry) {
this.queue = new Bull('asyncProcessing', {
redis: {
host: config.get('redisHost'),
port: config.get('redisPort')
},
const connection = {
host: config.get('redisHost'),
port: config.get('redisPort')
}
this.queue = new Queue(QUEUE_NAME, {
connection,
defaultJobOptions: {
removeOnComplete: ASYNC_PROCESSING_QUEUE_HISTORY,
removeOnFail: ASYNC_PROCESSING_QUEUE_HISTORY
}
})

prometheusRegistry.startQueueMetrics(this.queue)

this.libs = libs

const untracedProcessTask = this.processTask
this.queue.process(MAX_CONCURRENCY, async (job, done) => {
const { logContext, parentSpanContext, task } = job.data
const processTask = instrumentTracing({
name: `AsyncProcessingQueue.process ${task}`,
fn: untracedProcessTask,
context: this,
options: {
// if a parentSpanContext is provided
// reference it so the async queue job can remember
// who enqueued it
links: parentSpanContext
? [
{
context: parentSpanContext
}
]
: [],
attributes: {
requestID: logContext.requestID,
[tracing.CODE_FILEPATH]: __filename
const worker = new Worker(
QUEUE_NAME,
async (job) => {
const { logContext, parentSpanContext, task } = job.data
const processTask = instrumentTracing({
name: `AsyncProcessingQueue.process ${task}`,
fn: untracedProcessTask,
context: this,
options: {
// if a parentSpanContext is provided
// reference it so the async queue job can remember
// who enqueued it
links: parentSpanContext
? [
{
context: parentSpanContext
}
]
: [],
attributes: {
requestID: logContext.requestID,
[tracing.CODE_FILEPATH]: __filename
}
}
}
})
})

await processTask(job, done)
})
await processTask(job)
},
{
connection,
concurrency: clusterUtils.getConcurrencyPerWorker(MAX_CONCURRENCY)
theoilie marked this conversation as resolved.
Show resolved Hide resolved
}
)
prometheusRegistry.startQueueMetrics(this.queue, worker)

this.PROCESS_NAMES = PROCESS_NAMES
this.PROCESS_STATES = PROCESS_STATES
Expand All @@ -90,7 +99,7 @@ class AsyncProcessingQueue {
this.constructProcessKey = this.constructAsyncProcessingKey.bind(this)
}

async processTask(job, done) {
async processTask(job) {
const { logContext, task } = job.data

const func = this.getFn(task)
Expand All @@ -107,7 +116,6 @@ class AsyncProcessingQueue {
logContext,
req: job.data.req
})
done(null, {})
} else {
this.logStatus(
`Succesfully handed off transcoding and segmenting to sp=${sp}. Wrapping up remainder of track association..`
Expand All @@ -117,12 +125,12 @@ class AsyncProcessingQueue {
logContext,
req: { ...job.data.req, transcodeFilePath, segmentFileNames }
})
done(null, { response: { transcodeFilePath, segmentFileNames } })
return { response: { transcodeFilePath, segmentFileNames } }
theoilie marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
try {
const response = await this.monitorProgress(task, func, job.data)
done(null, { response })
return { response }
} catch (e) {
tracing.recordException(e)
this.logError(
Expand All @@ -131,7 +139,7 @@ class AsyncProcessingQueue {
}: ${e.toString()}`,
logContext
)
done(e.toString())
return e.toString()
}
}
}
Expand Down Expand Up @@ -185,12 +193,12 @@ class AsyncProcessingQueue {
async addTask(params) {
const { logContext, task } = params

this.logStatus(
await this.logStatus(
`Adding ${task} task! uuid=${logContext.requestID}}`,
logContext
)

const job = await this.queue.add(params)
const job = await this.queue.add(QUEUE_NAME, params)

return job
}
Expand Down
43 changes: 24 additions & 19 deletions creator-node/src/ImageProcessingQueue.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
const Bull = require('bull')
const { Queue, QueueEvents, Worker } = require('bullmq')
const path = require('path')
const os = require('os')

const config = require('./config')
const { logger: genericLogger } = require('./logging')
const { clusterUtils } = require('./utils')
const resizeImage = require('./resizeImage')

const imageProcessingMaxConcurrency = config.get(
'imageProcessingMaxConcurrency'
Expand All @@ -22,34 +26,34 @@ const IMAGE_PROCESSING_QUEUE_HISTORY = 500

class ImageProcessingQueue {
constructor(prometheusRegistry = null) {
this.queue = new Bull('image-processing-queue', {
redis: {
port: config.get('redisPort'),
host: config.get('redisHost')
},
const connection = {
host: config.get('redisHost'),
port: config.get('redisPort')
}
this.queue = new Queue('image-processing-queue', {
connection,
defaultJobOptions: {
removeOnComplete: IMAGE_PROCESSING_QUEUE_HISTORY,
removeOnFail: IMAGE_PROCESSING_QUEUE_HISTORY
}
})

// Process jobs sandboxed - https://docs.bullmq.io/guide/workers/sandboxed-processors
const processorFile = path.join(__dirname, 'resizeImage.js')
theoilie marked this conversation as resolved.
Show resolved Hide resolved
const worker = new Worker('image-processing-queue', processorFile, {
connection,
concurrency: clusterUtils.getConcurrencyPerWorker(MAX_CONCURRENCY)
})
if (prometheusRegistry !== null && prometheusRegistry !== undefined) {
prometheusRegistry.startQueueMetrics(this.queue)
prometheusRegistry.startQueueMetrics(this.queue, worker)
}

/**
* Queue will process tasks concurrently if provided a concurrency number and a
* path to file containing job processor function
* https://github.com/OptimalBits/bull/tree/013c51942e559517c57a117c27a550a0fb583aa8#separate-processes
*/
this.queue.process(
PROCESS_NAMES.resizeImage /** job processor name */,
MAX_CONCURRENCY /** job processor concurrency */,
`${__dirname}/resizeImage.js` /** path to job processor function */
)

this.logStatus = this.logStatus.bind(this)
this.resizeImage = this.resizeImage.bind(this)

this.queueEvents = new QueueEvents('image-processing-queue', {
theoilie marked this conversation as resolved.
Show resolved Hide resolved
connection
})
}

/**
Expand Down Expand Up @@ -96,7 +100,8 @@ class ImageProcessingQueue {
square,
logContext
})
const result = await job.finished()

const result = await job.waitUntilFinished(this.queueEvents)
return result
}
}
Expand Down