From d886628fced8fa352a3032613349bbda6c74fd90 Mon Sep 17 00:00:00 2001 From: r7raul1984 Date: Thu, 19 Jan 2017 13:23:41 +0800 Subject: [PATCH 1/5] [EAGLE-872] Transform counter metric to rate metric - Add transform bolt using counterToRateFunction in HadoopMetricMonitorApp's storm topology. https://issues.apache.org/jira/browse/EAGLE-872 --- .../builder/CounterToRateFunction.java | 214 +++++++++++++ .../org/apache/eagle/app/utils/Clock.java | 24 ++ .../eagle/app/utils/ClockWithOffset.java | 35 ++ .../apache/eagle/app/utils/ManualClock.java | 54 ++++ .../builder/CounterToRateFunctionTest.java | 303 ++++++++++++++++++ .../messaging/MetricStreamPersistTest.java | 142 ++++++++ .../eagle/metric/HadoopMetricMonitorApp.java | 19 +- .../running/MRRunningJobApplicationTest.java | 2 - .../mr/running/MRRunningJobManagerTest.java | 3 - .../MRJobEntityCreationHandlerTest.java | 2 - 10 files changed, 785 insertions(+), 13 deletions(-) create mode 100644 eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java create mode 100644 eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java create mode 100644 eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java create mode 100644 eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java create mode 100644 eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java create mode 100644 eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java new file mode 100644 index 0000000000..6712c9046e --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.environment.builder; + +import com.google.common.base.Preconditions; +import org.apache.eagle.app.utils.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class CounterToRateFunction implements TransformFunction { + private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class); + private final Map cache; + private MetricDefinition metricDefinition; + private Collector collector; + + public CounterToRateFunction(MetricDefinition metricDefinition, long heartbeat, TimeUnit unit, final Clock clock) { + final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit); + this.cache = new LinkedHashMap(16, 0.75f, true) { + protected boolean removeEldestEntry(Map.Entry eldest) { + final long now = clock.now(); + final long lastMod = eldest.getValue().getTimestamp(); + final boolean expired = (now - lastMod) > heartbeatMillis; + if (expired) { + LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey()); + } + return expired; + } + }; + this.metricDefinition = metricDefinition; + } + + @Override + public String getName() { + return "CounterToRate"; + } + + @Override + public void open(Collector collector) { + this.collector = collector; + } + + @Override + public void transform(Map event) { + Metric metric = toMetric(event); + LOG.debug("received {} metrics", metric); + if (metric.isCounter()) { + final String metricName = metric.getMetricName(); + final CounterValue prev = cache.get(metricName); + if (prev != null) { + final double rate = prev.computeRate(metric); + event.put(metricDefinition.getValueField(), rate); + collector.collect(event.toString(), event); + } else { + CounterValue current = new CounterValue(metric); + cache.put(metricName, current); + } + } else { + collector.collect(event.toString(), event); + } + + } + + @Override + public void close() { + cache.clear(); + } + + private Metric toMetric(Map event) { + + String metricName = ""; + for (String dimensionField : metricDefinition.getDimensionFields()) { + metricName += event.get(dimensionField) + "-"; + } + metricName += metricDefinition.getNameSelector().getMetricName(event); + + long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event); + + return new Metric(metricName, timestamp, getCurrentValue(event)); + } + + private double getCurrentValue(Map event) { + double[] values; + if (event.containsKey(metricDefinition.getValueField())) { + values = new double[]{(double) event.get(metricDefinition.getValueField())}; + } else { + LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event); + values = new double[]{0}; + } + return values[0]; + } + + protected static class CounterValue { + private long timestamp; + private double value; + + public CounterValue(long timestamp, double value) { + this.timestamp = timestamp; + this.value = value; + } + + public CounterValue(Metric m) { + this(m.getTimestamp(), m.getNumberValue().doubleValue()); + } + + public long getTimestamp() { + return timestamp; + } + + public double computeRate(Metric m) { + final long currentTimestamp = m.getTimestamp(); + final double currentValue = m.getNumberValue().doubleValue(); + + final long durationMillis = currentTimestamp - timestamp; + final double delta = currentValue - value; + + timestamp = currentTimestamp; + value = currentValue; + + return computeRate(durationMillis, delta); + } + + private double computeRate(long durationMillis, double delta) { + final double millisPerSecond = 1000.0; + final double duration = durationMillis / millisPerSecond; + return (duration <= 0.0 || delta <= 0.0) ? 0.0 : delta / duration; + } + + @Override + public String toString() { + return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}'; + } + } + + + protected final class Metric { + private final String metricName; + private final long timestamp; + private final Object value; + + public Metric(String metricName, long timestamp, Object value) { + this.metricName = Preconditions.checkNotNull(metricName, "metricName"); + this.timestamp = timestamp; + this.value = Preconditions.checkNotNull(value, "value"); + } + + public String getMetricName() { + return metricName; + } + + public long getTimestamp() { + return timestamp; + } + + public Object getValue() { + return value; + } + + public Number getNumberValue() { + return (Number) value; + } + + public boolean hasNumberValue() { + return (value instanceof Number); + } + + public boolean isCounter() { + return metricName.endsWith("count"); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof Metric)) { + return false; + } + Metric m = (Metric) obj; + return metricName.equals(m.getMetricName()) + && timestamp == m.getTimestamp() + && value.equals(m.getValue()); + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + value.hashCode(); + return result; + } + + + @Override + public String toString() { + return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}'; + } + } +} diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java new file mode 100644 index 0000000000..f3deff99dd --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/Clock.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.utils; + +public interface Clock { + Clock WALL = System::currentTimeMillis; + + long now(); +} \ No newline at end of file diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java new file mode 100644 index 0000000000..62b060f686 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ClockWithOffset.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.utils; + +public enum ClockWithOffset implements Clock { + INSTANCE; + + private volatile long offset = 0L; + + public void setOffset(long offset) { + if (offset >= 0) { + this.offset = offset; + } + } + + @Override + public long now() { + return offset + System.currentTimeMillis(); + } +} diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java new file mode 100644 index 0000000000..cd8fc8097a --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/ManualClock.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.utils; + +import java.util.concurrent.atomic.AtomicLong; + +public class ManualClock implements Clock { + + private final AtomicLong time; + + public ManualClock(long init) { + time = new AtomicLong(init); + } + + public void set(long t) { + time.set(t); + } + + public long now() { + return time.get(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ManualClock clock = (ManualClock) o; + return now() == clock.now(); + } + + @Override + public int hashCode() { + return Long.valueOf(now()).hashCode(); + } +} \ No newline at end of file diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java new file mode 100644 index 0000000000..1176e33b08 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.environment.builder; + +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.app.utils.ClockWithOffset; +import org.apache.eagle.app.utils.ManualClock; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class CounterToRateFunctionTest { + + + private Map mkCountTypeEvent(long ts, double value) { + Map event = new HashMap(); + event.put("timestamp", ts); + event.put("metric", "hadoop.hbase.regionserver.server.totalrequestcount"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", value); + event.put("host", "xxx-xxx.int.xxx.com"); + return event; + } + + private Map mkCountTypeEventWithMetricName(long ts, double value, String metric) { + Map event = new HashMap(); + event.put("timestamp", ts); + event.put("metric", metric); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", value); + event.put("host", "xxx-xxx.int.xxx.com"); + return event; + } + + private Map mkOtherTypeEvent(long ts, double value) { + Map event = new HashMap(); + event.put("timestamp", ts); + event.put("metric", "hadoop.memory.heapmemoryusage.used"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", value); + event.put("host", "xxx-xxx.int.xxx.com"); + return event; + } + + + @Test + public void testToMetricAndCounterValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + long baseTime = System.currentTimeMillis() + 100000L; + + MetricDefinition metricDefinition = MetricDefinition + .metricType("HADOOP_JMX_METRICS") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE); + + Map event = mkCountTypeEvent((baseTime + 0), 374042741.0); + Method toMetricMethod = counterToRateFunction.getClass().getDeclaredMethod("toMetric", Map.class); + toMetricMethod.setAccessible(true); + CounterToRateFunction.Metric metric = (CounterToRateFunction.Metric) toMetricMethod.invoke(counterToRateFunction, event); + Assert.assertEquals("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount", metric.getMetricName()); + Assert.assertEquals(374042741.0, Double.valueOf(metric.getValue().toString()), 0.00001); + Assert.assertEquals(374042741.0, metric.getNumberValue().doubleValue(), 0.00001); + Assert.assertTrue(metric.isCounter()); + + + event = mkOtherTypeEvent((baseTime + 0), 100); + metric = (CounterToRateFunction.Metric) toMetricMethod.invoke(counterToRateFunction, event); + Assert.assertEquals("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.memory.heapmemoryusage.used", metric.getMetricName()); + Assert.assertEquals(100, Double.valueOf(metric.getValue().toString()), 0.00001); + Assert.assertEquals(100, metric.getNumberValue().doubleValue(), 0.00001); + Assert.assertTrue(!metric.isCounter()); + + + } + + @Test + public void testTransformToRate() throws NoSuchFieldException, IllegalAccessException { + List result = new ArrayList<>(); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + @Override + public List emit(String streamId, Collection anchors, List tuple) { + result.add((Map) tuple.get(1)); + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { + + } + + @Override + public void ack(Tuple input) { + + } + + @Override + public void fail(Tuple input) { + + } + + @Override + public void reportError(Throwable error) { + + } + }); + MetricDefinition metricDefinition = MetricDefinition + .metricType("HADOOP_JMX_METRICS") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE); + counterToRateFunction.open(new StormOutputCollector(collector)); + long baseTime = System.currentTimeMillis() + 100000L; + //put first count sample + Map event = mkCountTypeEvent((baseTime + 0), 374042741.0); + counterToRateFunction.transform(event); + Assert.assertTrue(result.isEmpty()); + + Field cacheField = counterToRateFunction.getClass().getDeclaredField("cache"); + cacheField.setAccessible(true); + Map cache = (Map) cacheField.get(counterToRateFunction); + Assert.assertTrue(cache.size() == 1); + + CounterToRateFunction.CounterValue counterValue = cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount"); + Assert.assertEquals((long) event.get("timestamp"), counterValue.getTimestamp()); + Field valueField = counterValue.getClass().getDeclaredField("value"); + valueField.setAccessible(true); + double value = (double) valueField.get(counterValue); + Assert.assertEquals(374042741.0, value, 0.00001); + result.clear(); + //put not count sample + event = mkOtherTypeEvent((baseTime + 0), 100); + counterToRateFunction.transform(event); + Assert.assertTrue(result.size() == 1); + Assert.assertTrue(cache.size() == 1); + Assert.assertEquals(baseTime + 0, counterValue.getTimestamp()); + Assert.assertEquals(374042741.0, value, 0.00001); + + Assert.assertEquals("hadoop.memory.heapmemoryusage.used", event.get("metric")); + Assert.assertEquals(100, (Double) event.get("value"), 0.00001); + result.clear(); + + //delta of 10 in 5 seconds + event = mkCountTypeEvent((baseTime + 5000), 374042751.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + Map transedEvent = result.get(0); + Assert.assertEquals(baseTime + 5000, transedEvent.get("timestamp")); + Assert.assertEquals(2.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 5000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(374042751.0, value, 0.00001); + result.clear(); + + //delta of 15 in 5 seconds + event = mkCountTypeEvent((baseTime + 10000), 374042766.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + transedEvent = result.get(0); + Assert.assertEquals(baseTime + 10000, transedEvent.get("timestamp")); + Assert.assertEquals(3.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 10000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(374042766.0, value, 0.00001); + result.clear(); + + + //No change from previous sample + event = mkCountTypeEvent((baseTime + 15000), 374042766.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + transedEvent = result.get(0); + Assert.assertEquals(baseTime + 15000, transedEvent.get("timestamp")); + Assert.assertEquals(0.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 15000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(374042766.0, value, 0.00001); + result.clear(); + + //Decrease from previous sample + event = mkCountTypeEvent((baseTime + 20000), 1.0); + counterToRateFunction.transform(event); + + Assert.assertTrue(result.size() == 1); + transedEvent = result.get(0); + Assert.assertEquals(baseTime + 20000, transedEvent.get("timestamp")); + Assert.assertEquals(0.0, (double) transedEvent.get("value"), 0.00001); + Assert.assertEquals(baseTime + 20000, counterValue.getTimestamp()); + value = (double) valueField.get(counterValue); + Assert.assertEquals(1.0, value, 0.00001); + result.clear(); + } + + @Test + public void testTransformToRateWithExpiration() throws NoSuchFieldException, IllegalAccessException { + + MetricDefinition metricDefinition = MetricDefinition + .metricType("HADOOP_JMX_METRICS") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + List result = new ArrayList<>(); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + @Override + public List emit(String streamId, Collection anchors, List tuple) { + result.add((Map) tuple.get(1)); + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { + + } + + @Override + public void ack(Tuple input) { + + } + + @Override + public void fail(Tuple input) { + + } + + @Override + public void reportError(Throwable error) { + + } + }); + ManualClock manualClock = new ManualClock(0); + manualClock.set(30000L); + CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 60, TimeUnit.SECONDS, manualClock); + counterToRateFunction.open(new StormOutputCollector(collector)); + Map event = mkCountTypeEventWithMetricName(manualClock.now(), 110, "hadoop.hbase.regionserver.server.totalrequestcount"); + counterToRateFunction.transform(event); + Field cacheField = counterToRateFunction.getClass().getDeclaredField("cache"); + cacheField.setAccessible(true); + Map cache = (Map) cacheField.get(counterToRateFunction); + Assert.assertTrue(cache.size() == 1); + + manualClock.set(50000L); + event = mkCountTypeEventWithMetricName(manualClock.now(), 130, "hadoop.hbase.regionserver.server.readerrequestcount"); + counterToRateFunction.transform(event); + + cache = (Map) cacheField.get(counterToRateFunction); + Assert.assertEquals(2, cache.size()); + Assert.assertEquals("CounterValue{timestamp=30000, value=110.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount").toString()); + Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString()); + + manualClock.set(100000L); + event = mkCountTypeEventWithMetricName(manualClock.now(), 120, "hadoop.hbase.regionserver.server.totalrequestcount"); + counterToRateFunction.transform(event); + + cache = (Map) cacheField.get(counterToRateFunction); + Assert.assertEquals(2, cache.size()); + Assert.assertEquals("CounterValue{timestamp=100000, value=120.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.totalrequestcount").toString()); + Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString()); + + manualClock.set(160001L); + event = mkCountTypeEventWithMetricName(manualClock.now(), 10, "hadoop.hbase.regionserver.server.writerrequestcount"); + counterToRateFunction.transform(event); + Assert.assertEquals(2, cache.size()); + Assert.assertEquals("CounterValue{timestamp=160001, value=10.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.writerrequestcount").toString()); + Assert.assertEquals("CounterValue{timestamp=50000, value=130.0}", cache.get("xxx-xxx.int.xxx.com-hbasemaster-sandbox-hadoop.hbase.regionserver.server.readerrequestcount").toString()); + + + } +} diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java new file mode 100644 index 0000000000..23cbe48ab9 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.messaging; + +import backtype.storm.Testing; +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import com.typesafe.config.Config; +import org.apache.eagle.app.environment.builder.MetricDefinition; +import org.apache.eagle.app.utils.StreamConvertHelper; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.lang.reflect.Field; +import java.util.*; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({MetricStreamPersist.class}) +public class MetricStreamPersistTest { + + @Test + public void testStructuredMetricMapper() throws Exception { + MetricDefinition metricDefinition = MetricDefinition + .metricType("HADOOP_JMX_METRICS") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + Config config = mock(Config.class); + MetricStreamPersist metricStreamPersist = new MetricStreamPersist(metricDefinition, config); + Field mapperField = metricStreamPersist.getClass().getDeclaredField("mapper"); + mapperField.setAccessible(true); + + Map event = new HashMap(); + event.put("timestamp", 1482106479564L); + event.put("metric", "hadoop.memory.heapmemoryusage.used"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", 14460904.0); + event.put("host", "xxx-xxx.int.xxx.com"); + + Tuple tuple = Testing.testTuple(new Values("metric", event)); + MetricStreamPersist.MetricMapper mapper= (MetricStreamPersist.MetricMapper) mapperField.get(metricStreamPersist); + + GenericMetricEntity metricEntity = mapper.map(StreamConvertHelper.tupleToEvent(tuple).f1()); + + Assert.assertEquals("prefix:hadoop.memory.heapmemoryusage.used, timestamp:1482106440000, humanReadableDate:2016-12-19 00:14:00,000, tags: component=hbasemaster,site=sandbox,host=xxx-xxx.int.xxx.com,, encodedRowkey:null", metricEntity.toString()); + } + + @Test + public void testMetricStreamPersist() throws Exception { + List result = new ArrayList<>(); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + @Override + public List emit(String streamId, Collection anchors, List tuple) { + result.add(String.valueOf(tuple.get(0))); + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { + + } + + @Override + public void ack(Tuple input) { + + } + + @Override + public void fail(Tuple input) { + + } + + @Override + public void reportError(Throwable error) { + + } + }); + + MetricDefinition metricDefinition = MetricDefinition + .metricType("HADOOP_JMX_METRICS") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); + Config config = mock(Config.class); + when(config.hasPath("service.batchSize")).thenReturn(false); + + GenericServiceAPIResponseEntity response = mock(GenericServiceAPIResponseEntity.class); + when(response.isSuccess()).thenReturn(true); + + EagleServiceClientImpl client = mock(EagleServiceClientImpl.class); + PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(config).thenReturn(client); + when(client.create(anyObject())).thenReturn(response); + + MetricStreamPersist metricStreamPersist = new MetricStreamPersist(metricDefinition, config); + metricStreamPersist.prepare(null, null, collector); + Map event = new HashMap(); + event.put("timestamp", 1482106479564L); + event.put("metric", "hadoop.memory.heapmemoryusage.used"); + event.put("component", "hbasemaster"); + event.put("site", "sandbox"); + event.put("value", 14460904.0); + event.put("host", "xxx-xxx.int.xxx.com"); + + Tuple tuple = Testing.testTuple(new Values("metric", event)); + metricStreamPersist.execute(tuple); + Assert.assertTrue(result.size() == 1); + Assert.assertEquals("hadoop.memory.heapmemoryusage.used", result.get(0)); + } +} + diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java index 5aa27a3b9b..c6b9f870f9 100644 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java @@ -19,23 +19,30 @@ import backtype.storm.generated.StormTopology; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.builder.CounterToRateFunction; import org.apache.eagle.app.environment.builder.MetricDefinition; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.utils.ClockWithOffset; import java.util.Calendar; +import java.util.concurrent.TimeUnit; public class HadoopMetricMonitorApp extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { - return environment.newApp(config) - .fromStream("HADOOP_JMX_METRIC_STREAM") - .saveAsMetric(MetricDefinition + + MetricDefinition metricDefinition = MetricDefinition .metricType("HADOOP_JMX_METRICS") .namedByField("metric") .eventTimeByField("timestamp") - .dimensionFields("host","component","site") + .dimensionFields("host", "component", "site") .granularity(Calendar.MINUTE) - .valueField("value")) - .toTopology(); + .valueField("value"); + + return environment.newApp(config) + .fromStream("HADOOP_JMX_METRIC_STREAM") + .transformBy(new CounterToRateFunction(metricDefinition, 3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE)) + .saveAsMetric(metricDefinition) + .toTopology(); } } \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index f4bd2fa140..417611e31a 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -65,12 +65,10 @@ public class MRRunningJobApplicationTest { private static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]"; private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); private static Config config = ConfigFactory.load(); - private static String siteId; @BeforeClass public static void setupMapper() throws Exception { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); - siteId = config.getString("siteId"); } diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java index 4c52e10cd3..eb48ff66d3 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobManagerTest.java @@ -24,7 +24,6 @@ import org.apache.curator.test.TestingServer; import org.apache.curator.utils.CloseableUtils; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; -import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.jobrecover.RunningJobManager; import org.apache.zookeeper.CreateMode; import org.junit.*; @@ -37,8 +36,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java index 0ec1b8e4f4..7ffd30fbd5 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java @@ -21,8 +21,6 @@ import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.Utils; -import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; -import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils; import org.apache.eagle.jpm.util.resourcefetch.model.*; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import com.fasterxml.jackson.core.JsonParser; From ee3adb56775d725882d0852f25e68f38f3e0a6fb Mon Sep 17 00:00:00 2001 From: r7raul1984 Date: Mon, 20 Mar 2017 01:06:08 +0000 Subject: [PATCH 2/5] Fix Conflicts: eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java --- .../builder/CounterToRateFunction.java | 26 ++++----- .../builder/CounterToRateFunctionTest.java | 47 ++++++++-------- .../messaging/MetricStreamPersistTest.java | 36 ++++++------ .../eagle/metric/HadoopMetricMonitorApp.java | 56 ++++++++++--------- 4 files changed, 88 insertions(+), 77 deletions(-) diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java index 6712c9046e..9ae250103b 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java @@ -29,10 +29,10 @@ public class CounterToRateFunction implements TransformFunction { private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class); private final Map cache; - private MetricDefinition metricDefinition; + private MetricDescriptor metricDescriptor; private Collector collector; - public CounterToRateFunction(MetricDefinition metricDefinition, long heartbeat, TimeUnit unit, final Clock clock) { + public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) { final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit); this.cache = new LinkedHashMap(16, 0.75f, true) { protected boolean removeEldestEntry(Map.Entry eldest) { @@ -45,7 +45,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { return expired; } }; - this.metricDefinition = metricDefinition; + this.metricDescriptor = metricDescriptor; } @Override @@ -67,7 +67,7 @@ public void transform(Map event) { final CounterValue prev = cache.get(metricName); if (prev != null) { final double rate = prev.computeRate(metric); - event.put(metricDefinition.getValueField(), rate); + event.put(metricDescriptor.getValueField(), rate); collector.collect(event.toString(), event); } else { CounterValue current = new CounterValue(metric); @@ -87,22 +87,22 @@ public void close() { private Metric toMetric(Map event) { String metricName = ""; - for (String dimensionField : metricDefinition.getDimensionFields()) { + for (String dimensionField : metricDescriptor.getDimensionFields()) { metricName += event.get(dimensionField) + "-"; } - metricName += metricDefinition.getNameSelector().getMetricName(event); + metricName += metricDescriptor.getMetricNameSelector().getMetricName(event); - long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event); + long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event); return new Metric(metricName, timestamp, getCurrentValue(event)); } private double getCurrentValue(Map event) { double[] values; - if (event.containsKey(metricDefinition.getValueField())) { - values = new double[]{(double) event.get(metricDefinition.getValueField())}; + if (event.containsKey(metricDescriptor.getValueField())) { + values = new double[]{(double) event.get(metricDescriptor.getValueField())}; } else { - LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event); + LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event); values = new double[]{0}; } return values[0]; @@ -193,8 +193,8 @@ public boolean equals(Object obj) { } Metric m = (Metric) obj; return metricName.equals(m.getMetricName()) - && timestamp == m.getTimestamp() - && value.equals(m.getValue()); + && timestamp == m.getTimestamp() + && value.equals(m.getValue()); } @Override @@ -211,4 +211,4 @@ public String toString() { return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}'; } } -} +} \ No newline at end of file diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java index 1176e33b08..6c00880821 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/environment/builder/CounterToRateFunctionTest.java @@ -72,13 +72,14 @@ private Map mkOtherTypeEvent(long ts, double value) { public void testToMetricAndCounterValue() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { long baseTime = System.currentTimeMillis() + 100000L; - MetricDefinition metricDefinition = MetricDefinition - .metricType("HADOOP_JMX_METRICS") - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "component", "site") - .granularity(Calendar.MINUTE) - .valueField("value"); + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE); Map event = mkCountTypeEvent((baseTime + 0), 374042741.0); @@ -131,13 +132,14 @@ public void reportError(Throwable error) { } }); - MetricDefinition metricDefinition = MetricDefinition - .metricType("HADOOP_JMX_METRICS") - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "component", "site") - .granularity(Calendar.MINUTE) - .valueField("value"); + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); CounterToRateFunction counterToRateFunction = new CounterToRateFunction(metricDefinition, 3, TimeUnit.MINUTES, ClockWithOffset.INSTANCE); counterToRateFunction.open(new StormOutputCollector(collector)); long baseTime = System.currentTimeMillis() + 100000L; @@ -227,13 +229,14 @@ public void reportError(Throwable error) { @Test public void testTransformToRateWithExpiration() throws NoSuchFieldException, IllegalAccessException { - MetricDefinition metricDefinition = MetricDefinition - .metricType("HADOOP_JMX_METRICS") - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "component", "site") - .granularity(Calendar.MINUTE) - .valueField("value"); + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); List result = new ArrayList<>(); OutputCollector collector = new OutputCollector(new IOutputCollector() { @Override @@ -300,4 +303,4 @@ public void reportError(Throwable error) { } -} +} \ No newline at end of file diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java index 23cbe48ab9..1561a411d6 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/messaging/MetricStreamPersistTest.java @@ -22,7 +22,7 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import com.typesafe.config.Config; -import org.apache.eagle.app.environment.builder.MetricDefinition; +import org.apache.eagle.app.environment.builder.MetricDescriptor; import org.apache.eagle.app.utils.StreamConvertHelper; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; @@ -42,18 +42,19 @@ import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) -@PrepareForTest({MetricStreamPersist.class}) +@PrepareForTest( {MetricStreamPersist.class}) public class MetricStreamPersistTest { @Test public void testStructuredMetricMapper() throws Exception { - MetricDefinition metricDefinition = MetricDefinition - .metricType("HADOOP_JMX_METRICS") - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "component", "site") - .granularity(Calendar.MINUTE) - .valueField("value"); + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); Config config = mock(Config.class); MetricStreamPersist metricStreamPersist = new MetricStreamPersist(metricDefinition, config); Field mapperField = metricStreamPersist.getClass().getDeclaredField("mapper"); @@ -68,7 +69,7 @@ public void testStructuredMetricMapper() throws Exception { event.put("host", "xxx-xxx.int.xxx.com"); Tuple tuple = Testing.testTuple(new Values("metric", event)); - MetricStreamPersist.MetricMapper mapper= (MetricStreamPersist.MetricMapper) mapperField.get(metricStreamPersist); + MetricStreamPersist.MetricMapper mapper = (MetricStreamPersist.MetricMapper) mapperField.get(metricStreamPersist); GenericMetricEntity metricEntity = mapper.map(StreamConvertHelper.tupleToEvent(tuple).f1()); @@ -106,13 +107,14 @@ public void reportError(Throwable error) { } }); - MetricDefinition metricDefinition = MetricDefinition - .metricType("HADOOP_JMX_METRICS") - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "component", "site") - .granularity(Calendar.MINUTE) - .valueField("value"); + MetricDescriptor metricDefinition = MetricDescriptor + .metricGroupByField("group") + .siteAs("siteId") + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value"); Config config = mock(Config.class); when(config.hasPath("service.batchSize")).thenReturn(false); diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java index 05c874dfc0..9c0ac100b4 100644 --- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java +++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java @@ -19,41 +19,47 @@ import backtype.storm.generated.StormTopology; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.builder.CounterToRateFunction; import org.apache.eagle.app.environment.builder.MetricDescriptor; import org.apache.eagle.app.environment.builder.MetricDescriptor.MetricGroupSelector; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.utils.AppConfigUtils; +import org.apache.eagle.app.utils.ClockWithOffset; import java.util.Calendar; +import java.util.concurrent.TimeUnit; public class HadoopMetricMonitorApp extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { + + MetricDescriptor hadoopMetricDescriptor = MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> { + if (event.containsKey("component")) { + return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase()); + } else { + return "hadoop.metrics"; + } + }) + .siteAs(AppConfigUtils.getSiteId(config)) + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.SECOND) + .valueField("value"); + + MetricDescriptor systemMetricDescriptor = MetricDescriptor.metricGroupByField("group") + .siteAs(AppConfigUtils.getSiteId(config)) + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "group", "site", "device") + .granularity(Calendar.SECOND) + .valueField("value"); return environment.newApp(config) - .fromStream("HADOOP_JMX_METRIC_STREAM") - .saveAsMetric( - MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> { - if (event.containsKey("component")) { - return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase()); - } else { - return "hadoop.metrics"; - } - }) - .siteAs(AppConfigUtils.getSiteId(config)) - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "component", "site") - .granularity(Calendar.SECOND) - .valueField("value")) - .fromStream("SYSTEM_METRIC_STREAM") - .saveAsMetric(MetricDescriptor.metricGroupByField("group") - .siteAs(AppConfigUtils.getSiteId(config)) - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host", "group", "site", "device") - .granularity(Calendar.SECOND) - .valueField("value") - ) - .toTopology(); + .fromStream("HADOOP_JMX_METRIC_STREAM").transformBy(new CounterToRateFunction(hadoopMetricDescriptor,3, TimeUnit.SECONDS, ClockWithOffset.INSTANCE)) + .saveAsMetric(hadoopMetricDescriptor) + .fromStream("SYSTEM_METRIC_STREAM").transformBy(new CounterToRateFunction(hadoopMetricDescriptor,3, TimeUnit.SECONDS, ClockWithOffset.INSTANCE)) + .saveAsMetric(systemMetricDescriptor + ) + .toTopology(); } } \ No newline at end of file From 177c15f7c079478b29900cec06ddfbe74177ca3d Mon Sep 17 00:00:00 2001 From: r7raul1984 Date: Mon, 20 Mar 2017 01:34:14 +0000 Subject: [PATCH 3/5] Add CountMetricFilter interface. --- .../builder/CountMetricFilter.java | 26 +++++++++++++++++++ .../builder/CounterToRateFunction.java | 14 +++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java new file mode 100644 index 0000000000..bed047b3ec --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CountMetricFilter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.environment.builder; + +import java.io.Serializable; +import java.util.function.Function; + +@FunctionalInterface +interface CountMetricFilter extends Function, Serializable { +} \ No newline at end of file diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java index 9ae250103b..8a15ef812b 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java @@ -62,7 +62,7 @@ public void open(Collector collector) { public void transform(Map event) { Metric metric = toMetric(event); LOG.debug("received {} metrics", metric); - if (metric.isCounter()) { + if (new DefaultCountMetricFilter().apply(metric.getMetricName())) { final String metricName = metric.getMetricName(); final CounterValue prev = cache.get(metricName); if (prev != null) { @@ -100,10 +100,10 @@ private Metric toMetric(Map event) { private double getCurrentValue(Map event) { double[] values; if (event.containsKey(metricDescriptor.getValueField())) { - values = new double[]{(double) event.get(metricDescriptor.getValueField())}; + values = new double[] {(double) event.get(metricDescriptor.getValueField())}; } else { LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event); - values = new double[]{0}; + values = new double[] {0}; } return values[0]; } @@ -205,10 +205,16 @@ public int hashCode() { return result; } - @Override public String toString() { return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}'; } } + + private class DefaultCountMetricFilter implements CountMetricFilter { + @Override + public Boolean apply(String metricName) { + return metricName.endsWith("*.count"); + } + } } \ No newline at end of file From e006f66f857d421dde00be13d545353a6425ddcf Mon Sep 17 00:00:00 2001 From: r7raul1984 Date: Mon, 20 Mar 2017 02:39:02 +0000 Subject: [PATCH 4/5] Use isCounter again. --- .../eagle/app/environment/builder/CounterToRateFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java index 8a15ef812b..c8ae61990e 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java @@ -62,7 +62,7 @@ public void open(Collector collector) { public void transform(Map event) { Metric metric = toMetric(event); LOG.debug("received {} metrics", metric); - if (new DefaultCountMetricFilter().apply(metric.getMetricName())) { + if (metric.isCounter()) { final String metricName = metric.getMetricName(); final CounterValue prev = cache.get(metricName); if (prev != null) { From b9f85390f10542a6cc0c8584c3ed492c3816851c Mon Sep 17 00:00:00 2001 From: r7raul1984 Date: Tue, 21 Mar 2017 00:14:24 +0000 Subject: [PATCH 5/5] Fix bug change metricName.endsWith("*.count") to endsWith("count") --- .../eagle/app/environment/builder/CounterToRateFunction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java index c8ae61990e..51dad41949 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/CounterToRateFunction.java @@ -62,7 +62,7 @@ public void open(Collector collector) { public void transform(Map event) { Metric metric = toMetric(event); LOG.debug("received {} metrics", metric); - if (metric.isCounter()) { + if (new DefaultCountMetricFilter().apply(metric.getMetricName())) { final String metricName = metric.getMetricName(); final CounterValue prev = cache.get(metricName); if (prev != null) { @@ -214,7 +214,7 @@ public String toString() { private class DefaultCountMetricFilter implements CountMetricFilter { @Override public Boolean apply(String metricName) { - return metricName.endsWith("*.count"); + return metricName.endsWith("count"); } } } \ No newline at end of file