Skip to content

Commit

Permalink
expose stream throughput as a per-second rate via metrics
Browse files Browse the repository at this point in the history
 this way the web interface can get them via the bulk metrics call

issue #1071
  • Loading branch information
kroepke committed Apr 1, 2015
1 parent f108745 commit 0261329
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 47 deletions.
Expand Up @@ -28,10 +28,11 @@ public final class GlobalMetricNames {


private GlobalMetricNames() {} private GlobalMetricNames() {}


public static final String RATE_SUFFIX = "1-sec-rate";

public static final String INPUT_THROUGHPUT = "org.graylog2.throughput.input"; public static final String INPUT_THROUGHPUT = "org.graylog2.throughput.input";
public static final String INPUT_THROUGHPUT_RATE = name(INPUT_THROUGHPUT, "1-sec-rate");


public static final String OUTPUT_THROUGHPUT = "org.graylog2.throughput.output"; public static final String OUTPUT_THROUGHPUT = "org.graylog2.throughput.output";
public static final String OUTPUT_THROUGHPUT_RATE = name(OUTPUT_THROUGHPUT, "1-sec-rate"); public static final String OUTPUT_THROUGHPUT_RATE = name(OUTPUT_THROUGHPUT, RATE_SUFFIX);


} }
@@ -1,65 +1,51 @@
/** /**
* This file is part of Graylog. * This file is part of Graylog.
* * <p/>
* Graylog is free software: you can redistribute it and/or modify * Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or * the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version. * (at your option) any later version.
* * <p/>
* Graylog is distributed in the hope that it will be useful, * Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details. * GNU General Public License for more details.
* * <p/>
* You should have received a copy of the GNU General Public License * You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>. * along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/ */
package org.graylog2.periodical; package org.graylog2.periodical;


import com.codahale.metrics.Counter; import com.codahale.metrics.Counting;
import com.codahale.metrics.Gauge; import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import org.graylog2.plugin.GlobalMetricNames; import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.periodical.Periodical; import org.graylog2.plugin.periodical.Periodical;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import javax.inject.Inject; import javax.inject.Inject;
import java.util.Map;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.concurrent.ConcurrentMap;


import static com.codahale.metrics.MetricRegistry.name;
import static org.graylog2.shared.metrics.MetricUtils.filterSingleMetric; import static org.graylog2.shared.metrics.MetricUtils.filterSingleMetric;


