Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into YARN-11239
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Apr 11, 2023
2 parents f4acaff + 74ddf69 commit 817c917
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,29 @@ public synchronized MutableQuantiles newQuantiles(String name, String desc,
return ret;
}

/**
* Create a mutable inverse metric that estimates inverse quantiles of a stream of values
* @param name of the metric
* @param desc metric description
* @param sampleName of the metric (e.g., "Ops")
* @param valueName of the metric (e.g., "Rate")
* @param interval rollover interval of estimator in seconds
* @return a new inverse quantile estimator object
* @throws MetricsException if interval is not a positive integer
*/
public synchronized MutableQuantiles newInverseQuantiles(String name, String desc,
String sampleName, String valueName, int interval) {
checkMetricName(name);
if (interval <= 0) {
throw new MetricsException("Interval should be positive. Value passed" +
" is: " + interval);
}
MutableQuantiles ret =
new MutableInverseQuantiles(name, desc, sampleName, valueName, interval);
metricsMap.put(name, ret);
return ret;
}

/**
* Create a mutable metric with stats
* @param name of the metric
Expand Down Expand Up @@ -278,7 +301,7 @@ public MutableRate newRate(String name, String description) {
}

/**
* Create a mutable rate metric (for throughput measurement)
* Create a mutable rate metric (for throughput measurement).
* @param name of the metric
* @param desc description
* @param extended produce extended stat (stdev/min/max etc.) if true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.lib;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.SampleQuantiles;
import java.text.DecimalFormat;
import static org.apache.hadoop.metrics2.lib.Interns.info;

/**
* Watches a stream of long values, maintaining online estimates of specific
* quantiles with provably low error bounds. Inverse quantiles are meant for
* highly accurate low-percentile (e.g. 1st, 5th) metrics.
* InverseQuantiles are used for metrics where higher the value better it is.
* ( eg: data transfer rate ).
* The 1st percentile here corresponds to the 99th inverse percentile metric,
* 5th percentile to 95th and so on.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MutableInverseQuantiles extends MutableQuantiles{

static class InversePercentile extends Quantile {
InversePercentile(double inversePercentile) {
super(inversePercentile/100, inversePercentile/1000);
}
}

@VisibleForTesting
public static final Quantile[] INVERSE_QUANTILES = {new InversePercentile(50),
new InversePercentile(25), new InversePercentile(10),
new InversePercentile(5), new InversePercentile(1)};

/**
* Instantiates a new {@link MutableInverseQuantiles} for a metric that rolls itself
* over on the specified time interval.
*
* @param name of the metric
* @param description long-form textual description of the metric
* @param sampleName type of items in the stream (e.g., "Ops")
* @param valueName type of the values
* @param intervalSecs rollover interval (in seconds) of the estimator
*/
public MutableInverseQuantiles(String name, String description, String sampleName,
String valueName, int intervalSecs) {
super(name, description, sampleName, valueName, intervalSecs);
}

/**
* Sets quantileInfo and estimator.
*
* @param ucName capitalized name of the metric
* @param uvName capitalized type of the values
* @param desc uncapitalized long-form textual description of the metric
* @param lvName uncapitalized type of the values
* @param df Number formatter for inverse percentile value
*/
void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat df) {
// Construct the MetricsInfos for inverse quantiles, converting to inverse percentiles
setQuantileInfos(INVERSE_QUANTILES.length);
for (int i = 0; i < INVERSE_QUANTILES.length; i++) {
double inversePercentile = 100 * (1 - INVERSE_QUANTILES[i].quantile);
String nameTemplate = ucName + df.format(inversePercentile) + "thInversePercentile" + uvName;
String descTemplate = df.format(inversePercentile) + " inverse percentile " + lvName
+ " with " + getInterval() + " second interval for " + desc;
addQuantileInfo(i, info(nameTemplate, descTemplate));
}

setEstimator(new SampleQuantiles(INVERSE_QUANTILES));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.hadoop.metrics2.lib.Interns.info;

import java.text.DecimalFormat;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -52,9 +53,10 @@ public class MutableQuantiles extends MutableMetric {
new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };

private final MetricsInfo numInfo;
private final MetricsInfo[] quantileInfos;
private final int interval;
private MetricsInfo numInfo;
private MetricsInfo[] quantileInfos;
private int intervalSecs;
private static DecimalFormat decimalFormat = new DecimalFormat("###.####");

private QuantileEstimator estimator;
private long previousCount = 0;
Expand Down Expand Up @@ -91,26 +93,39 @@ public MutableQuantiles(String name, String description, String sampleName,
String lsName = StringUtils.uncapitalize(sampleName);
String lvName = StringUtils.uncapitalize(valueName);

numInfo = info(ucName + "Num" + usName, String.format(
"Number of %s for %s with %ds interval", lsName, desc, interval));
setInterval(interval);
setNumInfo(info(ucName + "Num" + usName, String.format(
"Number of %s for %s with %ds interval", lsName, desc, interval)));
scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
interval, interval, TimeUnit.SECONDS);
setQuantiles(ucName, uvName, desc, lvName, decimalFormat);
}

