From 96ad079354eba2c68a4e9854e14c584043e49e36 Mon Sep 17 00:00:00 2001 From: Ethan Li Date: Wed, 2 Aug 2017 09:22:33 -0500 Subject: [PATCH] [STORM-2672] Expose a metric for calls to reportError() --- docs/Metrics.md | 8 ++++ .../daemon/metrics/ErrorReportingMetrics.java | 38 +++++++++++++++++++ .../org/apache/storm/executor/Executor.java | 9 +++++ .../storm/executor/bolt/BoltExecutor.java | 2 + .../bolt/BoltOutputCollectorImpl.java | 1 + .../storm/executor/spout/SpoutExecutor.java | 1 + .../spout/SpoutOutputCollectorImpl.java | 1 + 7 files changed, 60 insertions(+) create mode 100644 storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java diff --git a/docs/Metrics.md b/docs/Metrics.md index b0235b23a5..cf42339460 100644 --- a/docs/Metrics.md +++ b/docs/Metrics.md @@ -206,6 +206,14 @@ This metric records how much time a spout was idle because back-pressure indicat This metric records how much time a spout was idle because the topology was deactivated. This is the total time in milliseconds, not the average amount of time and is not sub-sampled. +#### Error Reporting Metrics + +Storm also collects error reporting metrics for bolts and spouts. + +##### `__reported-error-count` + +This metric records how many errors were reported by a spout/bolt. It is the total number of times the `reportError` method of the component's output collector was called, and is not sub-sampled. + #### Queue Metrics Each bolt or spout instance in a topology has a receive queue and a send queue. Each worker also has a queue for sending messages to other workers. All of these have metrics that are reported. 
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java new file mode 100644 index 0000000000..3b9e4b64d3 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.metrics; + +import org.apache.storm.metric.api.CountMetric; + +public class ErrorReportingMetrics extends BuiltinMetrics { + private final CountMetric reportedErrorCount = new CountMetric(); + + public ErrorReportingMetrics() { + metricMap.put("reported-error-count", reportedErrorCount); + } + + public void incrReportedErrorCountBy(long n) { + this.reportedErrorCount.incrBy(n); + } + + public void incrReportedErrorCount() { + this.reportedErrorCount.incr(); + } + +} diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index c97725e336..c1c63503d5 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -41,6 +41,7 @@ import org.apache.storm.daemon.GrouperFactory; import org.apache.storm.daemon.StormCommon; import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.ErrorReportingMetrics; import org.apache.storm.daemon.worker.WorkerState; import org.apache.storm.executor.bolt.BoltExecutor; import org.apache.storm.executor.error.IReportError; @@ -117,6 +118,8 @@ public abstract class Executor implements Callable, EventHandler { protected final Boolean hasEventLoggers; protected String hostname; + protected final ErrorReportingMetrics errorReportingMetrics; + protected Executor(WorkerState workerData, List executorId, Map credentials) { this.workerData = workerData; this.executorId = executorId; @@ -173,6 +176,8 @@ protected Executor(WorkerState workerData, List<Long> executorId, Map<String, St } catch (UnknownHostException ignored) { this.hostname = ""; } + + this.errorReportingMetrics = new ErrorReportingMetrics(); } public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) { @@ -514,6 +519,10 @@ public IReportError getReportError() { return reportError; } + public ErrorReportingMetrics getErrorReportingMetrics() { + return errorReportingMetrics; + } + public WorkerTopologyContext getWorkerTopologyContext() { return workerTopologyContext; } diff --git 
a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java index 20bf7e1d17..5d9edf198f 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java @@ -58,6 +58,8 @@ public void init(Map idToTask) { Utils.sleep(100); } + this.errorReportingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext()); + LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet()); for (Map.Entry entry : idToTask.entrySet()) { Task taskData = entry.getValue(); diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java index 46b945b224..696447e35e 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java +++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java @@ -151,6 +151,7 @@ public void resetTimeout(Tuple input) { @Override public void reportError(Throwable error) { + executor.getErrorReportingMetrics().incrReportedErrorCount(); executor.getReportError().report(error); } diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java index 3a03cf7b5b..6e85f34eb2 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java @@ -106,6 +106,7 @@ public void expire(Long key, TupleInfo tupleInfo) { }); this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext()); + this.errorReportingMetrics.registerAll(topoConf, idToTask.values().iterator().next().getUserContext()); this.outputCollectors = new ArrayList<>(); for (Map.Entry entry : idToTask.entrySet()) { Task 
taskData = entry.getValue(); diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java index f81b2c2d1d..1d78d571ff 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java +++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java @@ -77,6 +77,7 @@ public long getPendingCount() { @Override public void reportError(Throwable error) { + executor.getErrorReportingMetrics().incrReportedErrorCount(); executor.getReportError().report(error); }