public class ThroughputCalculator extends Periodical { public class ThroughputCalculator extends Periodical {
private static final Logger log = LoggerFactory.getLogger(ThroughputCalculator.class); private static final Logger log = LoggerFactory.getLogger(ThroughputCalculator.class);


private final MetricRegistry metricRegistry; private final MetricRegistry metricRegistry;
private long prevOutputCount = 0;
private volatile long outputCountAvgLastSecond = 0L; private ConcurrentMap<String, CounterSample> sampledCounters = Maps.newConcurrentMap();
private long prevInputCount = 0;
private volatile long inputCountAvgLastSecond = 0L;


@Inject @Inject
public ThroughputCalculator(MetricRegistry metricRegistry) { public ThroughputCalculator(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry; this.metricRegistry = metricRegistry;
try {
metricRegistry.register(GlobalMetricNames.OUTPUT_THROUGHPUT_RATE, new Gauge<Long>() {
@Override
public Long getValue() {
return outputCountAvgLastSecond;
}
});

metricRegistry.register(GlobalMetricNames.INPUT_THROUGHPUT_RATE, new Gauge<Long>() {
@Override
public Long getValue() {
return inputCountAvgLastSecond;
}
});

} catch (Exception e) {
log.error("Unable to register metric", e);
}
} }


@Override @Override
Expand Down Expand Up @@ -104,31 +90,69 @@ protected Logger getLogger() {


@Override @Override
public void doRun() { public void doRun() {
final SortedMap<String, Counter> counters = metricRegistry.getCounters( final SortedMap<String, ? extends Counting> counters = metricRegistry.getCounters(
filterSingleMetric(GlobalMetricNames.OUTPUT_THROUGHPUT) filterSingleMetric(GlobalMetricNames.OUTPUT_THROUGHPUT)
); );

final Counter outputThroughput = Iterables.getOnlyElement(counters.values(), null);
if (outputThroughput == null) {
log.debug("Could not find throughput meter '{}'. Trying again later.", GlobalMetricNames.OUTPUT_THROUGHPUT);
} else {
final long currentOutputThroughput = outputThroughput.getCount();
outputCountAvgLastSecond = currentOutputThroughput - prevOutputCount;
prevOutputCount = currentOutputThroughput;
}

// rinse and repeat for input throughput // rinse and repeat for input throughput
final SortedMap<String, Counter> inputCounters = metricRegistry.getCounters( final SortedMap<String, ? extends Counting> inputCounters = metricRegistry.getCounters(
filterSingleMetric(GlobalMetricNames.INPUT_THROUGHPUT) filterSingleMetric(GlobalMetricNames.INPUT_THROUGHPUT)
); );


final Counter inputThroughput = Iterables.getOnlyElement(inputCounters.values(), null); // StreamMetrics isn't accessible here, so we need to use a metrics filter instead.
if (inputThroughput == null) { final SortedMap<String, ? extends Counting> streamMeters = metricRegistry.getMeters(new MetricFilter() {
log.debug("Could not find throughput meter '{}'. Trying again later.", GlobalMetricNames.INPUT_THROUGHPUT); @Override
} else { public boolean matches(String name, Metric metric) {
final long currentInputThroughput = inputThroughput.getCount(); return name.matches("org\\.graylog2\\.plugin\\.streams\\.Stream\\..*?\\.incomingMessages");
inputCountAvgLastSecond = currentInputThroughput - prevInputCount; }
prevInputCount = currentInputThroughput; });

final Iterable<Map.Entry<String, ? extends Counting>> entries = Iterables.concat(counters.entrySet(),
inputCounters.entrySet(),
streamMeters.entrySet());

// calculate rates
for (Map.Entry<String, ? extends Counting> countingEntry : entries) {
final Counting value = countingEntry.getValue();

final String metricName = countingEntry.getKey();
CounterSample counterSample = sampledCounters.get(metricName);
if (counterSample == null) {
counterSample = new CounterSample();
sampledCounters.put(metricName, counterSample);
}
counterSample.updateAverage(value.getCount());
final String rateName = name(metricName, GlobalMetricNames.RATE_SUFFIX);
if (!metricRegistry.getMetrics().containsKey(rateName)) {
try {
log.info("Registering metric {}", rateName);
metricRegistry.register(rateName, new Gauge<Double>() {
@Override
public Double getValue() {
final CounterSample sample = sampledCounters.get(metricName);
return sample == null ? 0d : sample.getCurrentAverage();
}
});
} catch (IllegalArgumentException e) {
log.warn(
"Could not register gauge {} despite checking before that it didn't exist. This should not happen.",
rateName);
}
}
}
}

private static class CounterSample {
private long previousCount = 0L;
private double currentAverage = 0d;

public void updateAverage(long currentCount) {
currentAverage = (double) currentCount - (double) previousCount;

This comment has been minimized.

Copy link
@joschi

joschi Apr 1, 2015

Contributor

How can currentAverage ever become a floating point number when it's always calculated from two long values?

This comment has been minimized.

Copy link
@kroepke

kroepke Apr 1, 2015

Author Member

duh! :)

This comment has been minimized.

Copy link
@kroepke

kroepke Apr 1, 2015

Author Member

Still I'd like to keep it as a double, just adding the explanation for it in another commit:

the denominator is always 1 because we rely on the schedule of the periodical being 1 second. Otherwise we'd need to store timestamps as well, which is overkill for this use case, I think.
To reserve the option to make it more precise, I'd like to keep the doubliness.

previousCount = currentCount;
}

public double getCurrentAverage() {
return currentAverage;
} }
} }

} }

0 comments on commit 0261329

Please sign in to comment.