Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
AMBARI-23932 - Ambari Metrics reports incorrect values in aggregated …
…host metric data when requesting avg.
  • Loading branch information
Aravindan Vijayan committed May 24, 2018
1 parent 0176643 commit b3553944ca1bca5760fd0aa71690080dbebc73a3
Showing 3 changed files with 28 additions and 7 deletions.
@@ -38,6 +38,7 @@
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_COMPACTION_CLASS_KEY;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HSTORE_ENGINE_CLASS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_EVENT_METRIC_PATTERNS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TRANSIENT_METRIC_PATTERNS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
@@ -81,6 +82,7 @@
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_TRANSIENT_METRICS_SQL;
import static org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;

import java.io.IOException;
import java.sql.Connection;
@@ -195,6 +197,7 @@ public class PhoenixHBaseAccessor {
private final int cacheCommitInterval;
private final boolean skipBlockCacheForAggregatorsEnabled;
private TimelineMetricMetadataManager metadataManagerInstance;
private Set<String> eventMetricPatterns = new HashSet<>();

private Map<String, Integer> tableTTL = new HashMap<>();

@@ -240,6 +243,9 @@ public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
this.skipBlockCacheForAggregatorsEnabled = metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false);

String eventMetricPatternStrings = metricsConf.get(TIMELINE_METRICS_EVENT_METRIC_PATTERNS, StringUtils.EMPTY);
eventMetricPatterns.addAll(getJavaMetricPatterns(eventMetricPatternStrings));

tableTTL.put(METRICS_RECORD_TABLE_NAME, metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400)); // 1 day
tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.getInt(CONTAINER_METRICS_TTL, 14 * 86400)); // 14 days
tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400)); //7 days
@@ -1037,10 +1043,10 @@ private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condit
}
for (Function f : functions) {
if (f.getReadFunction() == Function.ReadFunction.VALUE) {
getTimelineMetricsFromResultSet(metrics, f, condition, rs);
getTimelineMetricsFromResultSet(metrics, f, condition, rs, isEventDownsampledMetric(metricName));
} else {
SingleValuedTimelineMetric metric =
TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f, isEventDownsampledMetric(metricName));

if (condition.isGrouped()) {
metrics.addOrMergeTimelineMetric(metric);
@@ -1052,12 +1058,21 @@ private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condit
} else {
// No aggregation requested
// Execution never goes here; the functions list always contains at least 1 element
getTimelineMetricsFromResultSet(metrics, null, condition, rs);
getTimelineMetricsFromResultSet(metrics, null, condition, rs, isEventDownsampledMetric(metricName));
}
}
}

/**
 * Returns whether the given metric name matches any of the configured event
 * metric patterns (ams-site : timeline.metrics.downsampler.event.metric.patterns).
 * Metrics matching one of these patterns are explicitly sum-downsampled across
 * time; all others are averaged when read back.
 *
 * @param metricName metric name to test against the configured regex patterns
 * @return {@code true} if any configured pattern matches {@code metricName}
 */
private boolean isEventDownsampledMetric(String metricName) {
  // Each entry of eventMetricPatterns is a Java regex; anyMatch short-circuits
  // on the first hit, same as the explicit loop it replaces.
  return eventMetricPatterns.stream().anyMatch(metricName::matches);
}

private void getTimelineMetricsFromResultSet(TimelineMetrics metrics, Function f, Condition condition, ResultSet rs)
private void getTimelineMetricsFromResultSet(TimelineMetrics metrics, Function f, Condition condition, ResultSet rs, boolean shouldSumAcrossTime)
throws SQLException, IOException {
if (condition.getPrecision().equals(Precision.SECONDS)) {
TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
@@ -1072,7 +1087,7 @@ private void getTimelineMetricsFromResultSet(TimelineMetrics metrics, Function f

} else {
SingleValuedTimelineMetric metric =
TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f, shouldSumAcrossTime);
if (condition.isGrouped()) {
metrics.addOrMergeTimelineMetric(metric);
} else {
@@ -59,7 +59,8 @@ public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
}

public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs,
Function f) throws SQLException, IOException {
Function f,
boolean shouldSumMetricAcrossTime) throws SQLException, IOException {

byte[] uuid = rs.getBytes("UUID");
TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid);
@@ -73,6 +74,8 @@ public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(Resul
);

double value;
// GET requests for sum and avg are handled the same way, since summing values across time
// does not generally make sense for point-in-time host metrics.
// If explicit sum downsampling across time is required, configure
// ams-site : timeline.metrics.downsampler.event.metric.patterns.
switch(function.getReadFunction()){
case AVG:
value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
@@ -85,6 +88,9 @@ public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(Resul
break;
case SUM:
value = rs.getDouble("METRIC_SUM");
if (!shouldSumMetricAcrossTime) {
value = value / rs.getInt("METRIC_COUNT");
}
break;
default:
value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
@@ -379,7 +379,7 @@ public class PhoenixTransactSQL {
/**
 * UPSERT-SELECT that rolls host metrics up into an aggregate table, grouped by UUID.
 * Stores the raw SUM(METRIC_SUM) together with SUM(METRIC_COUNT) so readers can
 * derive the average on demand (AMBARI-23932: pre-dividing with
 * ROUND(SUM(METRIC_SUM)/SUM(METRIC_COUNT),2) at write time made subsequent
 * avg reads report incorrect values, so the division was removed here).
 * Format placeholders: target table, server-time expression, source table,
 * extra WHERE clause, start time, end time.
 */
public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT " +
"INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
"SELECT UUID, %s AS SERVER_TIME, " +
"SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
"FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s GROUP BY UUID";

/**

0 comments on commit b355394

Please sign in to comment.