Skip to content

Commit

Permalink
Make storagePath metrics work in cluster mode (#4466)
Browse files — browse the repository at this point in the history
  • Loading branch information
theoilie committed Dec 14, 2022
1 parent 35f17a1 commit 258cb5e
Show file tree
Hide file tree
Showing 16 changed files with 343 additions and 180 deletions.
2 changes: 1 addition & 1 deletion creator-node/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
getRateLimiterMiddleware
} from './reqLimiter'
import config from './config'
import { exponentialBucketsRange } from './services/prometheusMonitoring/prometheusUtils'
import { exponentialBucketsRange } from './services/prometheusMonitoring/prometheusSetupUtils'
import healthCheckRoutes from './components/healthCheck/healthCheckController'
import contentBlacklistRoutes from './components/contentBlacklist/contentBlacklistController'
import replicaSetRoutes from './components/replicaSet/replicaSetController'
Expand Down
4 changes: 3 additions & 1 deletion creator-node/src/blacklistManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ const { logger } = require('./logging')
const models = require('./models')
const redis = require('./redis')
const config = require('./config')
const { clusterUtilsForWorker } = require('./utils')
const {
clusterUtilsForWorker
} = require('./utils/cluster/clusterUtilsForWorker')

const CID_WHITELIST = new Set(config.get('cidWhitelist').split(','))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { LogContext } from '../../apiHelpers'
import type { LogContext } from '../../utils'
import type Logger from 'bunyan'