/**
* Sets quantileInfo and estimator.
*
* @param ucName capitalized name of the metric
* @param uvName capitalized type of the values
* @param desc uncapitalized long-form textual description of the metric
* @param lvName uncapitalized type of the values
* @param pDecimalFormat Number formatter for percentile value
*/
void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat pDecimalFormat) {
// Construct the MetricsInfos for the quantiles, converting to percentiles
quantileInfos = new MetricsInfo[quantiles.length];
String nameTemplate = ucName + "%dthPercentile" + uvName;
String descTemplate = "%d percentile " + lvName + " with " + interval
+ " second interval for " + desc;
setQuantileInfos(quantiles.length);
for (int i = 0; i < quantiles.length; i++) {
int percentile = (int) (100 * quantiles[i].quantile);
quantileInfos[i] = info(String.format(nameTemplate, percentile),
String.format(descTemplate, percentile));
double percentile = 100 * quantiles[i].quantile;
String nameTemplate = ucName + pDecimalFormat.format(percentile) + "thPercentile" + uvName;
String descTemplate = pDecimalFormat.format(percentile) + " percentile " + lvName
+ " with " + getInterval() + " second interval for " + desc;
addQuantileInfo(i, info(nameTemplate, descTemplate));
}

estimator = new SampleQuantiles(quantiles);

this.interval = interval;
scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
interval, interval, TimeUnit.SECONDS);
setEstimator(new SampleQuantiles(quantiles));
}

public MutableQuantiles() {}

