Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
[EAGLE-872] Transform counter metric to rate metric
Browse files Browse the repository at this point in the history
- Add transform bolt using counterToRateFunction in HadoopMetricMonitorApp's storm topology.

https://issues.apache.org/jira/browse/EAGLE-872

Author: r7raul1984 <tangjijun@yhd.com>

Closes #783 from r7raul1984/EAGLE-872.
  • Loading branch information
r7raul1984 committed Mar 21, 2017
1 parent 0cda01b commit 673a81e
Show file tree
Hide file tree
Showing 11 changed files with 840 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eagle.app.environment.builder;

import java.io.Serializable;
import java.util.function.Function;

@FunctionalInterface
interface CountMetricFilter extends Function<String, Boolean>, Serializable {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eagle.app.environment.builder;

import com.google.common.base.Preconditions;
import org.apache.eagle.app.utils.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class CounterToRateFunction implements TransformFunction {
private static final Logger LOG = LoggerFactory.getLogger(CounterToRateFunction.class);
private final Map<String, CounterValue> cache;
private MetricDescriptor metricDescriptor;
private Collector collector;

public CounterToRateFunction(MetricDescriptor metricDescriptor, long heartbeat, TimeUnit unit, final Clock clock) {
final long heartbeatMillis = TimeUnit.MILLISECONDS.convert(heartbeat, unit);
this.cache = new LinkedHashMap<String, CounterValue>(16, 0.75f, true) {
protected boolean removeEldestEntry(Map.Entry<String, CounterValue> eldest) {
final long now = clock.now();
final long lastMod = eldest.getValue().getTimestamp();
final boolean expired = (now - lastMod) > heartbeatMillis;
if (expired) {
LOG.debug("heartbeat interval exceeded, expiring {}", eldest.getKey());
}
return expired;
}
};
this.metricDescriptor = metricDescriptor;
}

@Override
public String getName() {
return "CounterToRate";
}

@Override
public void open(Collector collector) {
this.collector = collector;
}

@Override
public void transform(Map event) {
Metric metric = toMetric(event);
LOG.debug("received {} metrics", metric);
if (new DefaultCountMetricFilter().apply(metric.getMetricName())) {
final String metricName = metric.getMetricName();
final CounterValue prev = cache.get(metricName);
if (prev != null) {
final double rate = prev.computeRate(metric);
event.put(metricDescriptor.getValueField(), rate);
collector.collect(event.toString(), event);
} else {
CounterValue current = new CounterValue(metric);
cache.put(metricName, current);
}
} else {
collector.collect(event.toString(), event);
}

}

@Override
public void close() {
cache.clear();
}

private Metric toMetric(Map event) {

String metricName = "";
for (String dimensionField : metricDescriptor.getDimensionFields()) {
metricName += event.get(dimensionField) + "-";
}
metricName += metricDescriptor.getMetricNameSelector().getMetricName(event);

long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);

return new Metric(metricName, timestamp, getCurrentValue(event));
}

private double getCurrentValue(Map event) {
double[] values;
if (event.containsKey(metricDescriptor.getValueField())) {
values = new double[] {(double) event.get(metricDescriptor.getValueField())};
} else {
LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event);
values = new double[] {0};
}
return values[0];
}

protected static class CounterValue {
private long timestamp;
private double value;

public CounterValue(long timestamp, double value) {
this.timestamp = timestamp;
this.value = value;
}

public CounterValue(Metric m) {
this(m.getTimestamp(), m.getNumberValue().doubleValue());
}

public long getTimestamp() {
return timestamp;
}

public double computeRate(Metric m) {
final long currentTimestamp = m.getTimestamp();
final double currentValue = m.getNumberValue().doubleValue();

final long durationMillis = currentTimestamp - timestamp;
final double delta = currentValue - value;

timestamp = currentTimestamp;
value = currentValue;

return computeRate(durationMillis, delta);
}

private double computeRate(long durationMillis, double delta) {
final double millisPerSecond = 1000.0;
final double duration = durationMillis / millisPerSecond;
return (duration <= 0.0 || delta <= 0.0) ? 0.0 : delta / duration;
}

@Override
public String toString() {
return "CounterValue{" + "timestamp=" + timestamp + ", value=" + value + '}';
}
}


protected final class Metric {
private final String metricName;
private final long timestamp;
private final Object value;

public Metric(String metricName, long timestamp, Object value) {
this.metricName = Preconditions.checkNotNull(metricName, "metricName");
this.timestamp = timestamp;
this.value = Preconditions.checkNotNull(value, "value");
}

public String getMetricName() {
return metricName;
}

public long getTimestamp() {
return timestamp;
}

public Object getValue() {
return value;
}

public Number getNumberValue() {
return (Number) value;
}

public boolean hasNumberValue() {
return (value instanceof Number);
}

public boolean isCounter() {
return metricName.endsWith("count");
}

@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof Metric)) {
return false;
}
Metric m = (Metric) obj;
return metricName.equals(m.getMetricName())
&& timestamp == m.getTimestamp()
&& value.equals(m.getValue());
}

@Override
public int hashCode() {
int result = metricName.hashCode();
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + value.hashCode();
return result;
}

@Override
public String toString() {
return "Metric{metricName=" + metricName + ", timestamp=" + timestamp + ", value=" + value + '}';
}
}

private class DefaultCountMetricFilter implements CountMetricFilter {
@Override
public Boolean apply(String metricName) {
return metricName.endsWith("count");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eagle.app.utils;

public interface Clock {
Clock WALL = System::currentTimeMillis;

long now();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eagle.app.utils;

public enum ClockWithOffset implements Clock {
INSTANCE;

private volatile long offset = 0L;

public void setOffset(long offset) {
if (offset >= 0) {
this.offset = offset;
}
}

@Override
public long now() {
return offset + System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.app.utils;

import java.util.concurrent.atomic.AtomicLong;

public class ManualClock implements Clock {

private final AtomicLong time;

public ManualClock(long init) {
time = new AtomicLong(init);
}

public void set(long t) {
time.set(t);
}

public long now() {
return time.get();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

ManualClock clock = (ManualClock) o;
return now() == clock.now();
}

@Override
public int hashCode() {
return Long.valueOf(now()).hashCode();
}
}

0 comments on commit 673a81e

Please sign in to comment.