import axios from 'axios'
Expand Down
124 changes: 105 additions & 19 deletions creator-node/src/diskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import {
computeFilePath,
computeFilePathAndEnsureItExists,
computeFilePathInDirAndEnsureItExists,
getCharsInRanges
getCharsInRanges,
clusterUtilsForPrimary
} from './utils'
import { fetchFileFromNetworkAndSaveToFS } from './fileManager'
import BlacklistManager from './blacklistManager'
Expand Down Expand Up @@ -445,7 +446,13 @@ async function _touch(path: string) {
async function _copyLegacyFiles(
legacyPathsAndCids: { storagePath: string; cid: string; skipped: boolean }[],
prometheusRegistry: any,
logger: Logger
logger: Logger,
validateMetricToRecord: (metric: any) => any,
recordMetrics: (
prometheusRegistry: any,
logger: Logger,
metrics: any[]
) => void
): Promise<
{
legacyPath: string
Expand Down Expand Up @@ -523,11 +530,37 @@ async function _copyLegacyFiles(
}

// Record results in Prometheus metric and log errors
const metric = prometheusRegistry.getMetric(
prometheusRegistry.metricNames.FILES_MIGRATED_FROM_LEGACY_PATH_GAUGE
)
metric.inc({ result: 'success' }, copiedPaths.length)
metric.inc({ result: 'failure' }, erroredPaths.length)
// Record results in Prometheus metric
if (copiedPaths.length > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
validateMetricToRecord,
recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
prometheusRegistry.metricNames.FILES_MIGRATED_FROM_LEGACY_PATH_GAUGE,
metricValue: copiedPaths.length,
metricLabels: { result: 'success' }
},
prometheusRegistry,
logger
)
}
if (Object.keys(erroredPaths).length > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
validateMetricToRecord,
recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
prometheusRegistry.metricNames.FILES_MIGRATED_FROM_LEGACY_PATH_GAUGE,
metricValue: Object.keys(erroredPaths).length,
metricLabels: { result: 'failure' }
},
prometheusRegistry,
logger
)
}
if (!isEmpty(erroredPaths)) {
logger.debug(
`Failed to copy some legacy files: ${JSON.stringify(erroredPaths)}`
Expand Down Expand Up @@ -567,7 +600,13 @@ const _migrateNonDirFilesWithLegacyStoragePaths = async (
queryDelayMs: number,
batchSize: number,
prometheusRegistry: any,
logger: Logger
logger: Logger,
validateMetricToRecord: (metric: any) => any,
recordMetrics: (
prometheusRegistry: any,
logger: Logger,
metrics: any[]
) => void
) =>
_callFuncOnAllCidsPaginated(async (minCid, maxCid) => {
// Query for legacy storagePaths in the pagination range until no new results are returned
Expand All @@ -585,7 +624,9 @@ const _migrateNonDirFilesWithLegacyStoragePaths = async (
const copiedFilePaths = await _copyLegacyFiles(
results,
prometheusRegistry,
logger
logger,
validateMetricToRecord,
recordMetrics
)
await DbManager.updateLegacyPathDbRows(copiedFilePaths, logger)
} else {
Expand Down Expand Up @@ -685,7 +726,13 @@ const _migrateFilesWithCustomStoragePaths = async (
queryDelayMs: number,
batchSize: number,
prometheusRegistry: any,
logger: Logger
logger: Logger,
validateMetricToRecord: (metric: any) => any,
recordMetrics: (
prometheusRegistry: any,
logger: Logger,
metrics: any[]
) => void
) =>
_callFuncOnAllCidsPaginated(async (minCid, maxCid) => {
// Query for custom storagePaths in the pagination range until no new results are returned
Expand Down Expand Up @@ -731,11 +778,38 @@ const _migrateFilesWithCustomStoragePaths = async (
await timeout(1000)
}
// Record results in Prometheus metric
const metric = prometheusRegistry.getMetric(
prometheusRegistry.metricNames.FILES_MIGRATED_FROM_CUSTOM_PATH_GAUGE
)
metric.inc({ result: 'success' }, numFilesMigratedSuccessfully)
metric.inc({ result: 'failure' }, numFilesFailedToMigrate)
if (numFilesMigratedSuccessfully > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
validateMetricToRecord,
recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
prometheusRegistry.metricNames
.FILES_MIGRATED_FROM_CUSTOM_PATH_GAUGE,
metricValue: numFilesMigratedSuccessfully,
metricLabels: { result: 'success' }
},
prometheusRegistry,
logger
)
}
if (numFilesFailedToMigrate > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
validateMetricToRecord,
recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
prometheusRegistry.metricNames
.FILES_MIGRATED_FROM_CUSTOM_PATH_GAUGE,
metricValue: numFilesFailedToMigrate,
metricLabels: { result: 'failure' }
},
prometheusRegistry,
logger
)
}
} else {
newResultsFound = false
}
Expand All @@ -762,7 +836,13 @@ const _migrateFilesWithCustomStoragePaths = async (
export async function migrateFilesWithNonStandardStoragePaths(
queryDelayMs: number,
prometheusRegistry: any,
logger: Logger
logger: Logger,
validateMetricToRecord: (metric: any) => any,
recordMetrics: (
prometheusRegistry: any,
logger: Logger,
metrics: any[]
) => void
): Promise<void> {
const BATCH_SIZE = 5_000
// Legacy storagePaths
Expand All @@ -772,7 +852,9 @@ export async function migrateFilesWithNonStandardStoragePaths(
queryDelayMs,
BATCH_SIZE,
prometheusRegistry,
logger
logger,
validateMetricToRecord,
recordMetrics
)
await _migrateDirsWithLegacyStoragePaths(queryDelayMs, BATCH_SIZE, logger)
} catch (e: any) {
Expand All @@ -787,7 +869,9 @@ export async function migrateFilesWithNonStandardStoragePaths(
queryDelayMs,
BATCH_SIZE,
prometheusRegistry,
logger
logger,
validateMetricToRecord,
recordMetrics
)
} catch (e: any) {
logger.error(`Error migrating custom storagePaths: ${e}`)
Expand All @@ -798,6 +882,8 @@ export async function migrateFilesWithNonStandardStoragePaths(
return migrateFilesWithNonStandardStoragePaths(
5000,
prometheusRegistry,
logger
logger,
validateMetricToRecord,
recordMetrics
)
}
19 changes: 18 additions & 1 deletion creator-node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import cluster from 'cluster'
import ON_DEATH from 'death'
import { Keypair } from '@solana/web3.js'

import {
validateMetricToRecord,
recordMetrics
} from './services/prometheusMonitoring/prometheusUsageUtils'
import { initializeApp } from './app'
import config from './config'
import { serviceRegistry } from './serviceRegistry'
Expand Down Expand Up @@ -134,7 +138,9 @@ const runAsyncBackgroundTasks = async () => {
migrateFilesWithNonStandardStoragePaths(
500,
serviceRegistry.prometheusRegistry,
logger
logger,
validateMetricToRecord,
recordMetrics
)
}

Expand Down Expand Up @@ -251,6 +257,17 @@ const startAppForWorker = async () => {
`Failed to send aggregated metrics data back to worker: ${error}`
)
}
} else if (msg?.cmd === 'recordMetric') {
try {
// The primary can't record prometheus metrics in cluster mode, so we record a metric that the primary sent to this worker if it's the special worker
if (clusterUtilsForWorker.isThisWorkerSpecial()) {
recordMetrics(serviceRegistry.prometheusRegistry, logger, [msg.val])
}
} catch (error: any) {
logger.error(
`Primary requested worker to record a metric, and the worker failed to record it: ${error}`
)
}
}
})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Gauge, Histogram } from 'prom-client'
import { snakeCase, mapValues } from 'lodash'
// eslint-disable-next-line import/no-unresolved
import { exponentialBucketsRange } from './prometheusUtils'
import { exponentialBucketsRange } from './prometheusSetupUtils'
import {
QUEUE_NAMES as STATE_MACHINE_JOB_NAMES,
SyncType,
Expand Down

0 comments on commit 258cb5e

Please sign in to comment.