From 77b68e0e1739f4ecb9d90eb7e5985c5ddee51bfd Mon Sep 17 00:00:00 2001
From: ggao
Date: Wed, 24 Sep 2025 11:34:04 -0700
Subject: [PATCH] Update worker failure metric

---
 .../mantisrx/server/worker/RunningWorker.java | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/RunningWorker.java b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/RunningWorker.java
index fcd2b9ae2..755c4d6e5 100644
--- a/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/RunningWorker.java
+++ b/mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/RunningWorker.java
@@ -18,9 +18,7 @@
 
 import static io.mantisrx.server.core.utils.StatusConstants.STATUS_MESSAGE_FORMAT;
 
-import com.netflix.spectator.api.Tag;
-import io.mantisrx.common.metrics.Metrics;
-import io.mantisrx.common.metrics.MetricsRegistry;
+import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory;
 import io.mantisrx.runtime.Context;
 import io.mantisrx.runtime.Job;
 import io.mantisrx.runtime.MantisJobState;
@@ -160,14 +158,13 @@ public void signalFailed(Throwable t) {
         logger.error("Worker failure detected, shutting down job: {}", jobId, t);
         // Send failure metrics when data emission failed
         if (t instanceof OnErrorThrowable) {
-            Metrics jobFailureMetrics = new Metrics.Builder()
-                .id(workerMonitorMetricId, Tag.of("jobId", this.jobId),
-                    Tag.of("workerIndex", String.valueOf(this.workerIndex)),
-                    Tag.of("stageNum", String.valueOf(this.stageNum)))
-                .addCounter(workerFailureMetricName)
-                .build();
-
-            MetricsRegistry.getInstance().registerAndGet(jobFailureMetrics).getCounter(workerFailureMetricName).increment();
+            SpectatorRegistryFactory.getRegistry()
+                .counter("runningWorker_failure",
+                    "jobId", jobId,
+                    "workerIndex", String.valueOf(this.workerIndex),
+                    "stageNum", String.valueOf(this.stageNum)
+                )
+                .increment();
         }
 
         jobStatus.onNext(new Status(jobId, stageNum, workerIndex, workerNum,