Skip to content

Commit

Permalink
Aggregate metrics across cluster (#3939)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie committed Sep 29, 2022
1 parent b7230e2 commit e0cf953
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 19 deletions.
4 changes: 2 additions & 2 deletions creator-node/src/app.js
Expand Up @@ -143,7 +143,7 @@ const initializeApp = (port, serviceRegistry) => {
routes.map((route) => route.path),
prometheusMiddleware({
// Use existing registry for compatibility with custom metrics. Can see
// the metrics on /prometheus_metrics
// the metrics on /prometheus_metrics_worker
promRegistry: prometheusRegistry.registry,
// Override metric name to include namespace prefix
httpDurationMetricName: `${prometheusRegistry.namespacePrefix}_http_request_duration_seconds`,
Expand All @@ -155,7 +155,7 @@ const initializeApp = (port, serviceRegistry) => {
includeUp: false,
// The buckets in seconds to measure requests
buckets: [0.2, 0.5, ...exponentialBucketsRange(1, 60, 4)],
// Do not register the default /metrics route, since we have the /prometheus_metrics
// Do not register the default /metrics route, since we have the /prometheus_metrics_worker
autoregister: false,
// Normalizes the path to be tracked in this middleware. For routes with route params,
// this fn maps those routes to generic paths. e.g. /ipfs/QmSomeCid -> /ipfs/#CID
Expand Down
43 changes: 38 additions & 5 deletions creator-node/src/index.ts
@@ -1,6 +1,7 @@
'use strict'

import type { Cluster } from 'cluster'
import type { Cluster, Worker } from 'cluster'
import { AggregatorRegistry } from 'prom-client'
import { clusterUtils } from './utils'
const cluster: Cluster = require('cluster')

Expand All @@ -19,6 +20,11 @@ const { logger } = require('./logging')
const { serviceRegistry } = require('./serviceRegistry')
const redisClient = require('./redis')

// This should eventually only be instantiated in the primary and then workers should call setupClusterWorker().
// However, a bug currently requires instantiating this in workers as well:
// https://github.com/siimon/prom-client/issues/501
const aggregatorRegistry = new AggregatorRegistry()

const exitWithError = (...msg: any[]) => {
logger.error('ERROR: ', ...msg)
// eslint-disable-next-line no-process-exit
Expand Down Expand Up @@ -111,14 +117,26 @@ const startAppForPrimary = async () => {
}
})

for (const worker of Object.values(cluster.workers || {})) {
worker?.on('message', (msg) => {
if (msg?.cmd === 'setSpecialWorkerId') {
clusterUtils.specialWorkerId = msg?.val
const sendAggregatedMetricsToWorker = async (worker: Worker) => {
const metricsData = await aggregatorRegistry.clusterMetrics()
const contentType = aggregatorRegistry.contentType
worker.send({
cmd: 'receiveAggregatePrometheusMetrics',
val: {
metricsData,
contentType
}
})
}

// Handle message received from worker to primary
cluster.on('message', (workerWhoSentMsg, msg) => {
if (msg?.cmd === 'requestAggregatedPrometheusMetrics') {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
sendAggregatedMetricsToWorker(workerWhoSentMsg)
}
})

// Respawn workers and update each worker's knowledge of who the special worker is.
// The primary process doesn't need to be respawned because the whole app stops if the primary stops (since the workers are child processes of the primary)
cluster.on('exit', (worker, code, signal) => {
Expand Down Expand Up @@ -183,6 +201,21 @@ const startAppForWorker = async () => {
logger.info('Initialized app and server')
await serviceRegistry.initServicesThatRequireServer(appInfo.app)

cluster.worker!.on('message', (msg) => {
if (msg?.cmd === 'setSpecialWorkerId') {
clusterUtils.specialWorkerId = msg?.val
} else if (msg?.cmd === 'receiveAggregatePrometheusMetrics') {
try {
const { prometheusRegistry } = serviceRegistry
prometheusRegistry.resolvePromiseToGetAggregatedMetrics(msg?.val)
} catch (error: any) {
logger.error(
`Failed to send aggregated metrics data back to worker: ${error}`
)
}
}
})

if (clusterUtils.isThisWorkerInit() && process.send) {
process.send({ cmd: 'initComplete' })
}
Expand Down
21 changes: 19 additions & 2 deletions creator-node/src/routes/prometheusMetricsRoutes.js
@@ -1,17 +1,34 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
const express = require('express')

const router = express.Router()

/**
* Exposes Prometheus metrics at `GET /prometheus_metrics`
* Exposes Prometheus metrics for the worker (not aggregated) at `GET /prometheus_metrics_worker`
*/

router.get('/prometheus_metrics', async (req, res) => {
router.get('/prometheus_metrics_worker', async (req, res) => {
const prometheusRegistry = req.app.get('serviceRegistry').prometheusRegistry
const metricData = await prometheusRegistry.getAllMetricData()

res.setHeader('Content-Type', prometheusRegistry.registry.contentType)
return res.end(metricData)
})

/**
* Exposes Prometheus metrics aggregated across all workers at `GET /prometheus_metrics`
*/
router.get('/prometheus_metrics', async (req, res) => {
try {
const prometheusRegistry = req.app.get('serviceRegistry').prometheusRegistry
const { metricsData, contentType } =
await prometheusRegistry.getCustomAggregateMetricData()
res.setHeader('Content-Type', contentType)
return res.end(metricsData)
} catch (ex) {
res.statusCode = 500
return res.end(ex.message)
}
})

module.exports = router
Expand Up @@ -5,12 +5,9 @@ import {
METRICS,
METRIC_NAMES,
QUEUE_INTERVAL
// eslint-disable-next-line import/no-unresolved
} from './prometheus.constants'
import * as PrometheusClient from 'prom-client'

const cluster = require('cluster')

/**
* See `prometheusMonitoring/README.md` for usage details
*/
Expand All @@ -20,10 +17,17 @@ enum JOB_STATUS {
FAILED = 'failed'
}

type MetricsDataAndType = {
metricsData: any
contentType: any
}

export class PrometheusRegistry {
registry: any
metricNames: Record<string, string>
namespacePrefix: string
resolvePromiseToGetAggregatedMetrics?: (data: MetricsDataAndType) => void
promiseToGetAggregatedMetrics?: Promise<any>

public constructor() {
// Use default global registry to register metrics
Expand Down Expand Up @@ -149,12 +153,77 @@ export class PrometheusRegistry {
waiting || 0
)
})
.catch((_) => {})
}, QUEUE_INTERVAL)

return {
stop: () => clearInterval(metricInterval)
}
}

/**
* Entry point to the flow:
* 1. This worker sends `requestAggregatedPrometheusMetrics` IPC message to primary process
* 2. Primary aggregates metrics and sends `receiveAggregatePrometheusMetrics` IPC message back to this worker
* 3. This worker calls this.resolvePromiseToGetAggregatedMetrics() with the aggregate metrics from primary
*/
async getCustomAggregateMetricData() {
// Only initiate the flow if there's not already a promise in flight to get aggregate metrics data.
// A previous /prometheus_metrics request could've already initiated a promise
if (this.promiseToGetAggregatedMetrics === undefined) {
this.promiseToGetAggregatedMetrics = this.makePromiseToGetMetrics()
}

const metricsDataAndType = await this.promiseToGetAggregatedMetrics
return metricsDataAndType
}

/**
* @returns a Promise that will:
* * send a `requestAggregatedPrometheusMetrics` message to the primary process to aggregate metrics
* * resolve when the primary process sends back a `receiveAggregatePrometheusMetrics` message to this worker
* * timeout and reject after 10 seconds if it's not resolved first
*/
makePromiseToGetMetrics() {
return new Promise((resolve, reject) => {
// Timeout and reject after 10 seconds
const timeout = setTimeout(() => {
this.resetInFlightPromiseVariables()
reject(
new Error(
'Took too long to get aggregated metrics. This can happen if not all workers have initialized yet.'
)
)
}, 10_000)

// Set the function that will get called to resolve the promise when this worker
// receives a `receiveAggregatePrometheusMetrics` IPC message
this.resolvePromiseToGetAggregatedMetrics = (
aggregateMetricsDataAndType: MetricsDataAndType
) => {
if (timeout) {
clearTimeout(timeout)
}
this.resetInFlightPromiseVariables()
resolve(aggregateMetricsDataAndType)
}

// Send `requestAggregatedPrometheusMetrics` IPC message to the primary process to aggregate data
// from all workers. This worker listens for a `receiveAggregatePrometheusMetrics` message, at which point
// it will call this.resolvePromiseToGetAggregatedMetrics()
if (process.send) {
process.send({ cmd: 'requestAggregatedPrometheusMetrics' })
} else {
this.resetInFlightPromiseVariables()
reject(new Error('This process is somehow not a worker'))
}
})
}

resetInFlightPromiseVariables() {
this.resolvePromiseToGetAggregatedMetrics = undefined
this.promiseToGetAggregatedMetrics = undefined
}
}

module.exports = PrometheusRegistry
25 changes: 18 additions & 7 deletions creator-node/test/prometheus.test.js
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-misused-promises */
const assert = require('assert')
const request = require('supertest')

Expand Down Expand Up @@ -36,10 +37,12 @@ describe('test Prometheus metrics', async function () {
await server.close()
})

it('Checks that GET /prometheus_metrics is healthy and exposes default metrics', async function () {
it('Checks that GET /prometheus_metrics_worker is healthy and exposes default metrics', async function () {
await request(app).get('/health_check')

const resp = await request(app).get('/prometheus_metrics').expect(200)
const resp = await request(app)
.get('/prometheus_metrics_worker')
.expect(200)
assert.ok(
resp.text.includes(
NAMESPACE_PREFIX + '_default_' + 'process_cpu_user_seconds_total'
Expand All @@ -53,7 +56,9 @@ describe('test Prometheus metrics', async function () {

it('Checks that hitting unregistered routes does not track prometheus metrics', async function () {
await request(app).get('/blahblahblah')
const resp = await request(app).get('/prometheus_metrics').expect(200)
const resp = await request(app)
.get('/prometheus_metrics_worker')
.expect(200)

assert.ok(!resp.text.includes('blahblahblah'))
})
Expand All @@ -62,7 +67,9 @@ describe('test Prometheus metrics', async function () {
await request(app).get('/ipfs/QmVickyWasHere')
await request(app).get('/content/QmVickyWasHere')

const resp = await request(app).get('/prometheus_metrics').expect(200)
const resp = await request(app)
.get('/prometheus_metrics_worker')
.expect(200)

assert.ok(
resp.text.includes(
Expand Down Expand Up @@ -119,10 +126,12 @@ describe('test Prometheus metrics', async function () {
assert.ok(!resp.text.includes('/content/:CID'))
})

it('Checks that GET /prometheus_metrics exposes bull queue metrics', async function () {
it('Checks that GET /prometheus_metrics_worker exposes bull queue metrics', async function () {
await request(app).get('/health_check')

const resp = await request(app).get('/prometheus_metrics').expect(200)
const resp = await request(app)
.get('/prometheus_metrics_worker')
.expect(200)
assert.ok(resp.text.includes(NAMESPACE_PREFIX + '_jobs_completed'))
assert.ok(resp.text.includes(NAMESPACE_PREFIX + '_jobs_waiting'))
assert.ok(resp.text.includes(NAMESPACE_PREFIX + '_jobs_failed'))
Expand All @@ -136,7 +145,9 @@ describe('test Prometheus metrics', async function () {

await job.waitUntilFinished(genericBullQueue.queueEvents)

const resp = await request(app).get('/prometheus_metrics').expect(200)
const resp = await request(app)
.get('/prometheus_metrics_worker')
.expect(200)
assert.ok(
resp.text.includes(NAMESPACE_PREFIX + '_jobs_duration_seconds_bucket')
)
Expand Down

0 comments on commit e0cf953

Please sign in to comment.