From 21da5dff6c6b12f8350096969a7082f68e2f802c Mon Sep 17 00:00:00 2001 From: Aaron Gresch Date: Thu, 5 Apr 2018 14:42:03 -0500 Subject: [PATCH] STORM-2909 port new metrics to 2.x branch --- conf/defaults.yaml | 23 +++ .../apache/storm/perf/JCQueuePerfTest.java | 18 +- .../storm/starter/AnchoredWordCount.java | 139 ++++++++++++++ pom.xml | 10 + storm-client/pom.xml | 13 ++ .../src/jvm/org/apache/storm/Config.java | 102 ++++++---- .../src/jvm/org/apache/storm/daemon/Task.java | 20 +- .../daemon/metrics/ClientMetricsUtils.java | 52 +++++ .../daemon/worker/BackPressureTracker.java | 12 +- .../apache/storm/daemon/worker/Worker.java | 18 +- .../storm/daemon/worker/WorkerState.java | 29 +-- .../storm/daemon/worker/WorkerTransfer.java | 15 +- .../org/apache/storm/executor/Executor.java | 19 +- .../bolt/BoltOutputCollectorImpl.java | 7 +- .../storm/executor/spout/SpoutExecutor.java | 43 +++-- .../org/apache/storm/metrics2/JcMetrics.java | 45 +++++ .../apache/storm/metrics2/SimpleGauge.java | 38 ++++ .../storm/metrics2/StormMetricRegistry.java | 180 ++++++++++++++++++ .../apache/storm/metrics2/TaskMetrics.java | 85 +++++++++ .../storm/metrics2/filters/RegexFilter.java | 48 +++++ .../metrics2/filters/StormMetricsFilter.java | 33 ++++ .../reporters/ConsoleStormReporter.java | 69 +++++++ .../metrics2/reporters/CsvStormReporter.java | 97 ++++++++++ .../reporters/GangliaStormReporter.java | 132 +++++++++++++ .../reporters/GraphiteStormReporter.java | 102 ++++++++++ .../metrics2/reporters/JmxStormReporter.java | 92 +++++++++ .../reporters/ScheduledStormReporter.java | 86 +++++++++ .../metrics2/reporters/StormReporter.java | 35 ++++ .../apache/storm/stats/BoltExecutorStats.java | 32 +--- .../org/apache/storm/stats/CommonStats.java | 23 ++- .../storm/stats/SpoutExecutorStats.java | 26 +-- .../apache/storm/task/TopologyContext.java | 74 +++++-- .../jvm/org/apache/storm/utils/JCQueue.java | 58 ++++-- .../storm/validation/ConfigValidation.java | 167 ++++++++++------ 
.../storm/utils/JCQueueBackpressureTest.java | 2 +- .../org/apache/storm/utils/JCQueueTest.java | 4 +- .../java/org/apache/storm/DaemonConfig.java | 41 ++-- .../storm/daemon/metrics/MetricsUtils.java | 39 +--- .../reporters/ConsolePreparableReporter.java | 16 +- .../reporters/CsvPreparableReporter.java | 17 +- .../reporters/JmxPreparableReporter.java | 12 +- .../nimbus/DefaultTopologyValidator.java | 38 +++- .../storm/nimbus/StrictTopologyValidator.java | 67 +++++++ 43 files changed, 1830 insertions(+), 348 deletions(-) create mode 100644 examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java create mode 100644 storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java create mode 100644 storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java create mode 100644 
storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 8dcb2d97041..c985c12fc51 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -388,3 +388,26 @@ worker.metrics: # The number of buckets for running statistics num.stat.buckets: 20 + +# Metrics v2 configuration (optional) +#storm.metrics.reporters: +# # Graphite Reporter +# - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter" +# daemons: +# - "supervisor" +# - "nimbus" +# - "worker" +# report.period: 60 +# report.period.units: "SECONDS" +# graphite.host: "localhost" +# graphite.port: 2003 +# +# # Console Reporter +# - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter" +# daemons: +# - "worker" +# report.period: 10 +# report.period.units: "SECONDS" +# filter: +# class: "org.apache.storm.metrics2.filters.RegexFilter" +# expression: ".*my_component.*emitted.*" diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java index 17c676fa76e..700ce4ecffe 100644 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/JCQueuePerfTest.java @@ -46,8 +46,8 @@ public static void main(String[] args) throws Exception { private static void ackingProducerSimulation() { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws); - JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws); + JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test",1000, 1000); + JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test",1000, 1000); final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ); final Acker acker = new Acker(ackQ, spoutQ); @@ -57,8 +57,8 @@ private static void ackingProducerSimulation() { private static void 
producerFwdConsumer(int prodBatchSz) { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws); - JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws); + JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); + JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); final Producer prod = new Producer(q1); final Forwarder fwd = new Forwarder(q1, q2); @@ -69,7 +69,7 @@ private static void producerFwdConsumer(int prodBatchSz) { private static void oneProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100)); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); final Producer prod1 = new Producer(q1); final Consumer cons1 = new Consumer(q1); @@ -78,7 +78,7 @@ private static void oneProducer1Consumer(int prodBatchSz) { } private static void twoProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100)); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); final Producer prod1 = new Producer(q1); final Producer prod2 = new Producer(q1); @@ -88,7 +88,7 @@ private static void twoProducer1Consumer(int prodBatchSz) { } private static void threeProducer1Consumer(int prodBatchSz) { - JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100)); + JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",1000, 1000); final Producer prod1 = new Producer(q1); final Producer prod2 = new Producer(q1); @@ -101,8 +101,8 @@ private static void threeProducer1Consumer(int prodBatchSz) { private static void oneProducer2Consumers(int prodBatchSz) { WaitStrategyPark ws = new WaitStrategyPark(100); - JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws); - JCQueue q2 = new 
JCQueue("q2", 1024, 0, prodBatchSz, ws); + JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); + JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test",1000, 1000); final Producer2 prod1 = new Producer2(q1, q2); final Consumer cons1 = new Consumer(q1); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java new file mode 100644 index 00000000000..cb45024e66e --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/AnchoredWordCount.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.starter; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.ConfigurableTopology; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +public class AnchoredWordCount extends ConfigurableTopology { + + public static class RandomSentenceSpout extends BaseRichSpout { + SpoutOutputCollector collector; + Random random; + + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + this.random = new Random(); + } + + @Override + public void nextTuple() { + Utils.sleep(10); + String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), + sentence("four score and seven years ago"), + sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; + final String sentence = sentences[random.nextInt(sentences.length)]; + + this.collector.emit(new Values(sentence), UUID.randomUUID()); + } + + protected String sentence(String input) { + return input; + } + + @Override + public void ack(Object id) { + } + + @Override + public void fail(Object id) { + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + } + + + public static class SplitSentence extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector 
collector) { + String sentence = tuple.getString(0); + for (String word: sentence.split("\\s+")) { + collector.emit(new Values(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static class WordCount extends BaseBasicBolt { + Map counts = new HashMap<>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) { + count = 0; + } + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + protected int run(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSentenceSpout(), 4); + + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.setMaxTaskParallelism(3); + + String topologyName = "word-count"; + + conf.setNumWorkers(3); + + if (args != null && args.length > 0) { + topologyName = args[0]; + } + return submit(topologyName, conf, builder); + } +} diff --git a/pom.xml b/pom.xml index d3204b2c8e2..77c77381d2b 100644 --- a/pom.xml +++ b/pom.xml @@ -955,6 +955,16 @@ metrics-core ${metrics.version} + + io.dropwizard.metrics + metrics-ganglia + ${metrics.version} + + + io.dropwizard.metrics + metrics-graphite + ${metrics.version} + metrics-clojure metrics-clojure diff --git a/storm-client/pom.xml b/storm-client/pom.xml index 2e073a86ded..1311d2a8e13 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -184,6 +184,19 @@ curator-client + + io.dropwizard.metrics + metrics-core + + + io.dropwizard.metrics + metrics-ganglia + + + 
io.dropwizard.metrics + metrics-graphite + + diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index b2759854f52..3ebc49314e8 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -18,20 +18,19 @@ package org.apache.storm; +import com.esotericsoftware.kryo.Serializer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.storm.metric.IEventLogger; import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.serialization.IKryoDecorator; import org.apache.storm.serialization.IKryoFactory; import org.apache.storm.validation.ConfigValidation; -import org.apache.storm.validation.ConfigValidationAnnotations.*; import org.apache.storm.validation.ConfigValidation.*; -import com.esotericsoftware.kryo.Serializer; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.storm.validation.ConfigValidationAnnotations.*; /** * Topology configs are specified as a plain old map. This class provides a @@ -148,7 +147,7 @@ public class Config extends HashMap { * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer */ @isStringList - public static final String TOPOLOGY_READONLY_USERS="topology.readonly.users"; + public static final String TOPOLOGY_READONLY_USERS = "topology.readonly.users"; /** * A list of readonly groups that are allowed to interact with the topology. To use this set @@ -171,7 +170,7 @@ public class Config extends HashMap { public static final String TOPOLOGY_DEBUG = "topology.debug"; /** - * User defined version of this topology + * User defined version of this topology. 
*/ @isString public static final String TOPOLOGY_VERSION = "topology.version"; @@ -186,7 +185,7 @@ public class Config extends HashMap { /** * The serializer for communication between shell components and non-JVM - * processes + * processes. */ @isString public static final String TOPOLOGY_MULTILANG_SERIALIZER = "topology.multilang.serializer"; @@ -268,14 +267,16 @@ public class Config extends HashMap { * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override */ @isPositiveNumber(includeZero = true) - public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB = "topology.metrics.consumer.resources.onheap.memory.mb"; + public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB = + "topology.metrics.consumer.resources.onheap.memory.mb"; /** * The maximum amount of memory an instance of a metrics consumer will take off heap. This enables the scheduler * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override */ @isPositiveNumber(includeZero = true) - public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB = "topology.metrics.consumer.resources.offheap.memory.mb"; + public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB = + "topology.metrics.consumer.resources.offheap.memory.mb"; /** * The config indicates the percentage of cpu for a core an instance(executor) of a metrics consumer will use. @@ -311,13 +312,13 @@ public class Config extends HashMap { public static final String TOPOLOGY_STATE_CHECKPOINT_INTERVAL = "topology.state.checkpoint.interval.ms"; /** - * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology + * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology. 
*/ @isPositiveNumber public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb"; /** - * The strategy to use when scheduling a topology with Resource Aware Scheduler + * The strategy to use when scheduling a topology with Resource Aware Scheduler. */ @NotNull @isString @@ -388,7 +389,7 @@ public class Config extends HashMap { * Note that EventLoggerBolt takes care of all the implementations of IEventLogger, hence registering * many implementations (especially they're implemented as 'blocking' manner) would slow down overall topology. */ - @isListEntryCustom(entryValidatorClasses={EventLoggerRegistryValidator.class}) + @isListEntryCustom(entryValidatorClasses = {EventLoggerRegistryValidator.class}) public static final String TOPOLOGY_EVENT_LOGGER_REGISTER = "topology.event.logger.register"; /** @@ -452,10 +453,10 @@ public class Config extends HashMap { * rather than throw an error. */ @isBoolean - public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations"; + public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS = "topology.skip.missing.kryo.registrations"; /** - * List of classes to register during state serialization + * List of classes to register during state serialization. */ @isStringList public static final String TOPOLOGY_STATE_KRYO_REGISTER = "topology.state.kryo.register"; @@ -466,7 +467,7 @@ public class Config extends HashMap { * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. 
*/ - @isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class}) + @isListEntryCustom(entryValidatorClasses = {MetricRegistryValidator.class}) public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register"; /** @@ -477,13 +478,13 @@ public class Config extends HashMap { public static final String TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS = "topology.serialized.message.size.metrics"; /** - * A map of metric name to class name implementing IMetric that will be created once per worker JVM + * A map of metric name to class name implementing IMetric that will be created once per worker JVM. */ @isMapEntryType(keyType = String.class, valueType = String.class) public static final String TOPOLOGY_WORKER_METRICS = "topology.worker.metrics"; /** - * A map of metric name to class name implementing IMetric that will be created once per worker JVM + * A map of metric name to class name implementing IMetric that will be created once per worker JVM. */ @isMapEntryType(keyType = String.class, valueType = String.class) public static final String WORKER_METRICS = "worker.metrics"; @@ -494,7 +495,7 @@ public class Config extends HashMap { */ @isInteger @isPositiveNumber - public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism"; + public static final String TOPOLOGY_MAX_TASK_PARALLELISM = "topology.max.task.parallelism"; /** * The maximum number of tuples that can be pending on a spout task at any given time. @@ -506,14 +507,14 @@ public class Config extends HashMap { */ @isInteger @isPositiveNumber - public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; + public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending"; /** * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for. 
*/ @isInteger @isPositiveNumber(includeZero = true) - public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms"; + public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS = "topology.sleep.spout.wait.strategy.time.ms"; /** * The maximum amount of time a component gives a source of state to synchronize before it requests @@ -522,49 +523,49 @@ public class Config extends HashMap { @isInteger @isPositiveNumber @NotNull - public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs"; + public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS = "topology.state.synchronization.timeout.secs"; /** * The percentage of tuples to sample to produce stats for a task. */ @isPositiveNumber - public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate"; + public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate"; /** * The time period that builtin metrics data in bucketed into. */ @isInteger - public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs"; + public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS = "topology.builtin.metrics.bucket.size.secs"; /** * Whether or not to use Java serialization in a topology. */ @isBoolean - public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization"; + public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION = "topology.fall.back.on.java.serialization"; /** * Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS. */ @isStringOrStringList - public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts"; + public static final String TOPOLOGY_WORKER_CHILDOPTS = "topology.worker.childopts"; /** * Topology-specific options GC for the worker child process. 
This overrides WORKER_GC_CHILDOPTS. */ @isStringOrStringList - public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts"; + public static final String TOPOLOGY_WORKER_GC_CHILDOPTS = "topology.worker.gc.childopts"; /** * Topology-specific options for the logwriter process of a worker. */ @isStringOrStringList - public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS="topology.worker.logwriter.childopts"; + public static final String TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS = "topology.worker.logwriter.childopts"; /** * Topology-specific classpath for the worker child process. This is combined to the usual classpath. */ @isStringOrStringList - public static final String TOPOLOGY_CLASSPATH="topology.classpath"; + public static final String TOPOLOGY_CLASSPATH = "topology.classpath"; /** * Topology-specific classpath for the worker child process. This will be *prepended* to @@ -573,14 +574,14 @@ public class Config extends HashMap { * classpaths, set the storm.topology.classpath.beginning.enabled config to true. */ @isStringOrStringList - public static final String TOPOLOGY_CLASSPATH_BEGINNING="topology.classpath.beginning"; + public static final String TOPOLOGY_CLASSPATH_BEGINNING = "topology.classpath.beginning"; /** * Topology-specific environment variables for the worker child process. * This is added to the existing environment (that of the supervisor) */ @isMapEntryType(keyType = String.class, valueType = String.class) - public static final String TOPOLOGY_ENVIRONMENT="topology.environment"; + public static final String TOPOLOGY_ENVIRONMENT = "topology.environment"; /* * Bolt-specific configuration for windowed bolts to specify the window length as a count of number of tuples @@ -651,7 +652,7 @@ public class Config extends HashMap { * topology in Zookeeper. 
*/ @isString - public static final String TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id"; + public static final String TOPOLOGY_TRANSACTIONAL_ID = "topology.transactional.id"; /** * A list of task hooks that are automatically added to every spout and bolt in the topology. An example @@ -659,21 +660,21 @@ public class Config extends HashMap { * monitoring system. These hooks are instantiated using the zero-arg constructor. */ @isStringList - public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks"; + public static final String TOPOLOGY_AUTO_TASK_HOOKS = "topology.auto.task.hooks"; /** * The size of the receive queue for each executor. */ @isPositiveNumber @isInteger - public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size"; + public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE = "topology.executor.receive.buffer.size"; /** * The size of the transfer queue for each worker. */ @isPositiveNumber @isInteger - public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size"; + public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE = "topology.transfer.buffer.size"; /** * The size of the transfer queue for each worker. @@ -1240,6 +1241,9 @@ public class Config extends HashMap { @isString public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate"; + @isListEntryCustom(entryValidatorClasses={MetricReportersValidator.class}) + public static final String STORM_METRICS_REPORTERS = "storm.metrics.reporters"; + /** * What blobstore implementation the storm client should use. */ @@ -1672,7 +1676,8 @@ public class Config extends HashMap { /** * Impersonation user ACL config entries. 
*/ - @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, valueValidatorClasses = {ConfigValidation.ImpersonationAclUserEntryValidator.class}) + @isMapEntryCustom(keyValidatorClasses = {ConfigValidation.StringValidator.class}, + valueValidatorClasses = {ConfigValidation.ImpersonationAclUserEntryValidator.class}) public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl"; /** @@ -1913,6 +1918,27 @@ public class Config extends HashMap { @isPositiveNumber public static final String WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS = "worker.blob.update.poll.interval.secs"; + /** + * A specify Locale for daemon metrics reporter plugin. + * Use the specified IETF BCP 47 language tag string for a Locale. + */ + @isString + public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.locale"; + + /** + * A specify rate-unit in TimeUnit to specify reporting frequency for daemon metrics reporter plugin. + */ + @isString + public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit"; + + /** + * A specify duration-unit in TimeUnit to specify reporting window for daemon metrics reporter plugin. 
+ */ + @isString + public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit"; + + + public static void setClasspath(Map conf, String cp) { conf.put(Config.TOPOLOGY_CLASSPATH, cp); } diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java index ce9d0e466fc..4bc0e25db00 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java @@ -29,7 +29,6 @@ import java.util.Random; import java.util.function.BooleanSupplier; - import org.apache.storm.Config; import org.apache.storm.Thrift; import org.apache.storm.daemon.worker.WorkerState; @@ -46,6 +45,7 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; import org.apache.storm.hooks.ITaskHook; import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.metrics2.TaskMetrics; import org.apache.storm.spout.ShellSpout; import org.apache.storm.stats.CommonStats; import org.apache.storm.task.ShellBolt; @@ -78,6 +78,7 @@ public class Task { private Map> streamComponentToGrouper; private HashMap> streamToGroupers; private boolean debug; + private final TaskMetrics taskMetrics; public Task(Executor executor, Integer taskId) throws IOException { this.taskId = taskId; @@ -95,6 +96,7 @@ public Task(Executor executor, Integer taskId) throws IOException { this.taskObject = mkTaskObject(); this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG); this.addTaskHooks(); + this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId); } public List getOutgoingTasks(Integer outTaskId, String stream, List values) { @@ -116,9 +118,9 @@ public List getOutgoingTasks(Integer outTaskId, String stream, List getOutgoingTasks(String stream, List values) { ArrayList groupers = streamToGroupers.get(stream); if (null != groupers) { - for (int 
i=0; i getOutgoingTasks(String stream, List values) { } try { if (emitSampler.getAsBoolean()) { - executorStats.emittedTuple(stream); - executorStats.transferredTuples(stream, outTasks.size()); + executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream)); + executorStats.transferredTuples(stream, outTasks.size(), this.taskMetrics.getTransferred(stream)); } } catch (Exception e) { throw new RuntimeException(e); @@ -186,6 +188,9 @@ public Object getTaskObject() { return taskObject; } + public TaskMetrics getTaskMetrics() { + return taskMetrics; + } // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument public void sendUnanchored(String stream, List values, ExecutorTransfer transfer, Queue pendingEmits) { @@ -284,7 +289,8 @@ private void addTaskHooks() { } } - private static HashMap> getGroupersPerStream(Map> streamComponentToGrouper) { + private static HashMap> getGroupersPerStream( + Map> streamComponentToGrouper) { HashMap> result = new HashMap<>(streamComponentToGrouper.size()); for (Entry> entry : streamComponentToGrouper.entrySet()) { diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java new file mode 100644 index 00000000000..3fbdb7e946d --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ClientMetricsUtils.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.metrics; + +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.Config; +import org.apache.storm.utils.ObjectReader; + +public class ClientMetricsUtils { + + public static TimeUnit getMetricsRateUnit(Map topoConf) { + return getTimeUnitForCofig(topoConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT); + } + + public static TimeUnit getMetricsDurationUnit(Map topoConf) { + return getTimeUnitForCofig(topoConf, Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT); + } + + public static Locale getMetricsReporterLocale(Map topoConf) { + String languageTag = ObjectReader.getString(topoConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null); + if (languageTag != null) { + return Locale.forLanguageTag(languageTag); + } + return null; + } + + private static TimeUnit getTimeUnitForCofig(Map topoConf, String configName) { + String rateUnitString = ObjectReader.getString(topoConf.get(configName), null); + if (rateUnitString != null) { + return TimeUnit.valueOf(rateUnitString); + } + return null; + } +} diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java index 9b31c1a4a08..6f4be8e9fed 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java @@ -19,11 +19,12 @@ package org.apache.storm.daemon.worker; import 
java.util.ArrayList; -import java.util.concurrent.ConcurrentHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.storm.Constants; import org.apache.storm.messaging.netty.BackPressureStatus; import org.apache.storm.utils.JCQueue; import org.slf4j.Logger; @@ -37,7 +38,8 @@ * ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead. */ public class BackPressureTracker { - private static final JCQueue NONE = new JCQueue ("NoneQ", 2, 0, 1, null) { }; + private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null, + "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) { }; static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class); @@ -47,7 +49,7 @@ public class BackPressureTracker { public BackPressureTracker(String workerId, List allLocalTasks) { this.workerId = workerId; for (Integer taskId : allLocalTasks) { - if(taskId != SYSTEM_TASK_ID) { + if (taskId != SYSTEM_TASK_ID) { tasks.put(taskId, NONE); // all tasks are considered to be not under BP initially } } @@ -58,7 +60,7 @@ private void recordNoBackPressure(Integer taskId) { } /*** - * Record BP for a task + * Record BP for a task. 
* This is called by transferLocalBatch() on NettyWorker thread * @return true if an update was recorded, false if taskId is already under BP */ @@ -70,7 +72,7 @@ public boolean recordBackPressure(Integer taskId, JCQueue recvQ) { public boolean refreshBpTaskList() { boolean changed = false; LOG.debug("Running Back Pressure status change check"); - for ( Entry entry : tasks.entrySet()) { + for (Entry entry : tasks.entrySet()) { if (entry.getValue() != NONE && entry.getValue().isEmptyOverflow()) { recordNoBackPressure(entry.getKey()); changed = true; diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 65915d0effc..9867cb45f96 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -63,6 +63,7 @@ import org.apache.storm.generated.SupervisorWorkerHeartbeat; import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.IContext; +import org.apache.storm.metrics2.StormMetricRegistry; import org.apache.storm.security.auth.AuthUtils; import org.apache.storm.security.auth.IAutoCredentials; import org.apache.storm.stats.StatsUtil; @@ -73,11 +74,9 @@ import org.apache.storm.utils.SupervisorClient; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; public class Worker implements Shutdownable, DaemonCommon { @@ -149,6 +148,8 @@ public void start() throws Exception { IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext); IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext); + StormMetricRegistry.start(conf, DaemonType.WORKER); + Credentials initialCredentials = stormClusterState.credentials(topologyId, null); Map 
initCreds = new HashMap<>(); if (initialCredentials != null) { @@ -284,7 +285,8 @@ private void setupFlushTupleTimer(final Map topologyConf, final final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE)); final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS)); if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) { - LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis); + LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", + producerBatchSize, xferBatchSize, flushIntervalMillis); return; } @@ -309,8 +311,8 @@ private void setupBackPressureCheckTimer(final Map topologyConf) return; } final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS)); - workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs - , bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus()); + workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, + bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus()); LOG.info("BackPressure status change checking will be performed every {} millis", bpCheckIntervalMs); } @@ -416,12 +418,12 @@ private void heartbeatToMasterIfLocalbeatFail(LSWorkerHeartbeat lsWorkerHeartbea //In distributed mode, send heartbeat directly to master if local supervisor goes down. 
SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(), lsWorkerHeartbeat.get_executors(), lsWorkerHeartbeat.get_time_secs()); - try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)){ + try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) { client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat); } catch (Exception tr1) { //If any error/exception thrown, report directly to nimbus. LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage()); - try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)){ + try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(conf)) { nimbusClient.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat); } catch (Exception tr2) { //if any error/exception thrown, just ignore. @@ -473,6 +475,8 @@ public void shutdown() { workerState.backPressureCheckTimer.close(); workerState.closeResources(); + StormMetricRegistry.stop(); + LOG.info("Trigger any worker shutdown hooks"); workerState.runWorkerShutdownHooks(); diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index 8d8c62f0efa..fc940861747 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -45,7 +45,6 @@ import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.StormTimer; -import org.apache.storm.messaging.netty.BackPressureStatus; import org.apache.storm.cluster.IStateStorage; import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.cluster.VersionedData; @@ -69,9 +68,9 @@ import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.IContext; import 
org.apache.storm.messaging.TransportFactory; -import org.apache.storm.security.auth.IAutoCredentials; - +import org.apache.storm.messaging.netty.BackPressureStatus; import org.apache.storm.policy.IWaitStrategy; +import org.apache.storm.security.auth.IAutoCredentials; import org.apache.storm.serialization.ITupleSerializer; import org.apache.storm.serialization.KryoTupleSerializer; import org.apache.storm.task.WorkerTopologyContext; @@ -79,10 +78,10 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.JCQueue; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.SupervisorClient; import org.apache.storm.utils.ThriftTopologyUtils; +import org.apache.storm.utils.Utils; import org.apache.storm.utils.Utils.SmartThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,7 +216,9 @@ public Map getUserSharedResources() { // local executors and localTaskIds running in this worker final Set> localExecutors; final ArrayList localTaskIds; - final Map localReceiveQueues = new HashMap<>(); // [taskId]-> JCQueue : initialized after local executors are initialized + + // [taskId]-> JCQueue : initialized after local executors are initialized + final Map localReceiveQueues = new HashMap<>(); final Map topologyConf; final StormTopology topology; @@ -418,9 +419,8 @@ public void refreshStormActive() { public void refreshStormActive(Runnable callback) { StormBase base = stormClusterState.stormBase(topologyId, callback); isTopologyActive.set( - (null != base) && - (base.get_status() == TopologyStatus.ACTIVE) && - (isWorkerActive.get())); + (null != base) + && (base.get_status() == TopologyStatus.ACTIVE) && (isWorkerActive.get())); if (null != base) { Map debugOptionsMap = new HashMap<>(base.get_component_debug()); for (DebugOptions debugOptions : debugOptionsMap.values()) { @@ -559,7 +559,8 @@ private void transferLocalBatch(ArrayList tupleBatch) { private 
void dropMessage(AddressedTuple tuple, JCQueue queue) { ++dropCount; queue.recordMsgDrop(); - LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", queue.getName(), queue.getOverflowCount(), dropCount, tuple); + LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", + queue.getName(), queue.getOverflowCount(), dropCount, tuple); } public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) { @@ -643,7 +644,7 @@ private List> readWorkerExecutors(IStormClusterState stormClusterStat private Assignment getLocalAssignment(Map conf, IStormClusterState stormClusterState, String topologyId) { if (!ConfigUtils.isLocalMode(conf)) { try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), - supervisorPort)){ + supervisorPort)) { Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId); return assignment; } catch (Throwable tr1) { @@ -661,15 +662,17 @@ private Map, JCQueue> mkReceiveQueueMap(Map topologyC Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT)); if (recvBatchSize > recvQueueSize / 2) { - throw new IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + recvBatchSize + - " is greater than half of " + Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" + recvQueueSize); + throw new IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + recvBatchSize + + " is greater than half of " + Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" + recvQueueSize); } IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf); Map, JCQueue> receiveQueueMap = new HashMap<>(); for (List executor : executors) { + int port = this.getPort(); receiveQueueMap.put(executor, new JCQueue("receive-queue" + 
executor.toString(), - recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy)); + recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy, + this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort())); } return receiveQueueMap; diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java index e57123cd13a..8923bf56504 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java @@ -18,7 +18,12 @@ package org.apache.storm.daemon.worker; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.storm.Config; +import org.apache.storm.Constants; import org.apache.storm.messaging.TaskMessage; import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.serialization.ITupleSerializer; @@ -31,11 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; - // Transfers messages destined to other workers class WorkerTransfer implements JCQueue.Consumer { static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class); @@ -53,7 +53,7 @@ public WorkerTransfer(WorkerState workerState, Map topologyConf, this.workerState = workerState; this.backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf); this.drainer = new TransferDrainer(); - this.remoteBackPressureStatus = new AtomicBoolean[maxTaskIdInTopo+1]; + this.remoteBackPressureStatus = new AtomicBoolean[maxTaskIdInTopo + 1]; for (int i = 0; i < remoteBackPressureStatus.length; i++) { remoteBackPressureStatus[i] = new AtomicBoolean(false); } @@ -65,7 +65,8 @@ 
public WorkerTransfer(WorkerState workerState, Map topologyConf, + Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz); } - this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy); + this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy, + workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, workerState.getPort()); } public JCQueue getTransferQueue() { diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index f811b826f5e..962e019575b 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.UnknownHostException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -33,15 +32,15 @@ import java.util.Objects; import java.util.Queue; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.Callable; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; - import org.apache.storm.Config; import org.apache.storm.Constants; +import org.apache.storm.StormTimer; import org.apache.storm.cluster.ClusterStateContext; import org.apache.storm.cluster.ClusterUtils; import org.apache.storm.cluster.DaemonType; @@ -66,11 +65,8 @@ import org.apache.storm.grouping.LoadMapping; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IMetricsConsumer; -import org.apache.storm.stats.BoltExecutorStats; import org.apache.storm.stats.CommonStats; -import org.apache.storm.stats.SpoutExecutorStats; import org.apache.storm.stats.StatsUtil; -import 
org.apache.storm.StormTimer; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.Fields; @@ -78,9 +74,9 @@ import org.apache.storm.tuple.Values; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.JCQueue; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; import org.jctools.queues.MpscChunkedArrayQueue; import org.json.simple.JSONValue; import org.json.simple.parser.ParseException; @@ -254,7 +250,7 @@ public Queue getPendingEmits() { } /** - * separated from mkExecutor in order to replace executor transfer in executor data for testing + * separated from mkExecutor in order to replace executor transfer in executor data for testing. */ public ExecutorShutdown execute() throws Exception { LOG.info("Loading executor tasks " + componentId + ":" + executorId); @@ -400,7 +396,7 @@ public boolean publishFlushTuple() { } /** - * Returns map of stream id to component id to grouper + * Returns map of stream id to component id to grouper. 
*/ private Map> outboundComponents( WorkerTopologyContext workerTopologyContext, String componentId, Map topoConf) { @@ -438,7 +434,8 @@ private Map> outboundComponen // ============================ getter methods ================================= // ============================================================================= - private Map normalizedComponentConf(Map topoConf, WorkerTopologyContext topologyContext, String componentId) { + private Map normalizedComponentConf( + Map topoConf, WorkerTopologyContext topologyContext, String componentId) { List keysToRemove = retrieveAllConfigKeys(); keysToRemove.remove(Config.TOPOLOGY_DEBUG); keysToRemove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING); @@ -503,7 +500,7 @@ public String getStormId() { return stormId; } - abstract public CommonStats getStats(); + public abstract CommonStats getStats(); public String getType() { return type; diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java index facbb7523b7..f522ec652b7 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java +++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java @@ -114,7 +114,8 @@ private List boltEmit(String streamId, Collection anchors, List< } else { msgId = MessageId.makeUnanchored(); } - TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId); + TupleImpl tupleExt = new TupleImpl( + executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId); xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits()); } if (isEventLoggers) { @@ -145,7 +146,7 @@ public void ack(Tuple input) { boltAckInfo.applyOn(task.getUserContext()); } if (delta >= 0) { - executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), 
delta); + executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta, task.getTaskMetrics().getAcked(input.getSourceStreamId())); } } @@ -166,7 +167,7 @@ public void fail(Tuple input) { BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta); boltFailInfo.applyOn(task.getUserContext()); if (delta >= 0) { - executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta); + executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta, task.getTaskMetrics().getFailed(input.getSourceStreamId())); } } diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java index e20415042b2..5f16ccbd7e1 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java @@ -15,11 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.executor.spout; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.storm.Config; import org.apache.storm.Constants; @@ -46,20 +51,15 @@ import org.apache.storm.tuple.TupleImpl; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.JCQueue; -import org.apache.storm.utils.Utils; import org.apache.storm.utils.MutableLong; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ReflectionUtils; import org.apache.storm.utils.RotatingMap; import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - public class SpoutExecutor extends Executor { private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); @@ -92,7 +92,8 @@ public SpoutExecutor(final WorkerState workerData, final List executorId, this.emittedCount = new MutableLong(0); this.emptyEmitStreak = new MutableLong(0); this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); - this.stats = new SpoutExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS))); + this.stats = new SpoutExecutorStats( + ConfigUtils.samplingRate(this.getTopoConf()),ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS))); this.builtInMetrics = new BuiltinSpoutMetrics(stats); } @@ -108,13 +109,14 @@ public void init(final ArrayList idToTask, int idToTaskBase) { Utils.sleep(100); } - LOG.info("Opening spout {}:{}", componentId, taskIds ); + LOG.info("Opening spout {}:{}", componentId, taskIds); this.idToTask = idToTask; this.maxSpoutPending = 
ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); this.spouts = new ArrayList<>(); for (Task task : idToTask) { - if(task!=null) + if (task != null) { this.spouts.add((ISpout) task.getTaskObject()); + } } this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback() { @Override @@ -130,9 +132,9 @@ public void expire(Long key, TupleInfo tupleInfo) { this.spoutThrottlingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext()); this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext()); this.outputCollectors = new ArrayList<>(); - for (int i=0; i 0) + if (rmspCount > 0) { LOG.debug("Ended max spout pending stretch of {} iterations", rmspCount); + } rmspCount = 0; } @@ -216,7 +220,7 @@ public Long call() throws Exception { // continue without idling return 0L; } - if ( !pendingEmits.isEmpty() ) { // then facing backpressure + if (!pendingEmits.isEmpty()) { // then facing backpressure backPressureWaitStrategy(); return 0L; } @@ -344,8 +348,9 @@ public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, T if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext()); } - if (hasAckers && timeDelta!=null) { - executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta); + if (hasAckers && timeDelta != null) { + executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta, + taskData.getTaskMetrics().getAcked(tupleInfo.getStream())); } } catch (Exception e) { throw Utils.wrapInRuntime(e); @@ -362,7 +367,8 @@ public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, spout.fail(tupleInfo.getMessageId()); new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext()); if (timeDelta != null) { - 
executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta); + executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta, + taskData.getTaskMetrics().getFailed(tupleInfo.getStream())); } } catch (Exception e) { throw Utils.wrapInRuntime(e); @@ -371,8 +377,9 @@ public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, public int getSpoutRecvqCheckSkipCount() { - if(ackingEnabled) + if (ackingEnabled) { return 0; // always check recQ if ACKing enabled + } return ObjectReader.getInt(conf.get(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS), 0); } diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java new file mode 100644 index 00000000000..ccd7b19eb51 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.metrics2; + +import org.apache.storm.utils.JCQueue; + +public class JcMetrics { + private final SimpleGauge capacity; + private final SimpleGauge population; + + JcMetrics(SimpleGauge capacity, + SimpleGauge population) { + this.capacity = capacity; + this.population = population; + } + + public void setCapacity(Long capacity) { + this.capacity.set(capacity); + } + + public void setPopulation(Long population) { + this.population.set(population); + } + + public void set(JCQueue.QueueMetrics metrics) { + this.capacity.set(metrics.capacity()); + this.population.set(metrics.population()); + } +} diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java new file mode 100644 index 00000000000..b91dea8c5b7 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/SimpleGauge.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.metrics2; + +import com.codahale.metrics.Gauge; + +public class SimpleGauge implements Gauge { + private T value; + + public SimpleGauge(T value) { + this.value = value; + } + + @Override + public T getValue() { + return this.value; + } + + public void set(T value) { + this.value = value; + } +} diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java new file mode 100644 index 00000000000..eba86ad906b --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.ReflectionUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StormMetricRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + + private static final MetricRegistry REGISTRY = new MetricRegistry(); + + private static final List REPORTERS = new ArrayList<>(); + + private static String hostName = null; + + public static SimpleGauge gauge( + T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) { + String metricName = metricName(name, topologyId, componentId, taskId, port); + if (REGISTRY.getGauges().containsKey(metricName)) { + return (SimpleGauge)REGISTRY.getGauges().get(metricName); + } else { + return REGISTRY.register(metricName, new SimpleGauge<>(initialValue)); + } + } + + public static JcMetrics jcMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port) { + return new JcMetrics( + StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, taskId, port) + ); + } + + public static Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) { + String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); + return 
REGISTRY.meter(metricName); + } + + public static Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) { + String metricName = metricName(name, context.getStormId(), componentId, streamId,taskId, context.getThisWorkerPort()); + return REGISTRY.counter(metricName); + } + + public static Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) { + String metricName = metricName(name, topologyId, componentId, streamId,taskId, workerPort); + return REGISTRY.counter(metricName); + } + + public static void start(Map stormConfig, DaemonType type) { + try { + hostName = dotToUnderScore(Utils.localHostname()); + } catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" + + " as 'localhost'."); + } + + LOG.info("Starting metrics reporters..."); + List> reporterList = (List>)stormConfig.get(Config.STORM_METRICS_REPORTERS); + if (reporterList != null && reporterList.size() > 0) { + for (Map reporterConfig : reporterList) { + // only start those requested + List daemons = (List) reporterConfig.get("daemons"); + for (String daemon : daemons) { + if (DaemonType.valueOf(daemon.toUpperCase()) == type) { + startReporter(stormConfig, reporterConfig); + } + } + } + } + } + + public static MetricRegistry registry() { + return REGISTRY; + } + + private static void startReporter(Map stormConfig, Map reporterConfig) { + String clazz = (String)reporterConfig.get("class"); + LOG.info("Attempting to instantiate reporter class: {}", clazz); + StormReporter reporter = ReflectionUtils.newInstance(clazz); + if (reporter != null) { + reporter.prepare(REGISTRY, stormConfig, reporterConfig); + reporter.start(); + REPORTERS.add(reporter); + } + + } + + public static void stop() { + for (StormReporter sr : REPORTERS) { + sr.stop(); + } + } + + public static String metricName(String name, String 
stormId, String componentId, String streamId, Integer taskId, Integer workerPort) { + StringBuilder sb = new StringBuilder("storm.worker."); + sb.append(stormId); + sb.append("."); + sb.append(hostName); + sb.append("."); + sb.append(dotToUnderScore(componentId)); + sb.append("."); + sb.append(dotToUnderScore(streamId)); + sb.append("."); + sb.append(taskId); + sb.append("."); + sb.append(workerPort); + sb.append("-"); + sb.append(name); + return sb.toString(); + } + + public static String metricName(String name, String stormId, String componentId, Integer taskId, Integer workerPort) { + StringBuilder sb = new StringBuilder("storm.worker."); + sb.append(stormId); + sb.append("."); + sb.append(hostName); + sb.append("."); + sb.append(dotToUnderScore(componentId)); + sb.append("."); + sb.append(taskId); + sb.append("."); + sb.append(workerPort); + sb.append("-"); + sb.append(name); + return sb.toString(); + } + + public static String metricName(String name, TopologyContext context) { + + + StringBuilder sb = new StringBuilder("storm.topology."); + sb.append(context.getStormId()); + sb.append("."); + sb.append(hostName); + sb.append("."); + sb.append(dotToUnderScore(context.getThisComponentId())); + sb.append("."); + sb.append(context.getThisTaskId()); + sb.append("."); + sb.append(context.getThisWorkerPort()); + sb.append("-"); + sb.append(name); + return sb.toString(); + } + + private static String dotToUnderScore(String str) { + return str.replace('.', '_'); + } +} diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java new file mode 100644 index 00000000000..e796df56932 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2; + +import com.codahale.metrics.Counter; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.storm.task.WorkerTopologyContext; + +public class TaskMetrics { + private static final String METRIC_NAME_ACKED = "acked"; + private static final String METRIC_NAME_FAILED = "failed"; + private static final String METRIC_NAME_EMITTED = "emitted"; + private static final String METRIC_NAME_TRANSFERRED = "transferred"; + + private ConcurrentMap ackedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap failedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap emittedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap transferredByStream = new ConcurrentHashMap<>(); + + private String topologyId; + private String componentId; + private Integer taskId; + private Integer workerPort; + + public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid) { + this.topologyId = context.getStormId(); + this.componentId = componentId; + this.taskId = taskid; + this.workerPort = context.getThisWorkerPort(); + } + + public Counter getAcked(String streamId) { + Counter c = this.ackedByStream.get(streamId); + if (c == null) { + c = 
StormMetricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.ackedByStream.put(streamId, c); + } + return c; + } + + public Counter getFailed(String streamId) { + Counter c = this.failedByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter(METRIC_NAME_FAILED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.failedByStream.put(streamId, c); + } + return c; + } + + public Counter getEmitted(String streamId) { + Counter c = this.emittedByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter(METRIC_NAME_EMITTED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.emittedByStream.put(streamId, c); + } + return c; + } + + public Counter getTransferred(String streamId) { + Counter c = this.transferredByStream.get(streamId); + if (c == null) { + c = StormMetricRegistry.counter( + METRIC_NAME_TRANSFERRED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + this.transferredByStream.put(streamId, c); + } + return c; + } +} diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java b/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java new file mode 100644 index 00000000000..a209664570f --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/filters/RegexFilter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.filters; + +import com.codahale.metrics.Metric; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class RegexFilter implements StormMetricsFilter { + + private Pattern pattern; + + + @Override + public void prepare(Map config) { + String expression = (String) config.get("expression"); + if (expression != null) { + this.pattern = Pattern.compile(expression); + } else { + throw new IllegalStateException("RegexFilter requires an 'expression' parameter."); + } + } + + @Override + public boolean matches(String name, Metric metric) { + Matcher matcher = this.pattern.matcher(name); + return matcher.matches(); + } + +} diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java b/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java new file mode 100644 index 00000000000..1f876aa8046 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/filters/StormMetricsFilter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.filters; + +import com.codahale.metrics.MetricFilter; + +import java.util.Map; + +public interface StormMetricsFilter extends MetricFilter { + + /** + * Called after the filter is instantiated. + * @param config A map of the properties from the 'filter' section of the reporter configuration. + */ + void prepare(Map config); + +} diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java new file mode 100644 index 00000000000..bef958550a8 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.MetricRegistry; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsoleStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class); + + @Override + public void prepare(MetricRegistry registry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing ConsoleReporter"); + ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(registry); + + builder.outputTo(System.out); + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(stormConf); + if (locale != null) { + builder.formattedFor(locale); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(stormConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(stormConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + reporter = builder.build(); + } + +} \ No newline at end of file diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java new file mode 100644 index 00000000000..c52cd2c53d5 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.CsvReporter; +import com.codahale.metrics.MetricRegistry; +import java.io.File; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CsvStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class); + + public static final String CSV_LOG_DIR = "csv.log.dir"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry); + + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(reporterConf); + if (locale != null) { + builder.formatFor(locale); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + File 
csvMetricsDir = getCsvLogDir(stormConf, reporterConf); + reporter = builder.build(csvMetricsDir); + } + + + private static File getCsvLogDir(Map stormConf, Map reporterConf) { + String csvMetricsLogDirectory = ObjectReader.getString(reporterConf.get(CSV_LOG_DIR), null); + if (csvMetricsLogDirectory == null) { + csvMetricsLogDirectory = ConfigUtils.absoluteStormLocalDir(stormConf); + csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics"; + } + File csvMetricsDir = new File(csvMetricsLogDirectory); + validateCreateOutputDir(csvMetricsDir); + return csvMetricsDir; + } + + private static void validateCreateOutputDir(File dir) { + if (!dir.exists()) { + dir.mkdirs(); + } + if (!dir.canWrite()) { + throw new IllegalStateException(dir.getName() + " does not have write permissions."); + } + if (!dir.isDirectory()) { + throw new IllegalStateException(dir.getName() + " is not a directory."); + } + } +} \ No newline at end of file diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java new file mode 100644 index 00000000000..c9f3253ac64 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ganglia.GangliaReporter; +import info.ganglia.gmetric4j.gmetric.GMetric; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GangliaStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); + + public static final String GANGLIA_HOST = "ganglia.host"; + public static final String GANGLIA_PORT = "ganglia.port"; + public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with"; + public static final String GANGLIA_DMAX = "ganglia.dmax"; + public static final String GANGLIA_TMAX = "ganglia.tmax"; + public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode"; + public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit"; + public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit"; + public static final String GANGLIA_TTL = "ganglia.ttl"; + public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if 
(durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + String prefix = getMetricsPrefixedWith(reporterConf); + if (prefix != null) { + builder.prefixedWith(prefix); + } + + Integer dmax = getGangliaDMax(reporterConf); + if (dmax != null) { + builder.withDMax(dmax); + } + + Integer tmax = getGangliaTMax(reporterConf); + if (tmax != null) { + builder.withTMax(tmax); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + String group = getMetricsTargetUdpGroup(reporterConf); + Integer port = getMetricsTargetPort(reporterConf); + String udpAddressingMode = getMetricsTargetUdpAddressingMode(reporterConf); + Integer ttl = getMetricsTargetTtl(reporterConf); + + GMetric.UDPAddressingMode mode = udpAddressingMode.equalsIgnoreCase("multicast") + ? 
GMetric.UDPAddressingMode.MULTICAST : GMetric.UDPAddressingMode.UNICAST; + + try { + GMetric sender = new GMetric(group, port, mode, ttl); + reporter = builder.build(sender); + } catch (IOException ioe) { + LOG.error("Exception in GangliaReporter config", ioe); + } + } + + + public static String getMetricsTargetUdpGroup(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GANGLIA_UDP_GROUP), null); + } + + public static String getMetricsTargetUdpAddressingMode(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GANGLIA_UDP_ADDRESSING_MODE), null); + } + + public static Integer getMetricsTargetTtl(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_TTL), null); + } + + public static Integer getGangliaDMax(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_DMAX), null); + } + + public static Integer getGangliaTMax(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_TMAX), null); + } + + private static Integer getMetricsTargetPort(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GANGLIA_PORT), null); + } + + private static String getMetricsPrefixedWith(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GANGLIA_PREFIXED_WITH), null); + } + +} \ No newline at end of file diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java new file mode 100644 index 00000000000..57eb5a4fdb2 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.graphite.GraphiteReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import com.codahale.metrics.graphite.GraphiteUDP; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GraphiteStormReporter extends ScheduledStormReporter { + private static final Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class); + + public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with"; + public static final String GRAPHITE_HOST = "graphite.host"; + public static final String GRAPHITE_PORT = "graphite.port"; + public static final String GRAPHITE_TRANSPORT = "graphite.transport"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + StormMetricsFilter filter = getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + 
} + String prefix = getMetricsPrefixedWith(reporterConf); + if (prefix != null) { + builder.prefixedWith(prefix); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + // Not exposed: + // * withClock(Clock) + + String host = getMetricsTargetHost(reporterConf); + Integer port = getMetricsTargetPort(reporterConf); + String transport = getMetricsTargetTransport(reporterConf); + GraphiteSender sender = null; + if (transport.equalsIgnoreCase("udp")) { + sender = new GraphiteUDP(host, port); + } else { + sender = new Graphite(host, port); + } + reporter = builder.build(sender); + } + + private static String getMetricsPrefixedWith(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null); + } + + private static String getMetricsTargetHost(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GRAPHITE_HOST), null); + } + + private static Integer getMetricsTargetPort(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(GRAPHITE_PORT), null); + } + + private static String getMetricsTargetTransport(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp"); + } +} \ No newline at end of file diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java new file mode 100644 index 00000000000..f995f902a8d --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmxStormReporter implements StormReporter { + private static final Logger LOG = LoggerFactory.getLogger(JmxStormReporter.class); + public static final String JMX_DOMAIN = "jmx.domain"; + JmxReporter reporter = null; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.info("Preparing..."); + JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + String domain = getMetricsJmxDomain(reporterConf); + if (domain != null) { + builder.inDomain(domain); + } + + StormMetricsFilter filter = ScheduledStormReporter.getMetricsFilter(reporterConf); + if (filter != null) { + builder.filter(filter); + } + // other builder functions not exposed: + // * createsObjectNamesWith(ObjectNameFactory onFactory) + // * registerWith (MBeanServer) + // * specificDurationUnits (Map specificDurationUnits) + // * specificRateUnits(Map specificRateUnits) + + 
reporter = builder.build(); + } + + public static String getMetricsJmxDomain(Map reporterConf) { + return ObjectReader.getString(reporterConf.get(JMX_DOMAIN), null); + } + + @Override + public void start() { + if (reporter != null) { + LOG.debug("Starting..."); + reporter.start(); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.debug("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } +} \ No newline at end of file diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java new file mode 100644 index 00000000000..61af5be5c1f --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.metrics2.filters.StormMetricsFilter; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ScheduledStormReporter implements StormReporter { + private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class); + protected ScheduledReporter reporter; + protected long reportingPeriod; + protected TimeUnit reportingPeriodUnit; + + @Override + public void start() { + if (reporter != null) { + LOG.debug("Starting..."); + reporter.start(reportingPeriod, reportingPeriodUnit); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.debug("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } + + + public static TimeUnit getReportPeriodUnit(Map reporterConf) { + TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); + return unit == null ? 
TimeUnit.SECONDS : unit; + } + + private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { + String rateUnitString = ObjectReader.getString(reporterConf.get(configName), null); + if (rateUnitString != null) { + return TimeUnit.valueOf(rateUnitString); + } + return null; + } + + public static long getReportPeriod(Map reporterConf) { + return ObjectReader.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); + } + + public static StormMetricsFilter getMetricsFilter(Map reporterConf) { + StormMetricsFilter filter = null; + Map filterConf = (Map)reporterConf.get("filter"); + if (filterConf != null) { + String clazz = (String) filterConf.get("class"); + if (clazz != null) { + filter = ReflectionUtils.newInstance(clazz); + filter.prepare(filterConf); + } + } + return filter; + } +} diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java new file mode 100644 index 00000000000..907965a7c40 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter extends Reporter { + String REPORT_PERIOD = "report.period"; + String REPORT_PERIOD_UNITS = "report.period.units"; + + void prepare(MetricRegistry metricsRegistry, Map conf, Map reporterConf); + + void start(); + + void stop(); +} \ No newline at end of file diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index 78246bb4271..fbf1e92c665 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -15,44 +15,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.stats; +import com.codahale.metrics.Counter; import com.google.common.collect.Lists; - +import java.util.List; import org.apache.storm.generated.BoltStats; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; -import java.util.List; - @SuppressWarnings("unchecked") public class BoltExecutorStats extends CommonStats { - - MultiCountStatAndMetric ackedStats; - MultiCountStatAndMetric failedStats; MultiCountStatAndMetric executedStats; MultiLatencyStatAndMetric processLatencyStats; MultiLatencyStatAndMetric executeLatencyStats; public BoltExecutorStats(int rate,int numStatBuckets) { super(rate,numStatBuckets); - this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); - this.failedStats = new MultiCountStatAndMetric(numStatBuckets); this.executedStats = new MultiCountStatAndMetric(numStatBuckets); this.processLatencyStats = new 
MultiLatencyStatAndMetric(numStatBuckets); this.executeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); } - public MultiCountStatAndMetric getAcked() { - return ackedStats; - } - - public MultiCountStatAndMetric getFailed() { - return failedStats; - } - public MultiCountStatAndMetric getExecuted() { return executedStats; } @@ -67,8 +54,6 @@ public MultiLatencyStatAndMetric getExecuteLatencies() { @Override public void cleanupStats() { - ackedStats.close(); - failedStats.close(); executedStats.close(); processLatencyStats.close(); executeLatencyStats.close(); @@ -81,16 +66,17 @@ public void boltExecuteTuple(String component, String stream, long latencyMs) { this.getExecuteLatencies().record(key, latencyMs); } - public void boltAckedTuple(String component, String stream, long latencyMs) { + public void boltAckedTuple(String component, String stream, long latencyMs, Counter ackedCounter) { List key = Lists.newArrayList(component, stream); this.getAcked().incBy(key, this.rate); + ackedCounter.inc(this.rate); this.getProcessLatencies().record(key, latencyMs); } - public void boltFailedTuple(String component, String stream, long latencyMs) { + public void boltFailedTuple(String component, String stream, long latencyMs, Counter failedCounter) { List key = Lists.newArrayList(component, stream); this.getFailed().incBy(key, this.rate); - + failedCounter.inc(this.rate); } @Override @@ -103,8 +89,8 @@ public ExecutorStats renderStats() { // bolt stats BoltStats boltStats = new BoltStats( - StatsUtil.windowSetConverter(valueStat(ackedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(failedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(getAcked()), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(getFailed()), StatsUtil.TO_GSID, StatsUtil.IDENTITY), StatsUtil.windowSetConverter(valueStat(processLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), 
StatsUtil.windowSetConverter(valueStat(executedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), StatsUtil.windowSetConverter(valueStat(executeLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java index b6461c580fe..4c95da06e4f 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java @@ -15,8 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.stats; +import com.codahale.metrics.Counter; import java.util.Map; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; @@ -24,9 +26,10 @@ @SuppressWarnings("unchecked") public abstract class CommonStats { - private final MultiCountStatAndMetric emittedStats; private final MultiCountStatAndMetric transferredStats; + private final MultiCountStatAndMetric ackedStats; + private final MultiCountStatAndMetric failedStats; protected final int rate; @@ -34,6 +37,16 @@ public CommonStats(int rate,int numStatBuckets) { this.rate = rate; this.emittedStats = new MultiCountStatAndMetric(numStatBuckets); this.transferredStats = new MultiCountStatAndMetric(numStatBuckets); + this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); + this.failedStats = new MultiCountStatAndMetric(numStatBuckets); + } + + public MultiCountStatAndMetric getFailed() { + return failedStats; + } + + public MultiCountStatAndMetric getAcked() { + return ackedStats; } public int getRate() { @@ -48,17 +61,21 @@ public MultiCountStatAndMetric getTransferred() { return transferredStats; } - public void emittedTuple(String stream) { + public void emittedTuple(String stream, Counter emittedCounter) { this.getEmitted().incBy(stream, this.rate); + emittedCounter.inc(this.rate); } - public void 
transferredTuples(String stream, int amount) { + public void transferredTuples(String stream, int amount, Counter transferredCounter) { this.getTransferred().incBy(stream, this.rate * amount); + transferredCounter.inc(amount); } public void cleanupStats() { emittedStats.close(); transferredStats.close(); + ackedStats.close(); + failedStats.close(); } protected Map> valueStat(MultiCountStatAndMetric metric) { diff --git a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index 6c3d589a2ab..2fc15020c26 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@ -15,55 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.stats; +import com.codahale.metrics.Counter; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.generated.SpoutStats; -import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @SuppressWarnings("unchecked") public class SpoutExecutorStats extends CommonStats { - - private final MultiCountStatAndMetric ackedStats; - private final MultiCountStatAndMetric failedStats; private final MultiLatencyStatAndMetric completeLatencyStats; public SpoutExecutorStats(int rate,int numStatBuckets) { super(rate,numStatBuckets); - this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); - this.failedStats = new MultiCountStatAndMetric(numStatBuckets); this.completeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); } - public MultiCountStatAndMetric getAcked() { - return ackedStats; - } - - public MultiCountStatAndMetric getFailed() { - return failedStats; - } - public MultiLatencyStatAndMetric getCompleteLatencies() { return 
completeLatencyStats; } @Override public void cleanupStats() { - ackedStats.close(); - failedStats.close(); completeLatencyStats.close(); super.cleanupStats(); } - public void spoutAckedTuple(String stream, long latencyMs) { + public void spoutAckedTuple(String stream, long latencyMs, Counter ackedCounter) { this.getAcked().incBy(stream, this.rate); + ackedCounter.inc(this.rate); this.getCompleteLatencies().record(stream, latencyMs); } - public void spoutFailedTuple(String stream, long latencyMs) { + public void spoutFailedTuple(String stream, long latencyMs, Counter failedCounter) { this.getFailed().incBy(stream, this.rate); + failedCounter.inc(this.rate); } @Override @@ -76,7 +64,7 @@ public ExecutorStats renderStats() { // spout stats SpoutStats spoutStats = new SpoutStats( - valueStat(ackedStats), valueStat(failedStats), valueStat(completeLatencyStats)); + valueStat(getAcked()), valueStat(getFailed()), valueStat(completeLatencyStats)); ret.set_specific(ExecutorSpecificStats.spout(spoutStats)); return ret; diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java index 0a1765b18cd..5985684e72e 100644 --- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java +++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java @@ -15,30 +15,35 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.task; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.NotImplementedException; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.metric.api.CombinedMetric; +import org.apache.storm.metric.api.ICombiner; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IReducer; -import org.apache.storm.metric.api.ICombiner; import org.apache.storm.metric.api.ReducedMetric; -import org.apache.storm.metric.api.CombinedMetric; +import org.apache.storm.metrics2.StormMetricRegistry; import org.apache.storm.state.ISubscribedState; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.lang.NotImplementedException; import org.json.simple.JSONValue; /** @@ -168,9 +173,9 @@ public String getThisComponentId() { } /** - * Gets the declared output fields for the specified stream id for the - * component this task is a part of. - */ + * Gets the declared output fields for the specified stream id for the + * component this task is a part of. 
+ */ public Fields getThisOutputFields(String streamId) { return getComponentOutputFields(getThisComponentId(), streamId); } @@ -202,8 +207,8 @@ public Set getThisStreams() { public int getThisTaskIndex() { List tasks = new ArrayList<>(getComponentTasks(getThisComponentId())); Collections.sort(tasks); - for(int i=0; i T registerMetric(String name, T metric, int timeBucketSizeInSecs) { if(_openOrPrepareWasCalled.get()) { throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + @@ -349,18 +355,18 @@ public T registerMetric(String name, T metric, int timeBucke } Map>> m1 = _registeredMetrics; - if(!m1.containsKey(timeBucketSizeInSecs)) { + if (!m1.containsKey(timeBucketSizeInSecs)) { m1.put(timeBucketSizeInSecs, new HashMap>()); } Map> m2 = m1.get(timeBucketSizeInSecs); - if(!m2.containsKey(_taskId)) { + if (!m2.containsKey(_taskId)) { m2.put(_taskId, new HashMap()); } Map m3 = m2.get(_taskId); - if(m3.containsKey(name)) { - throw new RuntimeException("The same metric name `" + name + "` was registered twice." ); + if (m3.containsKey(name)) { + throw new RuntimeException("The same metric name `" + name + "` was registered twice."); } else { m3.put(name, metric); } @@ -375,6 +381,7 @@ public T registerMetric(String name, T metric, int timeBucke * cause the same metric name can register twice. * So we just return the first metric we meet. */ + @Deprecated public IMetric getRegisteredMetricByName(String name) { IMetric metric = null; @@ -395,13 +402,40 @@ public IMetric getRegisteredMetricByName(String name) { /* * Convenience method for registering ReducedMetric. */ + @Deprecated public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); } + /* * Convenience method for registering CombinedMetric. 
*/ + @Deprecated public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); } + + public Timer registerTimer(String name) { + return StormMetricRegistry.registry().timer(metricName(name)); + } + + public Histogram registerHistogram(String name) { + return StormMetricRegistry.registry().histogram(metricName(name)); + } + + public Meter registerMeter(String name) { + return StormMetricRegistry.registry().meter(metricName(name)); + } + + public Counter registerCounter(String name) { + return StormMetricRegistry.registry().counter(metricName(name)); + } + + public Gauge registerGauge(String name, Gauge gauge) { + return StormMetricRegistry.registry().register(metricName(name), gauge); + } + + private String metricName(String name) { + return StormMetricRegistry.metricName(name, this); + } } diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java index 7a64812da03..3212df09353 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java +++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java @@ -18,27 +18,34 @@ package org.apache.storm.utils; -import org.apache.storm.policy.IWaitStrategy; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.metric.internal.RateTracker; +import org.apache.storm.metrics2.JcMetrics; +import org.apache.storm.metrics2.StormMetricRegistry; +import org.apache.storm.policy.IWaitStrategy; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.MpscArrayQueue; import org.jctools.queues.MpscUnboundedArrayQueue; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - - public class JCQueue implements IStatefulObject { private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class); + private static final String PREFIX = "jc-"; + private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(PREFIX + "metrics-reporter").build()); public static final Object INTERRUPT = new Object(); private final ExitCondition continueRunning = () -> true; + private final JcMetrics jcMetrics; private interface Inserter { // blocking call that can be interrupted using Thread.interrupt() @@ -64,7 +71,7 @@ public void publish(Object obj) throws InterruptedException { int idleCount = 0; while (!inserted) { q.metrics.notifyInsertFailure(); - if (idleCount==0) { // check avoids multiple log msgs when in a idle loop + if (idleCount == 0) { // check avoids multiple log msgs when in an idle loop LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName()); } @@ -90,7 +97,6 @@ public boolean tryPublish(Object obj) { @Override public void flush() throws InterruptedException { - return; } @Override @@ -111,7 +117,7 @@ public BatchInserter(JCQueue q, int batchSz) { this.currentBatch = new ArrayList<>(batchSz + 1); } - /** Blocking call - retires till element is successfully added */ + /** Blocking call - retries till element is successfully added.
*/ @Override public void publish(Object obj) throws InterruptedException { currentBatch.add(obj); @@ -143,7 +149,7 @@ public void flush() throws InterruptedException { int retryCount = 0; while (publishCount == 0) { // retry till at least 1 element is drained q.metrics.notifyInsertFailure(); - if (retryCount==0) { // check avoids multiple log msgs when in a idle loop + if (retryCount == 0) { // check avoids multiple log msgs when in an idle loop LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", q.getName()); } retryCount = q.backPressureWaitStrategy.idle(retryCount); @@ -236,7 +242,10 @@ public void close() { } private final MpscArrayQueue recvQueue; - private final MpscUnboundedArrayQueue overflowQ; // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full + + // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full + private final MpscUnboundedArrayQueue overflowQ; + private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow. @@ -250,17 +259,28 @@ public void close() { private String queueName; private final IWaitStrategy backPressureWaitStrategy; - public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy) { + public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy, + String topologyId, String componentId, Integer taskId, int port) { this.queueName = queueName; this.overflowLimit = overflowLimit; this.recvQueue = new MpscArrayQueue<>(size); this.overflowQ = new MpscUnboundedArrayQueue<>(size); this.metrics = new JCQueue.QueueMetrics(); + this.jcMetrics = StormMetricRegistry.jcMetrics(queueName, topologyId, componentId, taskId, port); //The batch size can be no larger than half the full recvQueue size, to avoid contention issues.
this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2)); this.backPressureWaitStrategy = backPressureWaitStrategy; + + if (!METRICS_REPORTER_EXECUTOR.isShutdown()) { + METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + jcMetrics.set(metrics); + } + }, 15, 15, TimeUnit.SECONDS); + } } public String getName() { @@ -271,6 +291,7 @@ public String getName() { public boolean haltWithInterrupt() { boolean res = tryPublishInternal(INTERRUPT); metrics.close(); + METRICS_REPORTER_EXECUTOR.shutdown(); return res; } @@ -294,15 +315,18 @@ public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) { } } - public int size() { return recvQueue.size() + overflowQ.size(); } + public int size() { + return recvQueue.size() + overflowQ.size(); + } /** * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q + * @param consumer * @param exitCond */ private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException { int drainCount = 0; - while ( exitCond.keepRunning() ) { + while (exitCond.keepRunning()) { Object tuple = recvQueue.poll(); if (tuple == null) { break; @@ -374,7 +398,7 @@ public void publish(Object obj) throws InterruptedException { } /** - * Non-blocking call, returns false if full + * Non-blocking call, returns false if full. 
**/ public boolean tryPublish(Object obj) { Inserter inserter = getInserter(); @@ -391,7 +415,7 @@ public boolean tryPublishDirect(Object obj) { * returns false if overflowLimit has reached */ public boolean tryPublishToOverflow(Object obj) { - if (overflowLimit>0 && overflowQ.size() >= overflowLimit) { + if (overflowLimit > 0 && overflowQ.size() >= overflowLimit) { return false; } overflowQ.add(obj); diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java index 34adbc49694..cb998701bee 100644 --- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java +++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java @@ -46,9 +46,11 @@ public class ConfigValidation { private static final Logger LOG = LoggerFactory.getLogger(ConfigValidation.class); - public static abstract class Validator { + public abstract static class Validator { public Validator(Map params) {} + public Validator() {} + public abstract void validateField(String name, Object o); } @@ -57,7 +59,7 @@ public Validator() {} */ /** - * Validates if an object is not null + * Validates if an object is not null. */ public static class NotNullValidator extends Validator { @@ -71,7 +73,7 @@ public void validateField(String name, Object o) { } /** - * Validates basic types + * Validates basic types. */ public static class SimpleTypeValidator extends Validator { @@ -93,7 +95,8 @@ public static void validateField(String name, Class type, Object o) { if (type.isInstance(o)) { return; } - throw new IllegalArgumentException("Field " + name + " must be of type " + type + ". Object: " + o + " actual type: " + o.getClass()); + throw new IllegalArgumentException( + "Field " + name + " must be of type " + type + ". 
Object: " + o + " actual type: " + o.getClass()); } } @@ -137,9 +140,10 @@ public StringValidator(){} public StringValidator(Map params) { - this.acceptedValues = new HashSet(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES))); + this.acceptedValues = + new HashSet(Arrays.asList((String[])params.get(ConfigValidationAnnotations.ValidatorParams.ACCEPTED_VALUES))); - if(this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) { + if (this.acceptedValues.isEmpty() || (this.acceptedValues.size() == 1 && this.acceptedValues.contains(""))) { this.acceptedValues = null; } } @@ -147,7 +151,7 @@ public StringValidator(Map params) { @Override public void validateField(String name, Object o) { SimpleTypeValidator.validateField(name, String.class, o); - if(this.acceptedValues != null) { + if (this.acceptedValues != null) { if (!this.acceptedValues.contains((String) o)) { throw new IllegalArgumentException("Field " + name + " is not an accepted value. Value: " + o + " Accepted values: " + this.acceptedValues); } @@ -194,8 +198,8 @@ public void validateInteger(String name, Object o) { return; } final long i; - if (o instanceof Number && - (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { + if (o instanceof Number + && (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) { return; } @@ -205,7 +209,7 @@ public void validateInteger(String name, Object o) { } /** - * Validates an entry for ImpersonationAclUser + * Validates an entry for ImpersonationAclUser. 
*/ public static class ImpersonationAclUserEntryValidator extends Validator { @@ -214,7 +218,8 @@ public void validateField(String name, Object o) { if (o == null) { return; } - ConfigValidationUtils.NestableFieldValidator validator = ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false), + ConfigValidationUtils.NestableFieldValidator validator = + ConfigValidationUtils.mapFv(ConfigValidationUtils.fv(String.class, false), ConfigValidationUtils.listFv(String.class, false), false); validator.validateField(name, o); @SuppressWarnings("unchecked") @@ -229,7 +234,7 @@ public void validateField(String name, Object o) { } /** - * validates a list of has no duplicates + * Validates that a list has no duplicates. */ public static class NoDuplicateInListValidator extends Validator { @@ -251,7 +256,7 @@ public void validateField(String name, Object field) { } /** - * Validates a String or a list of Strings + * Validates a String or a list of Strings. */ public static class StringOrStringListValidator extends Validator { @@ -269,7 +274,7 @@ public void validateField(String name, Object o) { } /** - * Validates Kryo Registration + * Validates Kryo Registration. */ public static class KryoRegValidator extends Validator { @@ -283,8 +288,8 @@ public void validateField(String name, Object o) { for (Object e : (Iterable) o) { if (e instanceof Map) { for (Map.Entry entry : ((Map) e).entrySet()) { - if (!(entry.getKey() instanceof String) || - !(entry.getValue() instanceof String)) { + if (!(entry.getKey() instanceof String) + || !(entry.getValue() instanceof String)) { throw new IllegalArgumentException( "Each element of the list " + name + " must be a String or a Map of Strings"); } @@ -302,7 +307,7 @@ public void validateField(String name, Object o) { } /** - * Validates if a number is a power of 2
*/ public static class PowerOf2Validator extends Validator { @@ -312,8 +317,8 @@ public void validateField(String name, Object o) { return; } final long i; - if (o instanceof Number && - (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { + if (o instanceof Number + && (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { // Test whether the integer is a power of 2. if (i > 0 && (i & (i - 1)) == 0) { return; @@ -324,7 +329,7 @@ public void validateField(String name, Object o) { } /** - * Validates each entry in a list + * Validates each entry in a list. */ public static class ListEntryTypeValidator extends Validator { @@ -346,10 +351,10 @@ public static void validateField(String name, Class type, Object o) { } /** - * Validates each entry in a list against a list of custom Validators + * Validates each entry in a list against a list of custom Validators. * Each validator in the list of validators must inherit or be an instance of Validator class */ - public static class ListEntryCustomValidator extends Validator{ + public static class ListEntryCustomValidator extends Validator { private Class[] entryValidators; @@ -366,7 +371,8 @@ public void validateField(String name, Object o) { } } - public static void validateField(String name, Class[] validators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { + public static void validateField(String name, Class[] validators, Object o) + throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { if (o == null) { return; } @@ -386,9 +392,9 @@ public static void validateField(String name, Class[] validators, Object o) t } /** - * validates each key and value in a map of a certain type + * validates each key and value in a map of a certain type. 
*/ - public static class MapEntryTypeValidator extends Validator{ + public static class MapEntryTypeValidator extends Validator { private Class keyType; private Class valueType; @@ -410,9 +416,9 @@ public static void validateField(String name, Class keyType, Class valueTy } /** - * validates each key and each value against the respective arrays of validators + * validates each key and each value against the respective arrays of validators. */ - public static class MapEntryCustomValidator extends Validator{ + public static class MapEntryCustomValidator extends Validator { private Class[] keyValidators; private Class[] valueValidators; @@ -432,7 +438,8 @@ public void validateField(String name, Object o) { } @SuppressWarnings("unchecked") - public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { + public static void validateField(String name, Class[] keyValidators, Class[] valueValidators, Object o) + throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException { if (o == null) { return; } @@ -460,9 +467,9 @@ public static void validateField(String name, Class[] keyValidators, Class } /** - * Validates a positive number + * Validates a positive number. 
*/ - public static class PositiveNumberValidator extends Validator{ + public static class PositiveNumberValidator extends Validator { private boolean includeZero; @@ -484,7 +491,7 @@ public static void validateField(String name, boolean includeZero, Object o) { return; } if (o instanceof Number) { - if(includeZero) { + if (includeZero) { if (((Number) o).doubleValue() >= 0.0) { return; } @@ -518,14 +525,14 @@ public static class MetricRegistryValidator extends Validator { @Override public void validateField(String name, Object o) { - if(o == null) { + if (o == null) { return; } SimpleTypeValidator.validateField(name, Map.class, o); - if(!((Map) o).containsKey("class") ) { - throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); + if (!((Map) o).containsKey("class")) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: class"); } - if(!((Map) o).containsKey("parallelism.hint") ) { + if (!((Map) o).containsKey("parallelism.hint")) { throw new IllegalArgumentException("Field " + name + " must have map entry with key: parallelism.hint"); } @@ -534,21 +541,65 @@ public void validateField(String name, Object o) { } } + public static class MetricReportersValidator extends Validator { + private static final String NIMBUS = "nimbus"; + private static final String SUPERVISOR = "supervisor"; + private static final String WORKER = "worker"; + private static final String CLASS = "class"; + private static final String FILTER = "filter"; + private static final String DAEMONS = "daemons"; + + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + SimpleTypeValidator.validateField(name, Map.class, o); + if (!((Map) o).containsKey(CLASS)) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: class"); + } + if (!((Map) o).containsKey(DAEMONS)) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: 
daemons"); + } else { + // daemons can only be 'nimbus', 'supervisor', or 'worker' + Object list = ((Map)o).get(DAEMONS); + if (!(list instanceof List)) { + throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); + } + List daemonList = (List)list; + for (Object string : daemonList) { + if (string instanceof String + && (string.equals(NIMBUS) || string.equals(SUPERVISOR) || string.equals(WORKER))) { + continue; + } + throw new IllegalArgumentException("Field 'daemons' must contain at least one of the following:" + + " \"nimbus\", \"supervisor\", or \"worker\""); + } + + } + if (((Map)o).containsKey(FILTER)) { + Map filterMap = (Map)((Map)o).get(FILTER); + SimpleTypeValidator.validateField(CLASS, String.class, filterMap.get(CLASS)); + } + SimpleTypeValidator.validateField(name, String.class, ((Map) o).get(CLASS)); + } + } + public static class EventLoggerRegistryValidator extends Validator { @Override public void validateField(String name, Object o) { - if(o == null) { + if (o == null) { return; } SimpleTypeValidator.validateField(name, Map.class, o); - if(!((Map) o).containsKey("class") ) { - throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); + if (!((Map) o).containsKey("class")) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: class"); } SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); - if(((Map) o).containsKey("arguments") ) { + if (((Map) o).containsKey("arguments")) { SimpleTypeValidator.validateField(name, Map.class, ((Map) o).get("arguments")); } } @@ -570,10 +621,9 @@ public void validateField(String name, Object o) { throw new IllegalArgumentException("Field " + name + " must be set."); } - if (o instanceof String && - (((String) o).equals("NONE") || - ((String) o).equals("DIGEST") || - ((String) o).equals("KERBEROS"))) { + if (o instanceof String + && (((String) o).equals("NONE") || ((String) o).equals("DIGEST") + || 
((String) o).equals("KERBEROS"))) { return; } throw new IllegalArgumentException("Field " + name + " must be one of \"NONE\", \"DIGEST\", or \"KERBEROS\""); @@ -684,9 +734,9 @@ public static void validateField(String fieldName, Map conf) { private static List> configClasses = null; //We follow the model of service loaders (Even though it is not a service). - private static final String CONFIG_CLASSES_NAME = "META-INF/services/"+Validated.class.getName(); + private static final String CONFIG_CLASSES_NAME = "META-INF/services/" + Validated.class.getName(); - private synchronized static List> getConfigClasses() { + private static synchronized List> getConfigClasses() { if (configClasses == null) { List> ret = new ArrayList<>(); Set classesToScan = new HashSet<>(); @@ -695,7 +745,7 @@ private synchronized static List> getConfigClasses() { try { try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) { String line; - while((line = in.readLine()) != null) { + while ((line = in.readLine()) != null) { line = line.replaceAll("#.*$", "").trim(); if (!line.isEmpty()) { classesToScan.add(line); @@ -720,7 +770,7 @@ private synchronized static List> getConfigClasses() { } /** - * Validates a field given field name as string + * Validates a field given field name as string. * * @param fieldName provided as a string * @param conf map of confs @@ -783,9 +833,7 @@ public static void validateField(Field field, Map conf) { //If validator has a constructor that takes a Map as an argument call that constructor if (hasConstructor(clazz, Map.class)) { o = clazz.getConstructor(Map.class).newInstance(params); - } - //If not call default constructor - else { + } else { //If not call default constructor o = clazz.newInstance(); } o.validateField(field.getName(), conf.get(key)); @@ -797,7 +845,7 @@ public static void validateField(Field field, Map conf) { } /** - * Validate all confs in map + * Validate all confs in map. 
* * @param conf map of configs */ @@ -811,15 +859,15 @@ public static void validateFields(Map conf) { private static final int ACC_STATIC = 0x0008; private static final int ACC_FINAL = 0x0010; private static final int DESIRED_FIELD_ACC = ACC_PUBLIC | ACC_STATIC | ACC_FINAL; + public static boolean isFieldAllowed(Field field) { - return field.getAnnotation(NotConf.class) == null && - String.class.equals(field.getType()) && - ((field.getModifiers() & DESIRED_FIELD_ACC) == DESIRED_FIELD_ACC) && - !field.isSynthetic(); + return field.getAnnotation(NotConf.class) == null + && String.class.equals(field.getType()) + && ((field.getModifiers() & DESIRED_FIELD_ACC) == DESIRED_FIELD_ACC) && !field.isSynthetic(); } /** - * Validate all confs in map + * Validate all confs in map. * * @param conf map of configs * @param classes config class @@ -848,9 +896,10 @@ public static void validateFields(Map conf, List> class } } - private static Map getParamsFromAnnotation(Class validatorClass, Object v) throws InvocationTargetException, IllegalAccessException { + private static Map getParamsFromAnnotation(Class validatorClass, Object v) + throws InvocationTargetException, IllegalAccessException { Map params = new HashMap(); - for(Method method : validatorClass.getDeclaredMethods()) { + for (Method method : validatorClass.getDeclaredMethods()) { Object value = null; try { @@ -858,7 +907,7 @@ private static Map getParamsFromAnnotation(Class validatorClas } catch (IllegalArgumentException ex) { value = null; } - if(value != null) { + if (value != null) { params.put(method.getName(), value); } } diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java index 29ba179443e..e48d0903262 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java @@ -46,7 +46,7 @@ public void 
testNoReOrderingUnderBackPressure() throws Exception { } private static JCQueue createQueue(String name, int queueSize) { - return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0)); + return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test",1000, 1000); } private static class TestConsumer implements Consumer { diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java index bdc49375afc..3d48de2bdfb 100644 --- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java +++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java @@ -200,10 +200,10 @@ public void run() { } private JCQueue createQueue(String name, int queueSize) { - return new JCQueue(name, queueSize, 0, 1, waitStrategy); + return createQueue(name, 1, queueSize); } private JCQueue createQueue(String name, int batchSize, int queueSize) { - return new JCQueue(name, queueSize, 0, batchSize, waitStrategy); + return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test",1000, 1000); } } diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index ec1815dfbc8..c6cff6bfe20 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -18,19 +18,22 @@ package org.apache.storm; +import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean; import static org.apache.storm.validation.ConfigValidationAnnotations.isInteger; +import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom; import static org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber; import static org.apache.storm.validation.ConfigValidationAnnotations.isString; import static org.apache.storm.validation.ConfigValidationAnnotations.isStringList; import static 
org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList; -import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull; -import static org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom; -import static org.apache.storm.validation.ConfigValidationAnnotations.isBoolean; import static org.apache.storm.validation.ConfigValidationAnnotations.isNumber; import static org.apache.storm.validation.ConfigValidationAnnotations.isImplementationOfClass; import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryType; import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplicateInList; import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom; +import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull; + +import java.util.ArrayList; +import java.util.Map; import org.apache.storm.container.ResourceIsolationInterface; import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; @@ -42,9 +45,6 @@ import org.apache.storm.validation.ConfigValidation; import org.apache.storm.validation.Validated; -import java.util.ArrayList; -import java.util.Map; - /** * Storm configs are specified as a plain old map. This class provides constants for * all the configurations possible on a Storm cluster. Each constant is paired with an annotation @@ -68,32 +68,12 @@ public class DaemonConfig implements Validated { @isStringList public static final String STORM_DAEMON_METRICS_REPORTER_PLUGINS = "storm.daemon.metrics.reporter.plugins"; - /** - * A specify Locale for daemon metrics reporter plugin. - * Use the specified IETF BCP 47 language tag string for a Locale. - */ - @isString - public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE = "storm.daemon.metrics.reporter.plugin.locale"; - /** * A specify domain for daemon metrics reporter plugin to limit reporting to specific domain. 
*/ @isString public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN = "storm.daemon.metrics.reporter.plugin.domain"; - /** - * A specify rate-unit in TimeUnit to specify reporting frequency for daemon metrics reporter plugin. - */ - @isString - public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT = "storm.daemon.metrics.reporter.plugin.rate.unit"; - - /** - * A specify duration-unit in TimeUnit to specify reporting window for daemon metrics reporter plugin. - */ - @isString - public static final String STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT = "storm.daemon.metrics.reporter.plugin.duration.unit"; - - /** * A specify csv reporter directory for CvsPreparableReporter daemon metrics reporter. */ @@ -303,7 +283,7 @@ public class DaemonConfig implements Validated { public static final String NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE = "nimbus.assignments.service.thread.queue.size"; /** - * class controls heartbeats recovery strategy + * class controls heartbeats recovery strategy. */ @isString public static final String NIMBUS_WORKER_HEARTBEATS_RECOVERY_STRATEGY_CLASS = "nimbus.worker.heartbeats.recovery.strategy.class"; @@ -829,7 +809,8 @@ public class DaemonConfig implements Validated { */ @NotNull @isPositiveNumber - public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = "storm.cluster.metrics.consumer.publish.interval.secs"; + public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = + "storm.cluster.metrics.consumer.publish.interval.secs"; /** * Enables user-first classpath. See topology.classpath.beginning. @@ -863,8 +844,8 @@ public class DaemonConfig implements Validated { /** * For ArtifactoryConfigLoader, this can either be a reference to an individual file in Artifactory or to a directory. - * If it is a directory, the file with the largest lexographic name will be returned. 
Users need to add "artifactory+" to the beginning of - * the real URI to use ArtifactoryConfigLoader. + * If it is a directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" + * to the beginning of the real URI to use ArtifactoryConfigLoader. * For FileConfigLoader, this is the URI pointing to a file. */ @isString diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java index b5bd8bc66e7..85ee27f0da3 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/MetricsUtils.java @@ -15,8 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.metrics; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter; import org.apache.storm.daemon.metrics.reporters.PreparableReporter; @@ -26,15 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.TimeUnit; - public class MetricsUtils { - private final static Logger LOG = LoggerFactory.getLogger(MetricsUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsUtils.class); public static List getPreparableReporters(Map topoConf) { List clazzes = (List) topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGINS); @@ -60,30 +59,6 @@ private static PreparableReporter getPreparableReporter(String clazz) { return reporter; } - public static Locale getMetricsReporterLocale(Map topoConf) { - String languageTag = 
ObjectReader.getString(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_LOCALE), null); - if (languageTag != null) { - return Locale.forLanguageTag(languageTag); - } - return null; - } - - public static TimeUnit getMetricsRateUnit(Map topoConf) { - return getTimeUnitForCofig(topoConf, DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_RATE_UNIT); - } - - public static TimeUnit getMetricsDurationUnit(Map topoConf) { - return getTimeUnitForCofig(topoConf, DaemonConfig.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DURATION_UNIT); - } - - private static TimeUnit getTimeUnitForCofig(Map topoConf, String configName) { - String rateUnitString = ObjectReader.getString(topoConf.get(configName), null); - if (rateUnitString != null) { - return TimeUnit.valueOf(rateUnitString); - } - return null; - } - public static File getCsvLogDir(Map topoConf) { String csvMetricsLogDirectory = ObjectReader.getString(topoConf.get(DaemonConfig.STORM_DAEMON_METRICS_REPORTER_CSV_LOG_DIR), null); if (csvMetricsLogDirectory == null) { diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java index bc34b4b3b0a..cd6d0693b18 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java @@ -15,20 +15,20 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.ConsoleReporter; import com.codahale.metrics.MetricRegistry; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConsolePreparableReporter implements PreparableReporter { - private final static Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class); ConsoleReporter reporter = null; @Override @@ -37,17 +37,17 @@ public void prepare(MetricRegistry metricsRegistry, Map topoConf ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(metricsRegistry); builder.outputTo(System.out); - Locale locale = MetricsUtils.getMetricsReporterLocale(topoConf); + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(topoConf); if (locale != null) { builder.formattedFor(locale); } - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf); + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf); if (rateUnit != null) { builder.convertRatesTo(rateUnit); } - TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(topoConf); + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(topoConf); if (durationUnit != null) { builder.convertDurationsTo(durationUnit); } diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java index b67cc65a910..3ab6a996bab 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java +++ 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/CsvPreparableReporter.java @@ -15,21 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.CsvReporter; import com.codahale.metrics.MetricRegistry; -import org.apache.storm.daemon.metrics.MetricsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.File; import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CsvPreparableReporter implements PreparableReporter { - private final static Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(CsvPreparableReporter.class); CsvReporter reporter = null; @Override @@ -37,17 +38,17 @@ public void prepare(MetricRegistry metricsRegistry, Map topoConf LOG.debug("Preparing..."); CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry); - Locale locale = MetricsUtils.getMetricsReporterLocale(topoConf); + Locale locale = ClientMetricsUtils.getMetricsReporterLocale(topoConf); if (locale != null) { builder.formatFor(locale); } - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf); + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf); if (rateUnit != null) { builder.convertRatesTo(rateUnit); } - TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(topoConf); + TimeUnit durationUnit = ClientMetricsUtils.getMetricsDurationUnit(topoConf); if (durationUnit != null) { builder.convertDurationsTo(durationUnit); } diff --git a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java 
b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java index 21aab16c5ee..c369879c0fe 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java @@ -15,21 +15,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.daemon.metrics.reporters; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.storm.DaemonConfig; -import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.daemon.metrics.ClientMetricsUtils; import org.apache.storm.utils.ObjectReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.concurrent.TimeUnit; - public class JmxPreparableReporter implements PreparableReporter { - private final static Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class); + private static final Logger LOG = LoggerFactory.getLogger(JmxPreparableReporter.class); JmxReporter reporter = null; @Override @@ -40,7 +40,7 @@ public void prepare(MetricRegistry metricsRegistry, Map topoConf if (domain != null) { builder.inDomain(domain); } - TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(topoConf); + TimeUnit rateUnit = ClientMetricsUtils.getMetricsRateUnit(topoConf); if (rateUnit != null) { builder.convertRatesTo(rateUnit); } diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java index 55a86cfa744..ab164cad958 100644 --- a/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java +++ b/storm-server/src/main/java/org/apache/storm/nimbus/DefaultTopologyValidator.java @@ -15,19 
+15,53 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.nimbus; +import java.util.Map; +import org.apache.storm.generated.Bolt; import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StormTopology; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DefaultTopologyValidator implements ITopologyValidator { + private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyValidator.class); @Override public void prepare(Map StormConf){ } @Override - public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + if (topologyName.contains(".")) { + LOG.warn("Metrics for topology name '{}' will be reported as '{}'.", topologyName, topologyName.replace('.', '_')); + } + Map spouts = topology.get_spouts(); + for (String spoutName : spouts.keySet()) { + if (spoutName.contains(".")) { + LOG.warn("Metrics for spout name '{}' will be reported as '{}'.", spoutName, spoutName.replace('.', '_')); + } + SpoutSpec spoutSpec = spouts.get(spoutName); + for (String streamName : spoutSpec.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + LOG.warn("Metrics for stream name '{}' will be reported as '{}'.", streamName, streamName.replace('.', '_')); + } + } + } + + Map bolts = topology.get_bolts(); + for (String boltName : bolts.keySet()) { + if (boltName.contains(".")) { + LOG.warn("Metrics for bolt name '{}' will be reported as '{}'.", boltName, boltName.replace('.', '_')); + } + Bolt bolt = bolts.get(boltName); + for (String streamName : bolt.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + LOG.warn("Metrics for stream name '{}' will be 
reported as '{}'.", streamName, streamName.replace('.', '_')); + } + } + } } } diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java new file mode 100644 index 00000000000..cdf22c76765 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/nimbus/StrictTopologyValidator.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.nimbus; + +import java.util.Map; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StrictTopologyValidator implements ITopologyValidator { + private static final Logger LOG = LoggerFactory.getLogger(StrictTopologyValidator.class); + + @Override + public void prepare(Map stormConf){ + } + + @Override + public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + if (topologyName.contains(".")) { + throw new InvalidTopologyException(String.format("Topology name '%s' contains illegal character '.'", topologyName)); + } + Map spouts = topology.get_spouts(); + for (String spoutName : spouts.keySet()) { + if (spoutName.contains(".")) { + throw new InvalidTopologyException(String.format("Spout name '%s' contains illegal character '.'", spoutName)); + } + SpoutSpec spoutSpec = spouts.get(spoutName); + for (String streamName : spoutSpec.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + throw new InvalidTopologyException(String.format("Stream name '%s' contains illegal character '.'", streamName)); + } + } + } + + Map bolts = topology.get_bolts(); + for (String boltName : bolts.keySet()) { + if (boltName.contains(".")) { + throw new InvalidTopologyException(String.format("Bolt name '%s' contains illegal character '.'", boltName)); + } + Bolt bolt = bolts.get(boltName); + for (String streamName : bolt.get_common().get_streams().keySet()) { + if (streamName.contains(".")) { + throw new InvalidTopologyException(String.format("Stream name '%s' contains illegal character '.'", streamName)); + } + } + } + } +}