Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
AMBARI-23804 : Refine AMS HBase region splitting calculation based on UUID work.
  • Loading branch information
Aravindan Vijayan committed May 10, 2018
1 parent 056f699 commit a878b0488139b20609c43d6f575ff8546da6ae14
Showing 21 changed files with 815 additions and 316 deletions.
@@ -17,7 +17,6 @@
*/
package org.apache.ambari.metrics.core.timeline;

import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;

@@ -35,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -111,15 +109,18 @@ private TimelineMetricDistributedCache startCacheNode() throws MalformedURLExcep
private synchronized void initializeSubsystem() {
if (!isInitialized) {
hBaseAccessor = new PhoenixHBaseAccessor(null);
// Initialize schema
hBaseAccessor.initMetricSchema();
// Initialize metadata from store

// Initialize metadata
try {
metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
} catch (MalformedURLException | URISyntaxException e) {
throw new ExceptionInInitializerError("Unable to initialize metadata manager");
}
metricMetadataManager.initializeMetadata();

// Initialize metric schema
hBaseAccessor.initMetricSchema();

// Initialize policies before TTL update
hBaseAccessor.initPoliciesAndTTL();
// Start HA service
@@ -395,6 +396,10 @@ static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(L
return metricsFunctions;
}

public void putMetricsSkipCache(TimelineMetrics metrics) throws SQLException, IOException {
hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, true);
}

@Override
public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException {
// Error indicated by the Sql exception

Large diffs are not rendered by default.

@@ -237,12 +237,6 @@ public class TimelineMetricConfiguration {
public static final String WATCHER_MAX_FAILURES =
"timeline.metrics.service.watcher.max.failures";

public static final String PRECISION_TABLE_SPLIT_POINTS =
"timeline.metrics.host.aggregate.splitpoints";

public static final String AGGREGATE_TABLE_SPLIT_POINTS =
"timeline.metrics.cluster.aggregate.splitpoints";

public static final String AGGREGATORS_SKIP_BLOCK_CACHE =
"timeline.metrics.aggregators.skip.blockcache.enabled";

@@ -261,12 +255,6 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRICS_SINK_COLLECTION_PERIOD =
"timeline.metrics.sink.collection.period";

public static final String TIMELINE_METRICS_PRECISION_TABLE_DURABILITY =
"timeline.metrics.precision.table.durability";

public static final String TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY =
"timeline.metrics.aggregate.tables.durability";

public static final String TIMELINE_METRICS_WHITELIST_ENABLED =
"timeline.metrics.whitelisting.enabled";

@@ -285,33 +273,9 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRICS_APPS_WHITELIST =
"timeline.metrics.apps.whitelist";

public static final String HBASE_BLOCKING_STORE_FILES =
"hbase.hstore.blockingStoreFiles";

public static final String DEFAULT_TOPN_HOSTS_LIMIT =
"timeline.metrics.default.topn.hosts.limit";

public static final String TIMELINE_METRIC_AGGREGATION_SQL_FILTERS =
"timeline.metrics.cluster.aggregation.sql.filters";

public static final String TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY =
"timeline.metrics.hbase.aggregate.table.compaction.policy.key";

public static final String TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS =
"timeline.metrics.hbase.aggregate.table.compaction.policy.class";

public static final String TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES =
"timeline.metrics.aggregate.table.hbase.hstore.blockingStoreFiles";

public static final String TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY =
"timeline.metrics.hbase.precision.table.compaction.policy.key";

public static final String TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS =
"timeline.metrics.hbase.precision.table.compaction.policy.class";

public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
"timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";

public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
"timeline.metrics.support.multiple.clusters";

@@ -346,14 +310,23 @@ public class TimelineMetricConfiguration {

public static final String TRANSIENT_METRIC_PATTERNS = "timeline.metrics.transient.metric.patterns";

public static final String TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS = "timeline.metrics.initial.configured.master.components";
public static final String TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS = "timeline.metrics.initial.configured.slave.components";

public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
public static final String KAFKA_BATCH_SIZE = "timeline.metrics.external.sink.kafka.batch.size";
public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";


public static final String HSTORE_COMPACTION_CLASS_KEY = "hbase.hstore.defaultengine.compactionpolicy.class";
public static final String HSTORE_ENGINE_CLASS = "hbase.hstore.engine.class";
public static final String FIFO_COMPACTION_POLICY_CLASS = "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
public static final String DATE_TIERED_COMPACTION_POLICY = "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine";
public static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles";

private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration metricsSslConf;
@@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.metrics.core.timeline;

import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS;
import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS;

/**
 * Computes HBase region split points (metric UUIDs) for the AMS precision and
 * aggregate tables, based on the cluster's configured master/slave components
 * and the memory available for HBase memstores.
 *
 * Candidate metric names are read from bundled {@code metrics_def/<APP>.dat}
 * resources; a subset of roughly equidistant entries is chosen per component,
 * and the combined list is sliced into N region boundaries, where N is derived
 * from the HBase heap / memstore flush size.
 */
public class TimelineMetricSplitPointComputer {

  private static final Log LOG = LogFactory.getLog(TimelineMetricSplitPointComputer.class);

  private final Set<String> masterComponents = new HashSet<>();
  private final Set<String> slaveComponents = new HashSet<>();

  // Floor values for region counts, plus the number of regions reserved for
  // the other (metadata/transient) AMS tables.
  private static final int MINIMUM_PRECISION_TABLE_REGIONS = 4;
  private static final int MINIMUM_AGGREGATE_TABLE_REGIONS = 2;
  private static final int OTHER_TABLE_STATIC_REGIONS = 8;
  // Number of candidate metrics sampled per slave / master component.
  private static final int SLAVE_EQUIDISTANT_POINTS = 50;
  private static final int MASTER_EQUIDISTANT_POINTS = 5;

  private List<byte[]> precisionSplitPoints = new ArrayList<>();
  private List<byte[]> aggregateSplitPoints = new ArrayList<>();

  /**
   * @param metricsConf AMS configuration (component lists, hbase heap size).
   * @param hbaseConf HBase configuration (memstore upper limit, flush size).
   * @param timelineMetricMetadataManager used to map metric name/app pairs to UUIDs.
   */
  public TimelineMetricSplitPointComputer(Configuration metricsConf,
                                          Configuration hbaseConf,
                                          TimelineMetricMetadataManager timelineMetricMetadataManager) {

    String componentsString = metricsConf.get(TIMELINE_METRIC_INITIAL_CONFIGURED_MASTER_COMPONENTS, "");
    if (StringUtils.isNotEmpty(componentsString)) {
      masterComponents.addAll(Arrays.asList(componentsString.split(",")));
    }

    componentsString = metricsConf.get(TIMELINE_METRIC_INITIAL_CONFIGURED_SLAVE_COMPONENTS, "");
    if (StringUtils.isNotEmpty(componentsString)) {
      slaveComponents.addAll(Arrays.asList(componentsString.split(",")));
    }

    // Defaults: 1 GB heap, 50% memstore upper limit, 128 MB flush size.
    double hbaseTotalHeapsize = metricsConf.getDouble("hbase_total_heapsize", 1024 * 1024 * 1024);
    double hbaseMemstoreUpperLimit = hbaseConf.getDouble("hbase.regionserver.global.memstore.upperLimit", 0.5);
    double hbaseMemstoreFlushSize = hbaseConf.getDouble("hbase.hregion.memstore.flush.size", 134217728);

    computeSplitPoints(hbaseTotalHeapsize, hbaseMemstoreUpperLimit, hbaseMemstoreFlushSize, timelineMetricMetadataManager);
  }

  /**
   * Derives target region counts from available memstore memory and slices the
   * combined candidate metric list into split-point UUIDs for both tables.
   */
  private void computeSplitPoints(double hbaseTotalHeapsize,
                                  double hbaseMemstoreUpperLimit,
                                  double hbaseMemstoreFlushSize,
                                  TimelineMetricMetadataManager timelineMetricMetadataManager) {

    double memstoreMaxMemory = hbaseMemstoreUpperLimit * hbaseTotalHeapsize;
    int maxInMemoryRegions = (int) ((memstoreMaxMemory / hbaseMemstoreFlushSize) - OTHER_TABLE_STATIC_REGIONS);

    int targetPrecisionTableRegionCount = MINIMUM_PRECISION_TABLE_REGIONS;
    int targetAggregateTableRegionCount = MINIMUM_AGGREGATE_TABLE_REGIONS;

    if (maxInMemoryRegions > 2) {
      // 70% of regions for the precision table, 15% for aggregate tables,
      // never dropping below the configured minimums.
      targetPrecisionTableRegionCount =
        Math.max(MINIMUM_PRECISION_TABLE_REGIONS, (int) (0.70 * maxInMemoryRegions));
      targetAggregateTableRegionCount =
        Math.max(MINIMUM_AGGREGATE_TABLE_REGIONS, (int) (0.15 * maxInMemoryRegions));
    }

    List<MetricApp> metricList = new ArrayList<>();

    for (String component : masterComponents) {
      metricList.addAll(getSortedMetricListForSplitPoint(component, false));
    }

    for (String component : slaveComponents) {
      metricList.addAll(getSortedMetricListForSplitPoint(component, true));
    }

    precisionSplitPoints = computeSplitUuids(metricList, targetPrecisionTableRegionCount, timelineMetricMetadataManager);
    aggregateSplitPoints = computeSplitUuids(metricList, targetAggregateTableRegionCount, timelineMetricMetadataManager);
  }

  /**
   * Picks up to {@code targetRegionCount} evenly spaced metrics from
   * {@code metricList} and returns their UUIDs as region split points.
   *
   * BUG FIX: the original computed {@code (int) Math.ceil(total / count)} with
   * INTEGER division, so the ceil was a no-op; worse, when there were fewer
   * candidate metrics than target regions the step became 0 and the loop added
   * the SAME metric's UUID repeatedly. Dividing as doubles guarantees a step
   * of at least 1 whenever the list is non-empty.
   */
  private List<byte[]> computeSplitUuids(List<MetricApp> metricList,
                                         int targetRegionCount,
                                         TimelineMetricMetadataManager timelineMetricMetadataManager) {
    List<byte[]> splitPoints = new ArrayList<>();
    int totalMetricLength = metricList.size();

    if (targetRegionCount <= 1 || totalMetricLength == 0) {
      return splitPoints;
    }

    int step = (int) Math.ceil((double) totalMetricLength / targetRegionCount);
    int index = step;
    for (int i = 0; i < targetRegionCount; i++) {
      if (index < totalMetricLength - 1) {
        MetricApp metricAppService = metricList.get(index);
        byte[] uuid = timelineMetricMetadataManager.getUuid(
          new TimelineClusterMetric(metricAppService.metricName, metricAppService.appId, null, -1),
          true);
        splitPoints.add(uuid);
        index += step;
      }
    }
    return splitPoints;
  }

  /**
   * Loads the candidate metric names bundled for a component (resource
   * {@code metrics_def/<APPID>.dat}, one metric name per line) and samples an
   * equidistant subset. Missing resources or read errors are logged and yield
   * an empty list (best-effort behavior preserved from the original).
   */
  private List<MetricApp> getSortedMetricListForSplitPoint(String component, boolean isSlave) {

    String appId = getAppId(component);
    List<MetricApp> metricList = new ArrayList<>();

    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      classLoader = getClass().getClassLoader();
    }

    try (InputStream inputStream = classLoader.getResourceAsStream("metrics_def/" + appId.toUpperCase() + ".dat")) {

      if (inputStream != null) {
        // Reader participates in try-with-resources so it is always closed.
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
          LOG.info("Found split point candidate metrics for : " + appId);

          String strLine;
          while ((strLine = bufferedReader.readLine()) != null) {
            metricList.add(new MetricApp(strLine.trim(), appId));
          }
        }
      } else {
        LOG.info("Split point candidate metrics not found for : " + appId);
      }
    } catch (Exception e) {
      // Best effort: a component without candidate metrics simply contributes
      // no split points.
      LOG.info("Error reading split point candidate metrics for component : " + component);
      LOG.error(e);
    }

    // NOTE(review): despite the method name, the .dat file order is used as-is;
    // the list is assumed to be pre-sorted in the resource — confirm.
    if (isSlave) {
      return getEquidistantMetrics(metricList, SLAVE_EQUIDISTANT_POINTS);
    } else {
      return getEquidistantMetrics(metricList, MASTER_EQUIDISTANT_POINTS);
    }
  }

  /**
   * Returns up to {@code distance} evenly spaced entries from {@code metrics};
   * if the list is smaller than {@code distance} it is returned unchanged.
   */
  private List<MetricApp> getEquidistantMetrics(List<MetricApp> metrics, int distance) {
    List<MetricApp> selectedMetricApps = new ArrayList<>();

    int step = metrics.size() / distance;
    if (step == 0) {
      return metrics;
    }

    int index = step;
    for (int i = 0; i < distance; i++) {
      selectedMetricApps.add(metrics.get(index - 1));
      index += step;
    }
    return selectedMetricApps;
  }

  /** Split points for the precision (raw) metrics table. */
  public List<byte[]> getPrecisionSplitPoints() {
    return precisionSplitPoints;
  }

  /** Split points for the cluster aggregate tables (shared with host aggregates). */
  public List<byte[]> getClusterAggregateSplitPoints() {
    return aggregateSplitPoints;
  }

  /** Split points for the host aggregate tables (shared with cluster aggregates). */
  public List<byte[]> getHostAggregateSplitPoints() {
    return aggregateSplitPoints;
  }

  /**
   * Maps an Ambari component name to the appId under which its metrics are
   * stored; unrecognized components map to themselves.
   */
  private String getAppId(String component) {

    if (component.equalsIgnoreCase("METRICS_COLLECTOR")) {
      return "ams-hbase";
    }

    if (component.equalsIgnoreCase("METRICS_MONITOR")) {
      return "HOST";
    }
    return component;
  }
}

/**
 * Lightweight (metricName, appId) pair used as a split-point candidate.
 * Any appId beginning with "hbase" (e.g. "hbase_master", "hbase_regionserver")
 * is normalized to the single appId "hbase". Ordered by metric name, then appId.
 *
 * FIX: implements the generic {@code Comparable<MetricApp>} instead of the raw
 * {@code Comparable}, removing the unchecked cast inside {@code compareTo}.
 */
class MetricApp implements Comparable<MetricApp> {
  final String metricName;
  final String appId;

  MetricApp(String metricName, String appId) {
    this.metricName = metricName;
    if (appId.startsWith("hbase")) {
      this.appId = "hbase";
    } else {
      this.appId = appId;
    }
  }

  @Override
  public int compareTo(MetricApp that) {
    int metricCompare = metricName.compareTo(that.metricName);
    if (metricCompare != 0) {
      return metricCompare;
    }

    return appId.compareTo(that.appId);
  }
}
@@ -45,13 +45,13 @@ public class TimelineMetricStoreWatcher implements Runnable {
private static int failures = 0;
private final TimelineMetricConfiguration configuration;

private TimelineMetricStore timelineMetricStore;
private HBaseTimelineMetricsService timelineMetricStore;

//used to call timelineMetricStore blocking methods with timeout
private ExecutorService executor = Executors.newSingleThreadExecutor();


public TimelineMetricStoreWatcher(TimelineMetricStore timelineMetricStore,
public TimelineMetricStoreWatcher(HBaseTimelineMetricsService timelineMetricStore,
TimelineMetricConfiguration configuration) {
this.timelineMetricStore = timelineMetricStore;
this.configuration = configuration;
@@ -100,7 +100,7 @@ private boolean checkMetricStore() {

Callable<TimelineMetric> task = new Callable<TimelineMetric>() {
public TimelineMetric call() throws Exception {
timelineMetricStore.putMetrics(metrics);
timelineMetricStore.putMetricsSkipCache(metrics);
TimelineMetrics timelineMetrics = timelineMetricStore.getTimelineMetrics(
Collections.singletonList(FAKE_METRIC_NAME), Collections.singletonList(FAKE_HOSTNAME),
FAKE_APP_ID, null, startTime - delay * 2 * 1000,

0 comments on commit a878b04

Please sign in to comment.