AMBARI-24367 : Fix integration test regressions in AMS collector due to scale changes (#1919)

* AMBARI-24367 : Fix integration test regressions in AMS collector due to scale changes.

* AMBARI-24367 : Fix integration test regressions in AMS collector due to scale changes (2).
avijayanhwx committed Aug 9, 2018
1 parent b64f13e commit b81d9e84287ee0a8687281a6a6d97180c3d13e9a
Showing 9 changed files with 307 additions and 110 deletions.
@@ -136,7 +136,7 @@ private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResu
       }

       if (existingMetric != null) {
-        hostAggregate.setSum(hostAggregate.getSum() / perMetricCount);
+        hostAggregate.setSum(hostAggregate.getSum() / (perMetricCount - 1));
         hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)perMetricCount));
       }
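The divisor is the substance of this hunk: perMetricCount counts every row folded into the running sum, including the row that carried the previously persisted aggregate, so dividing by perMetricCount - 1 appears to average only over the fresh rows. A toy sketch of the arithmetic, with hypothetical values that are not from the patch:

// Hypothetical values, not from the commit: one of the counted rows is the
// pre-existing aggregate rather than a fresh sample.
double sum = 30.0;      // running sum, existing aggregate already folded in
int perMetricCount = 4; // rows seen, including the existing aggregate row
double diluted   = sum / perMetricCount;        // 7.5  - aggregate row dilutes the average
double corrected = sum / (perMetricCount - 1);  // 10.0 - average over fresh rows only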

@@ -21,6 +21,7 @@
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_EVENT_METRIC_PATTERNS;
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS;
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
 import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
 import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
@@ -99,7 +100,11 @@ public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
       skipInterpolationMetricPatterns.addAll(getJavaMetricPatterns(skipInterpolationMetricPatternStrings));
     }

-    this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager);
+    if (Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS, "false"))) {
+      this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true);
+    } else {
+      this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager);
+    }
   }

@Override
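The constructor now keys the read helper off the multiple-clusters setting: when TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS is true, the helper gets a second boolean argument (instance-aware reads across clusters is the presumed meaning; the patch does not spell it out). A minimal sketch of the same toggle, assuming a Hadoop-style Configuration with get(key, default):

// Sketch of the config toggle; names are from the patch, the semantics of the
// boolean argument are an assumption.
boolean multiCluster =
    Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS, "false"));
TimelineMetricReadHelper helper = multiCluster
    ? new TimelineMetricReadHelper(metadataManager, true)
    : new TimelineMetricReadHelper(metadataManager);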
@@ -153,6 +158,9 @@ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet
     Map<String, MutableInt> hostedAppCounter = new HashMap<>();
     if (rs.next()) {
       metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+      while (metric == null && rs.next()) {
+        metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
+      }

       // Call slice after all rows for a host are read
       while (rs.next()) {
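Under the UUID-based schema that came with the scale changes, a scanned row can reference a UUID the metadata manager cannot resolve yet, making getTimelineMetricFromResultSet return null; the added loop walks past such rows instead of failing on the first one. A hedged sketch of the pattern in isolation (the early-return variable name is assumed, not shown in the hunk):

// Skip unresolvable rows rather than aborting the scan.
TimelineMetric metric = null;
while (metric == null && rs.next()) {
  metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); // null if the UUID is unknown
}
if (metric == null) {
  return aggregateClusterMetrics; // assumed name for the method's result map
}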
@@ -163,7 +171,7 @@ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet
         if (nextMetric == null) {
           continue;
         }
-
+
         if (metric.equalsExceptTime(nextMetric)) {
           metric.addMetricValues(nextMetric.getMetricValues());
         } else {
@@ -185,13 +185,13 @@ public void initializeMetadata(boolean scheduleMetadateSync) {
       }
     }

-    metricMetadataSync = new TimelineMetricMetadataSync(this);
+    metricMetadataSync = new TimelineMetricMetadataSync(this);
     // Schedule the executor to sync to store
     if (scheduleMetadateSync) {
       executorService.scheduleWithFixedDelay(metricMetadataSync,
-          metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
-          metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
-          TimeUnit.SECONDS);
+        metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
+        metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
+        TimeUnit.SECONDS);
     }
     // Read from store and initialize map
     try {
@@ -592,7 +592,11 @@ public TimelineMetric getMetricFromUuid(byte[] uuid) {
     timelineMetric.setInstanceId(key.instanceId);

     byte[] hostUuid = ArrayUtils.subarray(uuid, TIMELINE_METRIC_UUID_LENGTH, HOSTNAME_UUID_LENGTH + TIMELINE_METRIC_UUID_LENGTH);
-    timelineMetric.setHostName(uuidHostMap.get(new TimelineMetricUuid(hostUuid)));
+    String hostname = uuidHostMap.get(new TimelineMetricUuid(hostUuid));
+    if (hostname == null) {
+      return null;
+    }
+    timelineMetric.setHostName(hostname);
     return timelineMetric;
   }
 }
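getMetricFromUuid now returns null when the host portion of the UUID has no entry in uuidHostMap, which is the signal the skip loop above depends on. A short usage sketch of that contract; the caller code here is hypothetical:

// Hypothetical caller honoring the null contract of getMetricFromUuid.
TimelineMetric tm = getMetricFromUuid(rowUuid); // rowUuid: byte[] from the row key
if (tm == null) {
  // host UUID not (yet) in the metadata cache: drop the row instead of
  // emitting a metric with a null hostname
}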
@@ -736,7 +740,7 @@ private Set<String> getSanitizedHostnames(List<String> hostnamedWithOrWithoutWil
  * @throws IOException
  */
 public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadataByAppId(String appId, String metricPattern,
-                                                                      boolean includeBlacklistedMetrics) throws SQLException, IOException {
+      boolean includeBlacklistedMetrics) throws SQLException, IOException {

Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = getMetadataCache();

@@ -839,8 +843,8 @@ public void updateMetadataCacheUsingV1Tables() throws SQLException {
       cacheValue.setType(oldValue.getType());
       cacheValue.setIsWhitelisted(oldValue.isWhitelisted());
     } else if (oldValue.getSeriesStartTime() < cacheValue.getSeriesStartTime() &&
-        cacheValue.getSeriesStartTime() != 0L &&
-        cacheValue.isWhitelisted())
+      cacheValue.getSeriesStartTime() != 0L &&
+      cacheValue.isWhitelisted())
{
LOG.info(String.format("Updating startTime for %s", key));
cacheValue.setSeriesStartTime(oldValue.getSeriesStartTime());
@@ -20,6 +20,7 @@
 import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
 import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
 import static org.apache.phoenix.end2end.ParallelStatsDisabledIT.tearDownMiniCluster;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -38,6 +39,8 @@
 import java.util.Map;
 import java.util.Properties;

+import javax.annotation.Nonnull;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -70,13 +73,51 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
   protected Connection conn;
   protected PhoenixHBaseAccessor hdb;
   protected TimelineMetricMetadataManager metadataManager;
+  private static StandaloneHBaseTestingUtility utility;

   public final Log LOG;

   public AbstractMiniHBaseClusterTest() {
     LOG = LogFactory.getLog(this.getClass());
   }

+
+  protected static void setUpTestDriver(ReadOnlyProps props) throws Exception {
+    setUpTestDriver(props, props);
+  }
+
+  protected static void setUpTestDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception {
+    if (driver == null) {
+      String url = checkClusterInitialized(serverProps);
+      driver = initAndRegisterTestDriver(url, clientProps);
+    }
+  }
+
+  private static String checkClusterInitialized(ReadOnlyProps serverProps) throws Exception {
+    if(!clusterInitialized) {
+      url = setUpTestCluster(config, serverProps);
+      clusterInitialized = true;
+    }
+
+    return url;
+  }
+
+  protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws Exception {
+    return initEmbeddedMiniCluster(conf, overrideProps);
+  }
+
+  private static String initEmbeddedMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
+    setUpConfigForMiniCluster(conf, overrideProps);
+    utility = new StandaloneHBaseTestingUtility(conf);
+
+    try {
+      utility.startStandaloneHBaseCluster();
+      return getLocalClusterUrl(utility);
+    } catch (Throwable var3) {
+      throw new RuntimeException(var3);
+    }
+  }
+
   @BeforeClass
   public static void doSetup() throws Exception {
     Map<String, String> props = getDefaultProps();
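Taken together these overrides reroute Phoenix's test bootstrap through a standalone HBase cluster: setUpTestDriver initializes the driver once, checkClusterInitialized starts the cluster on first use, and initEmbeddedMiniCluster swaps in StandaloneHBaseTestingUtility. A hedged sketch of how a subclass triggers the chain, mirroring the Phoenix BaseTest idiom used in doSetup:

// Sketch of the bootstrap a test subclass inherits; getDefaultProps() and
// ReadOnlyProps come from Phoenix's BaseTest utilities.
Map<String, String> props = getDefaultProps();
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
// setUpTestDriver -> checkClusterInitialized -> setUpTestCluster
//   -> initEmbeddedMiniCluster -> new StandaloneHBaseTestingUtility(conf)
//        .startStandaloneHBaseCluster()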
@@ -310,4 +351,17 @@ protected void insertMetricRecords(Connection conn, TimelineMetrics metrics)
       }
     }
   }
+
+  @After
+  public void cleanup() throws SQLException {
+    for (String table : PHOENIX_TABLES) {
+      executeUpdate("DELETE FROM " + table);
+    }
+  }
+
+  private void executeUpdate(String query) throws SQLException {
+    Connection conn = getConnection(getUrl());
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate(query);
+  }
 }
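The new @After hook empties every table listed in PHOENIX_TABLES with a DELETE FROM, so each test starts clean without restarting the cluster. As committed, executeUpdate leaves its Statement and Connection open; a try-with-resources variant (a sketch, not part of the commit) would close both:

// Sketch only; the committed version does not close the statement or connection.
private void executeUpdate(String query) throws SQLException {
  try (Connection conn = getConnection(getUrl());
       Statement stmt = conn.createStatement()) {
    stmt.executeUpdate(query);
  }
}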
@@ -52,7 +52,6 @@
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
@@ -72,8 +71,6 @@

 import junit.framework.Assert;

-
-
 public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {

@Test
@@ -226,14 +223,14 @@ public void testGetClusterMetricRecordsSeconds() throws Exception {
     long startTime = System.currentTimeMillis();
     long ctime = startTime + 1;
     long minute = 60 * 1000;
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c1",
       "disk_free", 1), true);
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c2",
       "disk_free", 2), true);
     ctime += minute;
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c1",
       "disk_free", 2), true);
-    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2",
+    hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local_c2",
       "disk_free", 1), true);

long endTime = ctime + minute + 1;
@@ -443,7 +440,7 @@ public void testInitPoliciesAndTTL() throws Exception {
       .filter(t -> tableName.equals(t.getNameAsString())).findFirst();

     TableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableNameOptional.get());
-
+
     normalizerEnabled = tableDescriptor.isNormalizationEnabled();
     if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
       precisionTableCompactionPolicy = tableDescriptor.getValue(HSTORE_COMPACTION_CLASS_KEY);
@@ -38,20 +38,20 @@ public static MetricHostAggregate createMetricHostAggregate(double max, double m
   }

   public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
-                                                            String host,
-                                                            String metricName,
-                                                            double val) {
+      String host,
+      String metricName,
+      double val) {
     return prepareSingleTimelineMetric(startTime, host, null, metricName, val);
   }

   public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
-                                                            String host,
-                                                            String instanceId,
-                                                            String metricName,
-                                                            double val) {
+      String host,
+      String instanceId,
+      String metricName,
+      double val) {
     TimelineMetrics m = new TimelineMetrics();
     m.setMetrics(Arrays.asList(
-        createTimelineMetric(startTime, metricName, host, null, instanceId, val)));
+      createTimelineMetric(startTime, metricName, host, null, instanceId, val)));

return m;
}
@@ -71,11 +71,11 @@ public static TimelineMetrics prepareSingleTimelineMetric(long startTime,


   public static TimelineMetric createTimelineMetric(long startTime,
-                                                    String metricName,
-                                                    String host,
-                                                    String appId,
-                                                    String instanceId,
-                                                    double val) {
+      String metricName,
+      String host,
+      String appId,
+      String instanceId,
+      double val) {
TimelineMetric m = new TimelineMetric();
m.setHostName(host);
m.setAppId(appId != null ? appId : "host");
@@ -104,16 +104,27 @@ public static TimelineMetric createEmptyTimelineMetric(long startTime) {
     return metric;
   }

+  public static TimelineMetric createEmptyTimelineMetric(String metricName, long startTime) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(metricName);
+    metric.setAppId("test_app");
+    metric.setInstanceId("test_instance");
+    metric.setHostName("test_host");
+    metric.setStartTime(startTime);
+
+    return metric;
+  }
+
   public static TimelineClusterMetric createEmptyTimelineClusterMetric(
-      String name, long startTime) {
+    String name, long startTime) {
     TimelineClusterMetric metric = new TimelineClusterMetric(name,
-        "test_app", "instance_id", startTime);
+      "test_app", "instance_id", startTime);

     return metric;
   }

   public static TimelineClusterMetric createEmptyTimelineClusterMetric(
-      long startTime) {
+    long startTime) {
     return createEmptyTimelineClusterMetric("disk_used", startTime);
   }
 }
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+
+public class StandaloneHBaseTestingUtility extends HBaseTestingUtility {
+
+  public StandaloneHBaseTestingUtility(Configuration configuration) {
+    super(configuration);
+  }
+
+  public MiniHBaseCluster startStandaloneHBaseCluster() throws Exception {
+    if (this.getZkCluster() == null) {
+      this.startMiniZKCluster();
+    }
+    return this.startMiniHBaseCluster(1, 1, (List) null, null, null, true, true);
+  }
+
+}
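The new utility subclasses HBaseTestingUtility to boot a one-node cluster, spinning up a mini ZooKeeper first when none is attached. A hedged usage sketch; shutdownMiniCluster is inherited from HBaseTestingUtility, and the bare default configuration is an assumption:

// Usage sketch with a default configuration, not taken from the commit.
Configuration conf = HBaseConfiguration.create();
StandaloneHBaseTestingUtility util = new StandaloneHBaseTestingUtility(conf);
MiniHBaseCluster cluster = util.startStandaloneHBaseCluster(); // starts mini ZK if needed
try {
  // run assertions against the running cluster
} finally {
  util.shutdownMiniCluster(); // inherited teardown
}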
