AMBARI-22740 : Rename ambari metrics collector package to org.apache.ambari.metrics. (Commit 2)
Aravindan Vijayan committed Apr 18, 2018
1 parent 4e1713b commit 2b339a4a5963f60636250b5319b4a89a2ec27b76
Showing 10 changed files with 69 additions and 35 deletions.
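
Note: nearly every hunk below applies the same mechanical change: fully qualified references to constants on TimelineMetricConfiguration (and PhoenixTransactSQL) are replaced with static imports, so call sites use the bare constant name. A minimal compilable sketch of the pattern, using the constant and default value visible in the first hunk (the class name TopNLimitExample is illustrative, not part of the commit):

import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;

import org.apache.hadoop.conf.Configuration;

public class TopNLimitExample {
  // Before the commit, call sites spelled out the owning class:
  //   metricsConf.get(TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT, "20")
  // With the static import above, the bare constant name suffices.
  static int readTopNHostsLimit(Configuration metricsConf) {
    return Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
  }
}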
@@ -17,6 +17,8 @@
*/
package org.apache.ambari.metrics.core.timeline;

+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;

import java.io.IOException;
@@ -92,7 +94,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time

/**
* Construct the service.
- *
*/
public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) {
super(HBaseTimelineMetricsService.class.getName());
@@ -159,8 +160,8 @@ private synchronized void initializeSubsystem() {
}
}

- defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT, "20"));
- if (Boolean.parseBoolean(metricsConf.get(TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
+ defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
+ if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
}

@@ -587,7 +588,7 @@ public Thread newThread(Runnable r) {
aggregator.getSleepIntervalMillis(),
TimeUnit.MILLISECONDS);
LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " +
- +aggregator.getSleepIntervalMillis() + " milliseconds.");
+ + aggregator.getSleepIntervalMillis() + " milliseconds.");
} else {
LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled.");
}
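
Note: the log lines above accompany a scheduled-executor call whose trailing arguments (aggregator.getSleepIntervalMillis(), TimeUnit.MILLISECONDS) are visible in the hunk; whether the service uses scheduleAtFixedRate or scheduleWithFixedDelay is not visible here. A minimal sketch of the standard pattern, with the task and interval as stand-ins for the real aggregator:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AggregatorSchedulingSketch {
  public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    long sleepIntervalMillis = 120_000L; // illustrative; the real interval comes from the aggregator
    executor.scheduleAtFixedRate(
        () -> System.out.println("run aggregation"), // stand-in for the aggregator task
        sleepIntervalMillis,                         // initial delay
        sleepIntervalMillis,                         // period between runs
        TimeUnit.MILLISECONDS);
  }
}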
@@ -18,6 +18,14 @@
package org.apache.ambari.metrics.core.timeline;

import static java.util.concurrent.TimeUnit.SECONDS;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getJavaRegexFromSqlRegex;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
@@ -84,7 +92,7 @@ public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxExcep
//TODO add config to disable logging

//enable ssl for ignite requests
- if (metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) {
+ if (metricConf.get(TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) {
SslContextFactory sslContextFactory = new SslContextFactory();
String keyStorePath = sslConf.get("ssl.server.keystore.location");
String keyStorePassword = sslConf.get("ssl.server.keystore.password");
@@ -100,11 +108,11 @@ public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxExcep

//aggregation parameters
appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation();
- interpolationEnabled = Boolean.parseBoolean(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
- cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
- Long aggregationInterval = metricConf.getLong(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
+ interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
+ cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
+ Long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);

- String filteredMetricPatterns = metricConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
+ String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
if (!StringUtils.isEmpty(filteredMetricPatterns)) {
LOG.info("Skipping aggregation for metric patterns : " + filteredMetricPatterns);
for (String patternString : filteredMetricPatterns.split(",")) {
@@ -113,10 +121,10 @@ public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxExcep
}
}

- if (metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) {
+ if (metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) {
TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
- ipFinder.setAddresses(Arrays.asList(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(",")));
+ ipFinder.setAddresses(Arrays.asList(metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(",")));
LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses());
discoverySpi.setIpFinder(ipFinder);
igniteConfiguration.setDiscoverySpi(discoverySpi);
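
Note: the discovery setup in this hunk is Apache Ignite's stock static-IP-finder pattern. A self-contained sketch of the same calls (the host list parameter is a placeholder; the collector reads its list from the TIMELINE_METRICS_COLLECTOR_IGNITE_NODES property):

import java.util.Arrays;

import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class IgniteDiscoverySketch {
  static IgniteConfiguration withStaticNodes(String commaSeparatedNodes) {
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    // Same parsing as the hunk above: a comma-separated node list.
    ipFinder.setAddresses(Arrays.asList(commaSeparatedNodes.split(",")));
    TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
    discoverySpi.setIpFinder(ipFinder);
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(discoverySpi);
    return cfg;
  }
}

Calling withStaticNodes("host1:47500,host2:47500") pins discovery to those endpoints (47500 is Ignite's default discovery port) instead of relying on multicast.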
@@ -143,7 +151,7 @@ public TimelineMetricsIgniteCache() throws MalformedURLException, URISyntaxExcep
cacheConfiguration.setName("metrics_cache");
//set cache mode to partitioned with # of backups
cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
- cacheConfiguration.setBackups(metricConf.getInt(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1));
+ cacheConfiguration.setBackups(metricConf.getInt(TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1));
//disable throttling due to cpu impact
cacheConfiguration.setRebalanceThrottle(0);
//enable locks
@@ -214,7 +222,7 @@ public void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetada
putMetricIntoCache(metricDoubleEntry.getKey(), newMetricClusterAggregate);
if (hostMetadata != null) {
//calculate app host metric
- if (metric.getAppId().equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
+ if (metric.getAppId().equalsIgnoreCase(HOST_APP_ID)) {
// Candidate metric, update app aggregates
if (hostMetadata.containsKey(metric.getHostName())) {
updateAppAggregatesFromHostMetric(metricDoubleEntry.getKey(), newMetricClusterAggregate, hostMetadata.get(metric.getHostName()));
@@ -36,6 +36,9 @@
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;

+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID;

/**
* Aggregator responsible for providing app level host aggregates. This task
* is accomplished without doing a round trip to storage, rather
@@ -91,7 +94,7 @@ public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
}

// If metric is a host metric and host has apps on it
- if (appId.equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
+ if (appId.equalsIgnoreCase(HOST_APP_ID)) {
// Candidate metric, update app aggregates
if (hostMetadata.containsKey(hostname)) {
updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
@@ -128,15 +131,15 @@ private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetr
return;
}

- TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+ TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId());
ConcurrentHashMap<String, String> apps = hostMetadata.get(hostname).getHostedApps();
for (String appId : apps.keySet()) {
if (appIdsToAggregate.contains(appId)) {

appKey.setAppId(appId);
TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
if (appMetadata == null) {
- TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
+ TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId());
TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key);

if (hostMetricMetadata != null) {
@@ -178,7 +181,7 @@ public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMet
}

private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
- String appIds = metricsConf.get(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS);
+ String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
if (!StringUtils.isEmpty(appIds)) {
return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
}
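
Note: getAppIdsForHostAggregation above combines split and StringUtils.stripAll so that whitespace around the comma-separated app ids is ignored. A standalone illustration of the same commons-lang calls (the property value is made up):

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;

public class AppIdParsingSketch {
  public static void main(String[] args) {
    String appIds = " datanode, nodemanager , hbase";  // illustrative config value
    // stripAll trims whitespace from each element produced by split.
    List<String> parsed = Arrays.asList(StringUtils.stripAll(appIds.split(",")));
    System.out.println(parsed);                        // [datanode, nodemanager, hbase]
  }
}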
@@ -33,6 +33,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;

+ import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;

public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricFilteringHostAggregator.class);
private TimelineMetricMetadataManager metricMetadataManager;
@@ -84,7 +86,7 @@ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
endTime, null, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
// Retaining order of the row-key avoids client side merge sort.
condition.addOrderByColumn("UUID");
condition.addOrderByColumn("SERVER_TIME");
@@ -36,6 +36,8 @@
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;

+ import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;

public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
TimelineMetricReadHelper readHelper;
@@ -73,7 +75,7 @@ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
endTime, null, null, true);
condition.setNoLimit();
condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
// Retaining order of the row-key avoids client side merge sort.
condition.addOrderByColumn("UUID");
condition.addOrderByColumn("SERVER_TIME");
@@ -31,6 +31,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;

+ import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;

public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {

public TimelineMetricHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
@@ -62,7 +64,7 @@ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
EmptyCondition condition = new EmptyCondition();
condition.setDoUpdate(true);

- condition.setStatement(String.format(PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
+ condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
outputTableName, endTime, tableName,
getDownsampledMetricSkipClause(), startTime, endTime));

@@ -50,6 +50,12 @@
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
import org.apache.ambari.metrics.core.timeline.uuid.HashBasedUuidGenStrategy;

+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY;
+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS;

public class TimelineMetricMetadataManager {
private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class);
private boolean isDisabled = false;
@@ -82,7 +88,7 @@ public class TimelineMetricMetadataManager {
public TimelineMetricMetadataManager(Configuration metricsConf, PhoenixHBaseAccessor hBaseAccessor) {
this.metricsConf = metricsConf;
this.hBaseAccessor = hBaseAccessor;
- String patternStrings = metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS);
+ String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS);
if (!StringUtils.isEmpty(patternStrings)) {
metricNameFilters.addAll(Arrays.asList(patternStrings.split(",")));
}
@@ -98,14 +104,14 @@ public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) throws
* Initialize Metadata from the store
*/
public void initializeMetadata() {
- if (metricsConf.getBoolean(TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT, false)) {
+ if (metricsConf.getBoolean(DISABLE_METRIC_METADATA_MGMT, false)) {
isDisabled = true;
} else {
metricMetadataSync = new TimelineMetricMetadataSync(this);
// Schedule the executor to sync to store
executorService.scheduleWithFixedDelay(metricMetadataSync,
- metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
- metricsConf.getInt(TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
+ metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes
+ metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes
TimeUnit.SECONDS);
// Read from store and initialize map
try {
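
Note: the metadata sync scheduling in this hunk uses the standard scheduleWithFixedDelay contract: an initial delay, then a fixed gap between the end of one run and the start of the next. A minimal sketch with the same defaults visible in the diff (the printed message stands in for TimelineMetricMetadataSync):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MetadataSyncSketch {
  public static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleWithFixedDelay(
        () -> System.out.println("sync metadata to store"),
        120,  // initial delay: the 2-minute METRICS_METADATA_SYNC_INIT_DELAY default
        300,  // delay between runs: the 5-minute METRICS_METADATA_SYNC_SCHEDULE_DELAY default
        TimeUnit.SECONDS);
  }
}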
@@ -330,7 +336,7 @@ private void loadUuidMapsOnInit() {
* @return
*/
private MetricUuidGenStrategy getUuidStrategy(Configuration configuration) {
- String strategy = configuration.get(TimelineMetricConfiguration.TIMELINE_METRICS_UUID_GEN_STRATEGY, "");
+ String strategy = configuration.get(TIMELINE_METRICS_UUID_GEN_STRATEGY, "");
if ("random".equalsIgnoreCase(strategy)) {
return new RandomUuidGenStrategy();
} else {
@@ -345,7 +345,6 @@ public class PhoenixTransactSQL {
"MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE METRIC_NAME LIKE %s AND SERVER_TIME > %s AND " +
"SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";

-
public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";

public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS";
@@ -31,6 +31,8 @@
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

+ import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;

public class DefaultFSSinkProvider implements ExternalSinkProvider {
private static final Log LOG = LogFactory.getLog(DefaultFSSinkProvider.class);
TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
@@ -64,7 +66,7 @@ public int getSinkTimeOutSeconds() {
@Override
public int getFlushSeconds() {
try {
- return conf.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+ return conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
} catch (Exception e) {
LOG.warn("Cannot read cache commit interval.");
}