@Override
public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
if (all || changed()) {
Expand All @@ -133,8 +148,50 @@ public synchronized void add(long value) {
estimator.insert(value);
}

public int getInterval() {
return interval;
/**
* Set info about the metrics.
*
* @param pNumInfo info about the metrics.
*/
public synchronized void setNumInfo(MetricsInfo pNumInfo) {
this.numInfo = pNumInfo;
}

/**
* Initialize quantileInfos array.
*
* @param length of the quantileInfos array.
*/
public synchronized void setQuantileInfos(int length) {
this.quantileInfos = new MetricsInfo[length];
}

/**
* Add entry to quantileInfos array.
*
* @param i array index.
* @param info info to be added to quantileInfos array.
*/
public synchronized void addQuantileInfo(int i, MetricsInfo info) {
this.quantileInfos[i] = info;
}

/**
* Set the rollover interval (in seconds) of the estimator.
*
* @param pIntervalSecs of the estimator.
*/
public synchronized void setInterval(int pIntervalSecs) {
this.intervalSecs = pIntervalSecs;
}

/**
* Get the rollover interval (in seconds) of the estimator.
*
* @return intervalSecs of the estimator.
*/
public synchronized int getInterval() {
return intervalSecs;
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.metrics2.lib.MutableInverseQuantiles;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -36,6 +37,7 @@ public class TestSampleQuantiles {
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };

SampleQuantiles estimator;
final static int NUM_REPEATS = 10;

@Before
public void init() {
Expand Down Expand Up @@ -91,28 +93,70 @@ public void testClear() throws IOException {
@Test
public void testQuantileError() throws IOException {
final int count = 100000;
Random r = new Random(0xDEADDEAD);
Long[] values = new Long[count];
Random rnd = new Random(0xDEADDEAD);
int[] values = new int[count];
for (int i = 0; i < count; i++) {
values[i] = (long) (i + 1);
values[i] = i + 1;
}
// Do 10 shuffle/insert/check cycles
for (int i = 0; i < 10; i++) {
System.out.println("Starting run " + i);
Collections.shuffle(Arrays.asList(values), r);

// Repeat shuffle/insert/check cycles 10 times
for (int i = 0; i < NUM_REPEATS; i++) {

// Shuffle
Collections.shuffle(Arrays.asList(values), rnd);
estimator.clear();
for (int j = 0; j < count; j++) {
estimator.insert(values[j]);

// Insert
for (int value : values) {
estimator.insert(value);
}
Map<Quantile, Long> snapshot;
snapshot = estimator.snapshot();

// Check
for (Quantile q : quantiles) {
long actual = (long) (q.quantile * count);
long error = (long) (q.error * count);
long estimate = snapshot.get(q);
System.out
.println(String.format("Expected %d with error %d, estimated %d",
actual, error, estimate));
assertThat(estimate <= actual + error).isTrue();
assertThat(estimate >= actual - error).isTrue();
}
}
}

/**
* Correctness test that checks that absolute error of the estimate for inverse quantiles
* is within specified error bounds for some randomly permuted streams of items.
*/
@Test
public void testInverseQuantiles() throws IOException {
SampleQuantiles inverseQuantilesEstimator =
new SampleQuantiles(MutableInverseQuantiles.INVERSE_QUANTILES);
final int count = 100000;
Random rnd = new Random(0xDEADDEAD);
int[] values = new int[count];
for (int i = 0; i < count; i++) {
values[i] = i + 1;
}

// Repeat shuffle/insert/check cycles 10 times
for (int i = 0; i < NUM_REPEATS; i++) {
// Shuffle
Collections.shuffle(Arrays.asList(values), rnd);
inverseQuantilesEstimator.clear();

// Insert
for (int value : values) {
inverseQuantilesEstimator.insert(value);
}
Map<Quantile, Long> snapshot;
snapshot = inverseQuantilesEstimator.snapshot();

// Check
for (Quantile q : MutableInverseQuantiles.INVERSE_QUANTILES) {
long actual = (long) (q.quantile * count);
long error = (long) (q.error * count);
long estimate = snapshot.get(q);
assertThat(estimate <= actual + error).isTrue();
assertThat(estimate >= actual - error).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,34 @@ public static void assertQuantileGauges(String prefix,
*/
public static void assertQuantileGauges(String prefix,
MetricsRecordBuilder rb, String valueName) {
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0l));
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
for (Quantile q : MutableQuantiles.quantiles) {
String nameTemplate = prefix + "%dthPercentile" + valueName;
int percentile = (int) (100 * q.quantile);
verify(rb).addGauge(
eqName(info(String.format(nameTemplate, percentile), "")),
geq(0l));
geq(0L));
}
}

/**
* Asserts that the NumOps and inverse quantiles for a metric have been changed at
* some point to a non-zero value, for the specified value name of the
* metrics (e.g., "Rate").
*
* @param prefix of the metric
* @param rb MetricsRecordBuilder with the metric
* @param valueName the value name for the metric
*/
public static void assertInverseQuantileGauges(String prefix,
MetricsRecordBuilder rb, String valueName) {
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
for (Quantile q : MutableQuantiles.quantiles) {
String nameTemplate = prefix + "%dthInversePercentile" + valueName;
int percentile = (int) (100 * q.quantile);
verify(rb).addGauge(
eqName(info(String.format(nameTemplate, percentile), "")),
geq(0L));
}
}

Expand Down
Loading

0 comments on commit 817c917

Please sign in to comment.