Clean up storagePath metrics and expose failed paths in non-debug logs
theoilie committed Dec 14, 2022
1 parent 258cb5e commit 31c3ed8
Showing 7 changed files with 110 additions and 86 deletions.
2 changes: 1 addition & 1 deletion creator-node/scripts/run-tests.sh
@@ -70,7 +70,7 @@ if [ "${ARG1}" == "standalone_creator" ]; then
fi
if [ ! "${REDIS_EXISTS}" ]; then
echo "Redis Container doesn't exist"
-docker run -d --name $REDIS_CONTAINER -p 127.0.0.1:$redisPort:6379 redis:5.0.4
+docker run -d --name $REDIS_CONTAINER -p 127.0.0.1:$redisPort:6379 redis:6.2.7
sleep 1
fi
elif [ "${ARG1}" == "teardown" ]; then
138 changes: 77 additions & 61 deletions creator-node/src/diskManager.ts
@@ -446,13 +446,7 @@ async function _touch(path: string) {
async function _copyLegacyFiles(
legacyPathsAndCids: { storagePath: string; cid: string; skipped: boolean }[],
prometheusRegistry: any,
-logger: Logger,
-validateMetricToRecord: (metric: any) => any,
-recordMetrics: (
-prometheusRegistry: any,
-logger: Logger,
-metrics: any[]
-) => void
+logger: Logger
): Promise<
{
legacyPath: string
@@ -530,11 +524,8 @@
}

-// Record results in Prometheus metric and log errors
+// Record results in Prometheus metric
if (copiedPaths.length > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
-validateMetricToRecord,
-recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
@@ -548,8 +539,6 @@
}
if (Object.keys(erroredPaths).length > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
-validateMetricToRecord,
-recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
@@ -562,7 +551,7 @@
)
}
if (!isEmpty(erroredPaths)) {
-logger.debug(
+logger.warn(
`Failed to copy some legacy files: ${JSON.stringify(erroredPaths)}`
)
}
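The change from `logger.debug` to `logger.warn` above is what actually exposes failed paths in non-debug logs: bunyan suppresses any record below the logger's configured level, so a `debug` call is invisible at the typical production level of `info`. A minimal standalone sketch of that behavior (illustrative, not code from this repo):

```ts
import bunyan from 'bunyan'

// A production-style logger: records below 'info' are dropped
const logger = bunyan.createLogger({ name: 'disk-manager-example', level: 'info' })

// Hypothetical failure map mirroring erroredPaths above
const erroredPaths = { '/file_storage/QmSomeCid': 'ENOENT' }

logger.debug(`Failed to copy some legacy files: ${JSON.stringify(erroredPaths)}`) // suppressed
logger.warn(`Failed to copy some legacy files: ${JSON.stringify(erroredPaths)}`) // emitted
```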
@@ -600,13 +589,7 @@ const _migrateNonDirFilesWithLegacyStoragePaths = async (
queryDelayMs: number,
batchSize: number,
prometheusRegistry: any,
-logger: Logger,
-validateMetricToRecord: (metric: any) => any,
-recordMetrics: (
-prometheusRegistry: any,
-logger: Logger,
-metrics: any[]
-) => void
+logger: Logger
) =>
_callFuncOnAllCidsPaginated(async (minCid, maxCid) => {
// Query for legacy storagePaths in the pagination range until no new results are returned
@@ -624,9 +607,7 @@
const copiedFilePaths = await _copyLegacyFiles(
results,
prometheusRegistry,
-logger,
-validateMetricToRecord,
-recordMetrics
+logger
)
await DbManager.updateLegacyPathDbRows(copiedFilePaths, logger)
} else {
@@ -679,7 +660,7 @@ async function _migrateFileWithCustomStoragePath(
trackBlockchainId?: number
},
logger: Logger
-) {
+): Promise<{ success: boolean; error: any }> {
const fetchStartTime = getStartTime()
// Will retry internally
const { error, storagePath } = await fetchFileFromNetworkAndSaveToFS(
@@ -703,13 +684,13 @@
where: { fileUUID: fileRecord.fileUUID }
}
)
-return true
+return { success: true, error: '' }
}
logErrorWithDuration(
{ logger, startTime: fetchStartTime },
`Error fixing fileRecord ${JSON.stringify(fileRecord)}: ${error}`
)
-return false
+return { success: false, error }
} else {
// Update file's storagePath in DB to newly saved location, and ensure it's not marked as skipped
await models.File.update(
@@ -718,21 +699,15 @@
where: { fileUUID: fileRecord.fileUUID, skipped: false }
}
)
-return true
+return { success: true, error: '' }
}
}
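The new `Promise<{ success: boolean; error: any }>` return type (instead of a bare boolean) hands the failure reason back to the caller, which is what lets the loop below log the error itself. The pattern in isolation, with illustrative names:

```ts
type MigrationResult = { success: boolean; error: any }

// Runs one unit of work and reports failure details instead of swallowing them
async function toMigrationResult(work: () => Promise<void>): Promise<MigrationResult> {
  try {
    await work()
    return { success: true, error: '' }
  } catch (e: any) {
    return { success: false, error: e }
  }
}
```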

const _migrateFilesWithCustomStoragePaths = async (
queryDelayMs: number,
batchSize: number,
prometheusRegistry: any,
-logger: Logger,
-validateMetricToRecord: (metric: any) => any,
-recordMetrics: (
-prometheusRegistry: any,
-logger: Logger,
-metrics: any[]
-) => void
+logger: Logger
) =>
_callFuncOnAllCidsPaginated(async (minCid, maxCid) => {
// Query for custom storagePaths in the pagination range until no new results are returned
@@ -759,29 +734,31 @@ const _migrateFilesWithCustomStoragePaths = async (
let numFilesMigratedSuccessfully = 0
let numFilesFailedToMigrate = 0
for (const fileRecord of results) {
-let success
+let success, error
try {
-success = await _migrateFileWithCustomStoragePath(
+;({ success, error } = await _migrateFileWithCustomStoragePath(
fileRecord,
logger
-)
+))
} catch (e: any) {
success = false
+error = e
+}
+if (success) {
+numFilesMigratedSuccessfully++
+} else {
+numFilesFailedToMigrate++
logger.error(
-`Error fixing fileRecord ${JSON.stringify(fileRecord)}: ${e}`
+`Error fixing fileRecord ${JSON.stringify(fileRecord)}: ${error}`
)
-numFilesFailedToMigrate++
}
-if (success) numFilesMigratedSuccessfully++
-else numFilesFailedToMigrate++

// Add delay between calls since each call will make an internal request to every node
await timeout(1000)
}
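The `;({ success, error } = await ...)` line above is destructuring assignment into already-declared variables: the parentheses are required so the leading `{` isn't parsed as a block statement, and the leading semicolon protects against automatic semicolon insertion joining the line to the previous statement. A standalone illustration:

```ts
let success: boolean | undefined
let error: unknown

// Stand-in for _migrateFileWithCustomStoragePath's new return shape
async function doMigration(): Promise<{ success: boolean; error: unknown }> {
  return { success: true, error: '' }
}

async function run(): Promise<void> {
  // Without parens, `{ success, error }` would parse as a block;
  // without the semicolon, a preceding expression could absorb the `(`.
  ;({ success, error } = await doMigration())
  console.log(success, error)
}
```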
+// Record results in Prometheus metric
if (numFilesMigratedSuccessfully > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
-validateMetricToRecord,
-recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
@@ -796,8 +773,6 @@
}
if (numFilesFailedToMigrate > 0) {
clusterUtilsForPrimary.sendMetricToWorker(
-validateMetricToRecord,
-recordMetrics,
{
metricType: 'GAUGE_INC',
metricName:
@@ -817,6 +792,56 @@
}
})

+function _resetStoragePathMetrics(prometheusRegistry: any, logger: Logger) {
+// Reset metric for legacy migrations
+clusterUtilsForPrimary.sendMetricToWorker(
+{
+metricType: 'GAUGE_SET',
+metricName:
+prometheusRegistry.metricNames.FILES_MIGRATED_FROM_LEGACY_PATH_GAUGE,
+metricValue: 0,
+metricLabels: { result: 'success' }
+},
+prometheusRegistry,
+logger
+)
+clusterUtilsForPrimary.sendMetricToWorker(
+{
+metricType: 'GAUGE_SET',
+metricName:
+prometheusRegistry.metricNames.FILES_MIGRATED_FROM_LEGACY_PATH_GAUGE,
+metricValue: 0,
+metricLabels: { result: 'failure' }
+},
+prometheusRegistry,
+logger
+)
+
+// Reset metric for custom migrations
+clusterUtilsForPrimary.sendMetricToWorker(
+{
+metricType: 'GAUGE_SET',
+metricName:
+prometheusRegistry.metricNames.FILES_MIGRATED_FROM_CUSTOM_PATH_GAUGE,
+metricValue: 0,
+metricLabels: { result: 'success' }
+},
+prometheusRegistry,
+logger
+)
+clusterUtilsForPrimary.sendMetricToWorker(
+{
+metricType: 'GAUGE_SET',
+metricName:
+prometheusRegistry.metricNames.FILES_MIGRATED_FROM_CUSTOM_PATH_GAUGE,
+metricValue: 0,
+metricLabels: { result: 'failure' }
+},
+prometheusRegistry,
+logger
+)
+}
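Setting both label series of each gauge back to 0 turns these gauges into per-run counts rather than totals that grow forever across runs. Assuming the metric names map onto prom-client gauges (an assumption; the repo routes everything through `sendMetricToWorker`), the equivalent direct calls would look roughly like this:

```ts
import { Gauge } from 'prom-client'

// Name and help text are illustrative; the real names come from
// prometheusRegistry.metricNames
const filesMigratedFromLegacyPath = new Gauge({
  name: 'files_migrated_from_legacy_path',
  help: 'Files migrated from legacy storagePaths during the current run',
  labelNames: ['result']
})

// GAUGE_SET with metricValue 0 at the start of each run
filesMigratedFromLegacyPath.set({ result: 'success' }, 0)
filesMigratedFromLegacyPath.set({ result: 'failure' }, 0)

// GAUGE_INC during the run, e.g. after copying a batch of 42 files
filesMigratedFromLegacyPath.inc({ result: 'success' }, 42)
```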

/**
* For non-directory files, and then later for directories, this:
* 1. Finds rows in the Files table that have a legacy storagePath (/file_storage/<CID or dirCID>)
@@ -836,14 +861,11 @@ const _migrateFilesWithCustomStoragePaths = async (
export async function migrateFilesWithNonStandardStoragePaths(
queryDelayMs: number,
prometheusRegistry: any,
-logger: Logger,
-validateMetricToRecord: (metric: any) => any,
-recordMetrics: (
-prometheusRegistry: any,
-logger: Logger,
-metrics: any[]
-) => void
+logger: Logger
): Promise<void> {
+// Reset gauges on each run so the metrics aren't infinitely increasing
+_resetStoragePathMetrics(prometheusRegistry, logger)

const BATCH_SIZE = 5_000
// Legacy storagePaths
if (config.get('migrateFilesWithLegacyStoragePath')) {
@@ -852,9 +874,7 @@ export async function migrateFilesWithNonStandardStoragePaths(
queryDelayMs,
BATCH_SIZE,
prometheusRegistry,
-logger,
-validateMetricToRecord,
-recordMetrics
+logger
)
await _migrateDirsWithLegacyStoragePaths(queryDelayMs, BATCH_SIZE, logger)
} catch (e: any) {
@@ -869,9 +889,7 @@ export async function migrateFilesWithNonStandardStoragePaths(
queryDelayMs,
BATCH_SIZE,
prometheusRegistry,
-logger,
-validateMetricToRecord,
-recordMetrics
+logger
)
} catch (e: any) {
logger.error(`Error migrating custom storagePaths: ${e}`)
@@ -882,8 +900,6 @@ export async function migrateFilesWithNonStandardStoragePaths(
return migrateFilesWithNonStandardStoragePaths(
5000,
prometheusRegistry,
-logger,
-validateMetricToRecord,
-recordMetrics
+logger
)
}
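The function finishes by calling itself to keep migrating until no non-standard paths remain. Because the recursive call happens after awaited work in an async function, each pass starts from an unwound stack, so this loop-by-recursion does not overflow the way synchronous recursion would. A reduced sketch of the pattern (illustrative helper, not from the repo):

```ts
async function runRepeatedly(pass: () => Promise<void>): Promise<void> {
  await pass()
  // The await above means this call is scheduled from the microtask queue;
  // the synchronous stack has already unwound between passes.
  return runRepeatedly(pass)
}
```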
9 changes: 2 additions & 7 deletions creator-node/src/index.ts
@@ -17,10 +17,7 @@ import cluster from 'cluster'
import ON_DEATH from 'death'
import { Keypair } from '@solana/web3.js'

-import {
-validateMetricToRecord,
-recordMetrics
-} from './services/prometheusMonitoring/prometheusUsageUtils'
+import { recordMetrics } from './services/prometheusMonitoring/prometheusUsageUtils'
import { initializeApp } from './app'
import config from './config'
import { serviceRegistry } from './serviceRegistry'
@@ -138,9 +135,7 @@ const runAsyncBackgroundTasks = async () => {
migrateFilesWithNonStandardStoragePaths(
500,
serviceRegistry.prometheusRegistry,
-logger,
-validateMetricToRecord,
-recordMetrics
+logger
)
}
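Note that `runAsyncBackgroundTasks` starts the migration without awaiting it, so it runs in the background for the life of the process. In that fire-and-forget style, an explicit `.catch` keeps a rejected pass from becoming an unhandled promise rejection (a defensive variant, not part of this diff):

```ts
migrateFilesWithNonStandardStoragePaths(
  500,
  serviceRegistry.prometheusRegistry,
  logger
).catch((e: any) => logger.error(`storagePath migration failed: ${e}`))
```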

creator-node/src/services/prometheusMonitoring/prometheusUsageUtils.ts
@@ -1,18 +1,12 @@
import type Logger from 'bunyan'
+import type { MetricToRecord } from './types'

const {
METRIC_RECORD_TYPE,
METRIC_NAMES,
METRIC_LABELS
} = require('./prometheus.constants')

-export type MetricToRecord = {
-metricName: string
-metricType: string
-metricValue: number
-metricLabels: Record<string, string>
-}

export const recordMetrics = (
prometheusRegistry: any,
logger: Logger,
@@ -164,6 +158,7 @@ export const validateMetricToRecord = ({
}

module.exports = {
+validateMetricToRecord,
recordMetrics,
makeHistogramToRecord,
makeGaugeSetToRecord,
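This file mixes `export const` declarations with a trailing `module.exports` assignment. When TypeScript emits CommonJS, reassigning `module.exports` replaces the exports object, so only the names listed there survive a plain `require()`. That is why `validateMetricToRecord` must be added to the list: the lazy `require` in clusterUtilsForPrimary.ts below would otherwise not see it. A simplified illustration:

```ts
// util.ts (compiled to CommonJS)
export const a = () => 'a' // emitted roughly as: exports.a = ...
export const b = () => 'b'
module.exports = { a } // replaces the whole exports object

// consumer.ts
const util = require('./util')
console.log(util.a) // [Function: a]
console.log(util.b) // undefined: b was dropped by the module.exports reassignment
```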
6 changes: 6 additions & 0 deletions creator-node/src/services/prometheusMonitoring/types.ts
@@ -0,0 +1,6 @@
+export type MetricToRecord = {
+metricName: string
+metricType: string
+metricValue: number
+metricLabels: Record<string, string>
+}
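Extracting `MetricToRecord` into a standalone types module gives prometheusUsageUtils and the cluster utils one shared shape to reference. A value of this type, matching the messages constructed elsewhere in this diff, looks like:

```ts
import type { MetricToRecord } from './types'

const metric: MetricToRecord = {
  metricType: 'GAUGE_INC',
  // Illustrative literal; the real code reads names from prometheusRegistry.metricNames
  metricName: 'files_migrated_from_legacy_path_gauge',
  metricValue: 3,
  metricLabels: { result: 'success' }
}
```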
30 changes: 21 additions & 9 deletions creator-node/src/utils/cluster/clusterUtilsForPrimary.ts
@@ -1,9 +1,24 @@
import type { Cluster } from 'cluster'
import type Logger from 'bunyan'
+import type { MetricToRecord } from '../../services/prometheusMonitoring/types'

import { isClusterEnabled } from './clusterUtils'
const cluster: Cluster = require('cluster')

+// Lazy-load prometheus utils because loading them too soon breaks DiskManager tests by preventing everything from being mocked properly
+let promUtils: () => {
+validateMetricToRecord: (metric: MetricToRecord) => MetricToRecord
+recordMetrics: (
+prometheusRegistry: any,
+logger: Logger,
+metrics: MetricToRecord[]
+) => void
+} = () => {
+const data = require('../../services/prometheusMonitoring/prometheusUsageUtils')
+promUtils = () => data
+return data
+}
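The `promUtils` declaration above is a self-replacing memoized loader: the first call performs the `require`, then rebinds `promUtils` to a closure returning the cached module, so subsequent calls skip module resolution entirely. The same trick in generic form (an illustrative helper, not from the repo):

```ts
function lazy<T>(load: () => T): () => T {
  let impl: () => T = () => {
    const value = load()
    impl = () => value // memoize: later calls return the cached value
    return value
  }
  return () => impl()
}

// Usage mirroring the diff:
// const promUtils = lazy(() =>
//   require('../../services/prometheusMonitoring/prometheusUsageUtils')
// )
```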

/**
* Cluster utils that are only needed by the primary process (not worker processes).
*/
@@ -31,20 +46,17 @@ class ClusterUtilsForPrimary {
* The primary can't record prometheus metrics in cluster mode, so this sends a metric to a worker to record it.
*/
sendMetricToWorker(
-validateMetricToRecord: (metric: any) => any,
-recordMetrics: (
-prometheusRegistry: any,
-logger: Logger,
-metrics: any[]
-) => void,
-metricToRecord: any,
+metricToRecord: MetricToRecord,
prometheusRegistry: any,
logger: Logger
) {
-const validatedMetricToRecord = validateMetricToRecord(metricToRecord)
+const validatedMetricToRecord =
+promUtils().validateMetricToRecord(metricToRecord)
// Non-cluster mode can just record the metric now
if (!isClusterEnabled()) {
-recordMetrics(prometheusRegistry, logger, [validatedMetricToRecord])
+promUtils().recordMetrics(prometheusRegistry, logger, [
+validatedMetricToRecord
+])
return
}

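The remainder of `sendMetricToWorker` is truncated here, but the gist is that in cluster mode the primary cannot record into the worker-owned Prometheus registry, so the validated metric is handed to a worker over Node's IPC channel. A minimal sketch of such a handoff (the message shape and worker targeting are assumptions, not the repo's exact protocol):

```ts
import cluster from 'cluster'
import type { MetricToRecord } from '../../services/prometheusMonitoring/types'

function sendMetricMessageToAWorker(metric: MetricToRecord): void {
  const workers = cluster.workers ?? {}
  for (const id in workers) {
    const worker = workers[id]
    if (worker?.isConnected()) {
      // A worker would listen via process.on('message', ...) and record the metric
      worker.send({ cmd: 'recordMetric', val: metric })
      return
    }
  }
}
```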
