[AMBARI-24171] Fix issues in AMS aggregation and writes. (#1603)
* [AMBARI-24171] Fix issues in AMS aggregation and writes.

* [AMBARI-24171] Fix issues in AMS aggregation and writes - 2
avijayanhwx committed Jun 25, 2018
1 parent 4191b61 commit 9ef2bb9a291787e083516df6545555ce35ad6a2c
Showing 6 changed files with 164 additions and 20 deletions.
@@ -455,7 +455,6 @@ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregatio
for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname;
-   break;
}
long timestamp = aggregationResult.getTimeInMilis();
if (LOG.isDebugEnabled()) {
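Removing the break; is the aggregation half of the fix: with it in place the loop exited after the first entry, so only one metric per AggregationResult payload ever reached aggregateMap and the rest were silently dropped. A minimal sketch of the corrected loop shape (the map's value type is assumed from the host-aggregate context):

Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
String hostname = null;
for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
  // Every entry is recorded now; the old break made this a one-iteration loop.
  aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
  hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname;
}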
@@ -358,8 +358,11 @@ public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection)

try {
int rows = metricRecordStmt.executeUpdate();
- } catch (SQLException sql) {
-   LOG.error("Failed on insert records to store.", sql);
+ } catch (SQLException | NumberFormatException ex) {
+   LOG.warn("Failed on insert records to store : " + ex.getMessage());
+   LOG.warn("Metric that cannot be stored : [" + metric.getMetricName() + "," + metric.getAppId() + "]" +
+     metric.getMetricValues().toString());
+   continue;
}

if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
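The widened catch changes failure semantics from batch-fatal to per-metric: a NumberFormatException (presumably surfaced when a metric carries values Phoenix cannot encode) or SQLException on one row is now logged at WARN with the offending metric's name, app id, and values, and the loop continues to the next metric. The pattern in isolation, with hypothetical stand-ins for the surrounding statement and loop variables:

for (TimelineMetric metric : timelineMetrics.getMetrics()) {
  try {
    metricRecordStmt.executeUpdate();   // insert one metric's row(s)
  } catch (SQLException | NumberFormatException ex) {
    // One unwritable metric no longer aborts the whole commit.
    LOG.warn("Failed on insert records to store : " + ex.getMessage());
    continue;
  }
}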
@@ -1469,7 +1472,7 @@ public void saveClusterAggregateRecords(Map<TimelineClusterMetric, MetricCluster
}

rowCount++;
- byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, false);
+ byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, true);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write metrics : " + clusterMetric.toString());
continue;
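Flipping the second argument of getUuid from false to true is a write fix, not a refactor: judging from the call site, the flag appears to control whether a UUID is created on a cache miss. With false, a first-seen cluster metric came back null and the aggregate was dropped through the continue branch above; with true, the metadata manager can mint the UUID and the record is written. Inferred semantics (the parameter name below is an assumption, not the actual signature):

// createIfNotPresent = true: a miss now creates and caches a UUID instead of
// returning null, so the "Cannot write metrics" branch only fires on real errors.
byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, /* createIfNotPresent */ true);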
@@ -17,6 +17,16 @@
*/
package org.apache.ambari.metrics.core.timeline;

import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
@@ -331,7 +341,7 @@ public class TimelineMetricConfiguration {
public static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles";

private Configuration hbaseConf;
- private Configuration metricsConf;
+ private Configuration metricsConf = new Configuration();
private Configuration metricsSslConf;
private Configuration amsEnvConf;
private volatile boolean isInitialized = false;
@@ -698,4 +708,34 @@ public String getTransientMetricPatterns() {
return StringUtils.EMPTY;
}

public long getTableTtl(String tableName) {

if (StringUtils.isEmpty(tableName)) {
return Long.MAX_VALUE;
}

switch (tableName) {

case METRICS_RECORD_TABLE_NAME:
return metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400);
case METRICS_AGGREGATE_MINUTE_TABLE_NAME:
return metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400);
case METRICS_AGGREGATE_HOURLY_TABLE_NAME:
return metricsConf.getInt(HOST_HOUR_TABLE_TTL, 30 * 86400);
case METRICS_AGGREGATE_DAILY_TABLE_NAME:
return metricsConf.getInt(HOST_DAILY_TABLE_TTL, 365 * 86400);
case METRICS_CLUSTER_AGGREGATE_TABLE_NAME:
return metricsConf.getInt(CLUSTER_SECOND_TABLE_TTL, 7 * 86400);
case METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME:
return metricsConf.getInt(CLUSTER_MINUTE_TABLE_TTL, 30 * 86400);
case METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME:
return metricsConf.getInt(CLUSTER_HOUR_TABLE_TTL, 365 * 86400);
case METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME:
return metricsConf.getInt(CLUSTER_DAILY_TABLE_TTL, 730 * 86400);
case CONTAINER_METRICS_TABLE_NAME:
return metricsConf.getInt(CONTAINER_METRICS_TTL, 14 * 86400);
default:
return Long.MAX_VALUE;
}
}
}
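Together with initializing metricsConf inline above (which keeps getTableTtl safe to call before the full configuration is loaded), the new method centralizes retention lookups: each table's TTL is read from metricsConf in seconds, with defaults ranging from 1 day for the precision table to 730 days for the daily cluster aggregates. A usage sketch under those defaults (the caller is hypothetical):

TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
long ttlSeconds = conf.getTableTtl(METRICS_RECORD_TABLE_NAME);     // default 1 * 86400
long oldestServableMs = System.currentTimeMillis() - ttlSeconds * 1000L;
// Queries starting before oldestServableMs cannot be answered from this table.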
@@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -666,7 +667,12 @@ private static Precision getBestPrecisionForCondition(Condition condition) {
for (; i < precisions.length; i++) {
long rowsPerMetric = getRowCountForPrecision(precisions[i], range, CollectionUtils.isNotEmpty(hostNames));
if ((rowsPerMetric * metricNames.size() * numHosts) <= PhoenixHBaseAccessor.RESULTSET_LIMIT) {
-   break;
+
+   long ttl = getTtlForPrecision(precisions[i], CollectionUtils.isNotEmpty(hostNames));
+   long currentTime = System.currentTimeMillis();
+   if (currentTime - ttl * 1000 <= condition.getStartTime()) {
+     break;
+   }
}
}
if (i >= precisions.length) {
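The new guard makes precision selection TTL-aware: besides fitting under RESULTSET_LIMIT, a precision is only chosen if its backing table still retains data at the query's start time. Since ttl is in seconds, currentTime - ttl * 1000 is the oldest retained timestamp, and the comparison accepts the precision only when that bound lies at or before condition.getStartTime(). A worked check with the assumed default TTLs:

long ttlMs = 86400L * 1000;                 // SECONDS host data: 1-day TTL
long now   = System.currentTimeMillis();
long start = now - 2 * 3600 * 1000;         // query starts 2 hours ago
boolean usable = (now - ttlMs) <= start;    // true -> SECONDS precision is usable
// A start 3 days back would fail here and push the loop on to MINUTES (7-day TTL).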
@@ -675,6 +681,41 @@ private static Precision getBestPrecisionForCondition(Condition condition) {
return precisions[i];
}

private static long getTtlForPrecision(Precision precision, boolean withHosts) {
TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();

switch (precision) {
case SECONDS:
if (withHosts) {
return configuration.getTableTtl(METRICS_RECORD_TABLE_NAME);
} else {
return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
}

case MINUTES:
if (withHosts) {
return configuration.getTableTtl(METRICS_AGGREGATE_MINUTE_TABLE_NAME);
} else {
return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
}

case HOURS:
if (withHosts) {
return configuration.getTableTtl(METRICS_AGGREGATE_HOURLY_TABLE_NAME);
} else {
return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
}

default:
if (withHosts) {
return configuration.getTableTtl(METRICS_AGGREGATE_DAILY_TABLE_NAME);
} else {
return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME);
}
}
}


public static PreparedStatement prepareGetLatestMetricSqlStmt(
Connection connection, Condition condition) throws SQLException {
return prepareGetLatestMetricSqlStmtHelper(connection, condition, GET_LATEST_METRIC_SQL, METRICS_RECORD_TABLE_NAME);
@@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
@@ -123,11 +124,11 @@ public void testGetMetricRecordsMinutes() throws IOException, SQLException {
long ctime = startTime;
long minute = 60 * 1000;

- TimelineMetrics metrics1 = MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+ TimelineMetrics metrics1 = MetricTestHelper.prepareSingleTimelineMetric(ctime, "local3",
"disk_free", 1);
hdb.insertMetricRecords(metrics1, true);

- TimelineMetrics metrics2 = MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local1",
+ TimelineMetrics metrics2 = MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local3",
"disk_free", 2);
hdb.insertMetricRecords(metrics2, true);

@@ -139,11 +140,11 @@ public void testGetMetricRecordsMinutes() throws IOException, SQLException {

// WHEN
List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_%"); }},
- Collections.singletonList("local1"),
+ Collections.singletonList("local3"),
"host", null);
Condition condition = new DefaultCondition(uuids,
new ArrayList<String>() {{ add("disk_free"); }},
- Collections.singletonList("local1"),
+ Collections.singletonList("local3"),
"host", null, startTime, endTime + 1000, Precision.MINUTES, null, false);
TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
singletonValueFunctionMap("disk_free"));
@@ -153,7 +154,7 @@ public void testGetMetricRecordsMinutes() throws IOException, SQLException {
TimelineMetric metric = timelineMetrics.getMetrics().get(0);

assertEquals("disk_free", metric.getMetricName());
assertEquals("local1", metric.getHostName());
assertEquals("local3", metric.getHostName());
assertEquals(1, metric.getMetricValues().size());
Iterator<Map.Entry<Long, Double>> iterator = metric.getMetricValues().entrySet().iterator();
assertEquals(1.5, iterator.next().getValue(), 0.00001);
@@ -360,6 +361,60 @@ public void testGetClusterMetricRecordsHours() throws Exception {
assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 0.00001);
}

@Test
public void testGetInvalidMetricRecords() throws IOException, SQLException {
// GIVEN
long startTime = System.currentTimeMillis();
long ctime = startTime;
long minute = 60 * 1000;

List<TimelineMetric> timelineMetricList = new ArrayList<>();

timelineMetricList.add(
MetricTestHelper.createTimelineMetric(ctime, "valid_metric", "h1", "test_app",
null, 1.0));
timelineMetricList.add(
MetricTestHelper.createTimelineMetric(ctime, "invalid_metric", "h1", "test_app",
null, Double.NaN));
TimelineMetrics timelineMetrics = new TimelineMetrics();
timelineMetrics.setMetrics(timelineMetricList);
hdb.insertMetricRecords(timelineMetrics, true);

// WHEN
long endTime = ctime + minute;
List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("valid_metric"); }},
Collections.singletonList("h1"),
"test_app", null);

Condition condition = new DefaultCondition(uuids,
new ArrayList<String>() {{ add("valid_metric"); }},
Collections.singletonList("h1"),
"test_app", null, startTime - 1000, endTime, Precision.SECONDS, null, true);
TimelineMetrics timelineMetricsFromStore = hdb.getMetricRecords(condition,
singletonValueFunctionMap("valid_metric"));

//THEN
assertEquals(1, timelineMetricsFromStore.getMetrics().size());
TimelineMetric metric = timelineMetricsFromStore.getMetrics().get(0);

assertEquals("valid_metric", metric.getMetricName());
assertEquals("h1", metric.getHostName());
assertEquals(4, metric.getMetricValues().size());


uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("invalid_metric"); }},
Collections.singletonList("h1"),
"test_app", null);
condition = new DefaultCondition(uuids,
new ArrayList<String>() {{ add("invalid_metric"); }},
Collections.singletonList("h1"),
"test_app", null, startTime, endTime, Precision.SECONDS, null, true);
timelineMetricsFromStore = hdb.getMetricRecords(condition, singletonValueFunctionMap("invalid_metric"));
assertTrue(timelineMetricsFromStore.getMetrics().isEmpty());

}


@Test
public void testInitPoliciesAndTTL() throws Exception {
Admin hBaseAdmin = hdb.getHBaseAdmin();
@@ -167,7 +167,7 @@ public void testPrepareGetAggregateNoPrecision() throws SQLException {
long hour = 60 * minute;
long day = 24 * hour;

- Long endTime = 1407959918000L;
+ Long endTime = System.currentTimeMillis();
Long startTime = endTime - 200 * second;

//SECONDS precision
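The tests drop the hard-coded August 2014 epoch (1407959918000L) because the TTL guard above would now reject every precision for a start time that old; anchoring on System.currentTimeMillis() keeps the requested ranges inside the retention windows. Against a 1-day precision TTL, for instance:

long ttlMs = 86400L * 1000;
boolean retained = (System.currentTimeMillis() - ttlMs) <= 1407959918000L;
// false: the old fixed start time falls far outside retention, so it would
// no longer exercise the precision branches these tests target.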
@@ -289,7 +289,7 @@ public void testPrepareGetMetricsNoPrecision() throws SQLException {
long minute = 60 * second;
long hour = 60 * minute;

- Long endTime = 1407959918000L;
+ Long endTime = System.currentTimeMillis();
Long startTime = endTime - 200 * second;
// SECONDS precision
// 2 Metrics, 1 Host, Time = 200 seconds
@@ -349,7 +349,7 @@ public void testPrepareGetMetricsNoPrecision() throws SQLException {
verify(connection, preparedStatement);

// HOURS precision
- startTime = endTime - 30 * 24 * hour;
+ startTime = endTime - 29 * 24 * hour;
condition = new DefaultCondition(
new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"),
"a1", "i1", startTime, endTime, null, null, false);
@@ -472,9 +472,16 @@ public void testResultSetLimitCheck() throws SQLException {
hosts.add("TestHost"+i);
}

+ long second = 1000;
+ long minute = 60 * second;
+ long hour = 60 * minute;
+
+ Long endTime = System.currentTimeMillis();
+ Long startTime = endTime - hour;
+
Condition condition = new DefaultCondition(
metrics, hosts,
"a1", "i1", 1407950000L, 1407953600L, Precision.SECONDS, null, false);
"a1", "i1", startTime, endTime, Precision.SECONDS, null, false);
Connection connection = createNiceMock(Connection.class);
PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class);
Capture<String> stmtCapture = new Capture<String>();
@@ -490,7 +497,7 @@ public void testResultSetLimitCheck() throws SQLException {
//Check without passing precision. Should be OK!
condition = new DefaultCondition(
metrics, hosts,
"a1", "i1", 1407950000L, 1407953600L, null, null, false);
"a1", "i1", startTime, endTime, null, null, false);
connection = createNiceMock(Connection.class);
preparedStatement = createNiceMock(PreparedStatement.class);
stmtCapture = new Capture<String>();
@@ -516,7 +523,7 @@ public void testResultSetLimitCheck() throws SQLException {
}
condition = new DefaultCondition(
metrics, hosts,
"a1", "i1", 1407867200L, 1407953600L, null, null, false);
"a1", "i1", endTime - 24*hour, endTime, null, null, false);
connection = createNiceMock(Connection.class);
preparedStatement = createNiceMock(PreparedStatement.class);
stmtCapture = new Capture<String>();
@@ -538,7 +545,7 @@ public void testResultSetLimitCheck() throws SQLException {
}
condition = new DefaultCondition(
metrics, hosts,
"a1", "i1", 1407867200L, 1407953600L, null, null, false);
"a1", "i1", endTime - 24*hour, endTime, null, null, false);
connection = createNiceMock(Connection.class);
preparedStatement = createNiceMock(PreparedStatement.class);
stmtCapture = new Capture<String>();
@@ -562,10 +569,9 @@ public void testResultSetLimitCheck() throws SQLException {
for (int i = 0; i < numHosts; i++) {
hosts.add("TestHost"+i);
}
- long endtime = 1407953600L;
condition = new DefaultCondition(
metrics, hosts,
"a1", "i1", endtime - 5 * 60 * 60, endtime, Precision.SECONDS, null, false);
"a1", "i1", endTime - 5 * hour, endTime, Precision.SECONDS, null, false);
boolean exceptionThrown = false;
boolean requestedSizeFoundInMessage = false;
