AMBARI-22343. Add ability in AMS to tee metrics to a set of configured Kafka brokers. (swagle)
swagle authored and avijayanhwx committed Apr 1, 2018
1 parent c8cab48 commit c95cc6ea998fc72db7070e990e677b8114533241
Showing 10 changed files with 270 additions and 108 deletions.
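
In outline, the change lets ams-site.xml register one or more ExternalSinkProvider implementations and carry Kafka producer settings; raw metrics are then fanned out to every configured sink. A minimal, hypothetical sketch of the resulting configuration, using the property keys this commit adds to TimelineMetricConfiguration (the provider class name is a placeholder, and the key string behind TIMELINE_METRICS_SINK_PROVIDER_CLASS is not shown in this diff):

    import org.apache.hadoop.conf.Configuration;

    /** Hypothetical sketch of the ams-site settings this commit reads. */
    public class AmsKafkaTeeConfigSketch {
      public static void main(String[] args) {
        Configuration amsSite = new Configuration(false);
        // Kafka producer keys introduced by this commit:
        amsSite.set("timeline.metrics.external.sink.kafka.bootstrap.servers",
            "broker1:9092,broker2:9092");
        amsSite.set("timeline.metrics.external.sink.kafka.acks", "all");
        amsSite.set("timeline.metrics.external.sink.kafka.batch.size", "16384");
        amsSite.set("timeline.metrics.external.sink.kafka.linger.ms", "0");
        // A provider must also be registered via the comma-separated
        // TIMELINE_METRICS_SINK_PROVIDER_CLASS property (key string not in this
        // diff), e.g. a hypothetical org.example.KafkaSinkProvider.
        System.out.println(amsSite.get(
            "timeline.metrics.external.sink.kafka.bootstrap.servers"));
      }
    }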
@@ -296,6 +296,12 @@
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -418,6 +424,11 @@
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -2,9 +2,6 @@ server:
applicationConnectors:
- type: http
port: 9999
adminConnectors:
- type: http
port: 9990
requestLog:
type: external

@@ -80,7 +80,8 @@
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<includeScope>compile</includeScope>
<excludeScope>test</excludeScope>
<excludeArtifactIds>jasper-runtime,jasper-compiler</excludeArtifactIds>
<excludeArtifactIds>jasper-runtime,jasper-compiler
</excludeArtifactIds>
</configuration>
</execution>
</executions>
@@ -125,11 +126,13 @@
<source>
<location>target/lib</location>
<excludes>
<exclude>*tests.jar</exclude>
<exclude>*tests.jar</exclude>
</excludes>
</source>
<source>
<location>${project.build.directory}/${project.artifactId}-${project.version}.jar</location>
<location>
${project.build.directory}/${project.artifactId}-${project.version}.jar
</location>
</source>
</sources>
</mapping>
@@ -214,7 +217,9 @@
<location>conf/unix/amshbase_metrics_whitelist</location>
</source>
<source>
<location>target/embedded/${hbase.folder}/conf/hbase-site.xml</location>
<location>
target/embedded/${hbase.folder}/conf/hbase-site.xml
</location>
</source>
</sources>
</mapping>
@@ -287,7 +292,8 @@
<skip>true</skip>
<attach>false</attach>
<submodules>false</submodules>
<controlDir>${project.basedir}/../src/main/package/deb/control</controlDir>
<controlDir>${project.basedir}/../src/main/package/deb/control
</controlDir>
</configuration>
</plugin>
</plugins>
@@ -657,23 +663,29 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<version>${hbase.version}</version>
<scope>test</scope>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
</dependency>

<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
@@ -731,17 +743,17 @@
</goals>
<configuration>
<target name="Download HBase">
<mkdir dir="${project.build.directory}/embedded" />
<mkdir dir="${project.build.directory}/embedded"/>
<get
src="${hbase.tar}"
dest="${project.build.directory}/embedded/hbase.tar.gz"
usetimestamp="true"
/>
src="${hbase.tar}"
dest="${project.build.directory}/embedded/hbase.tar.gz"
usetimestamp="true"
/>
<untar
src="${project.build.directory}/embedded/hbase.tar.gz"
dest="${project.build.directory}/embedded"
compression="gzip"
/>
src="${project.build.directory}/embedded/hbase.tar.gz"
dest="${project.build.directory}/embedded"
compression="gzip"
/>
</target>
</configuration>
</execution>
@@ -755,19 +767,19 @@
<target name="Download Phoenix">
<mkdir dir="${project.build.directory}/embedded"/>
<get
src="${phoenix.tar}"
dest="${project.build.directory}/embedded/phoenix.tar.gz"
usetimestamp="true"
/>
src="${phoenix.tar}"
dest="${project.build.directory}/embedded/phoenix.tar.gz"
usetimestamp="true"
/>
<untar
src="${project.build.directory}/embedded/phoenix.tar.gz"
dest="${project.build.directory}/embedded"
compression="gzip"
/>
src="${project.build.directory}/embedded/phoenix.tar.gz"
dest="${project.build.directory}/embedded"
compression="gzip"
/>
<move
file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
/>
file="${project.build.directory}/embedded/${phoenix.folder}/phoenix-${phoenix.version}-server.jar"
tofile="${project.build.directory}/embedded/${hbase.folder}/lib/phoenix-${phoenix.version}-server.jar"
/>
</target>
</configuration>
</execution>
@@ -798,24 +810,24 @@
</goals>
<configuration>
<target name="Download HBase">
<mkdir dir="${project.build.directory}/embedded" />
<mkdir dir="${project.build.directory}/embedded"/>
<get
src="${hbase.winpkg.zip}"
dest="${project.build.directory}/embedded/hbase.zip"
usetimestamp="true"
/>
src="${hbase.winpkg.zip}"
dest="${project.build.directory}/embedded/hbase.zip"
usetimestamp="true"
/>
<unzip
src="${project.build.directory}/embedded/hbase.zip"
dest="${project.build.directory}/embedded/hbase.temp"
/>
src="${project.build.directory}/embedded/hbase.zip"
dest="${project.build.directory}/embedded/hbase.temp"
/>
<unzip
src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
dest="${project.build.directory}/embedded"
/>
src="${project.build.directory}/embedded/hbase.temp/resources/${hbase.winpkg.folder}.zip"
dest="${project.build.directory}/embedded"
/>
<copy
file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
/>
file="${project.build.directory}/embedded/hbase.temp/resources/servicehost.exe"
tofile="${project.build.directory}/embedded/${hbase.winpkg.folder}/bin/ams_hbase_master.exe"
/>
</target>
</configuration>
</execution>
@@ -854,7 +866,8 @@
<!-- The configuration of the plugin -->
<configuration>
<!-- Configuration of the archiver -->
<finalName>${project.artifactId}-simulator-${project.version}</finalName>
<finalName>${project.artifactId}-simulator-${project.version}
</finalName>
<archive>
<!-- Manifest specific configuration -->
<manifest>
@@ -120,7 +120,6 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
@@ -211,7 +210,7 @@ public class PhoenixHBaseAccessor {
private HashMap<String, String> tableTTL = new HashMap<>();

private final TimelineMetricConfiguration configuration;
private InternalMetricsSource rawMetricsSource;
private List<InternalMetricsSource> rawMetricsSources = new ArrayList<>();

public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
this(TimelineMetricConfiguration.getInstance(), dataSource);
@@ -278,15 +277,17 @@ public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
LOG.info("Initialized aggregator sink class " + metricSinkClass);
}

ExternalSinkProvider externalSinkProvider = configuration.getExternalSinkProvider();
List<ExternalSinkProvider> externalSinkProviderList = configuration.getExternalSinkProviderList();
InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
if (externalSinkProvider != null) {
ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
int interval = configuration.getExternalSinkInterval(RAW_METRICS);
if (interval == -1){
interval = cacheCommitInterval;
if (!externalSinkProviderList.isEmpty()) {
for (ExternalSinkProvider externalSinkProvider : externalSinkProviderList) {
ExternalMetricsSink rawMetricsSink = externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
int interval = configuration.getExternalSinkInterval(externalSinkProvider.getClass().getSimpleName(), RAW_METRICS);
if (interval == -1) {
interval = cacheCommitInterval;
}
rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink));
}
rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink);
}
TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
}
@@ -303,8 +304,10 @@ public void commitMetricsFromCache() {
}
if (metricsList.size() > 0) {
commitMetrics(metricsList);
if (rawMetricsSource != null) {
rawMetricsSource.publishTimelineMetrics(metricsList);
if (!rawMetricsSources.isEmpty()) {
for (InternalMetricsSource rawMetricsSource : rawMetricsSources) {
rawMetricsSource.publishTimelineMetrics(metricsList);
}
}
}
}
@@ -316,10 +319,8 @@ public void commitMetrics(TimelineMetrics timelineMetrics) {
private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
PreparedStatement metricRecordStmt = null;
try {

Map<String, String> metricMetadata = metric.getMetadata();



byte[] uuid = metadataManagerInstance.getUuid(metric);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
@@ -26,6 +26,7 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -51,8 +52,6 @@
* Configuration class that reads properties from ams-site.xml. All values
* for time or intervals are given in seconds.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TimelineMetricConfiguration {
private static final Log LOG = LogFactory.getLog(TimelineMetricConfiguration.class);

@@ -343,14 +342,22 @@ public class TimelineMetricConfiguration {
public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = "timeline.metrics.collector.ignite.nodes.backups";

public static final String INTERNAL_CACHE_HEAP_PERCENT =
"timeline.metrics.service.cache.%s.heap.percent";
"timeline.metrics.internal.cache.%s.heap.percent";

public static final String EXTERNAL_SINK_INTERVAL =
"timeline.metrics.service.external.sink.%s.interval";
"timeline.metrics.external.sink.%s.%s.interval";

public static final String DEFAULT_EXTERNAL_SINK_DIR =
"timeline.metrics.service.external.sink.dir";

"timeline.metrics.external.sink.dir";

public static final String KAFKA_SERVERS = "timeline.metrics.external.sink.kafka.bootstrap.servers";
public static final String KAFKA_ACKS = "timeline.metrics.external.sink.kafka.acks";
public static final String KAFKA_RETRIES = "timeline.metrics.external.sink.kafka.bootstrap.retries";
public static final String KAFKA_BATCH_SIZE = "timeline.metrics.external.sink.kafka.batch.size";
public static final String KAFKA_LINGER_MS = "timeline.metrics.external.sink.kafka.linger.ms";
public static final String KAFKA_BUFFER_MEM = "timeline.metrics.external.sink.kafka.buffer.memory";
public static final String KAFKA_SINK_TIMEOUT_SECONDS = "timeline.metrics.external.sink.kafka.timeout.seconds";

private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration metricsSslConf;
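
These keys map one-to-one onto kafka-clients producer settings. A minimal sketch, assuming a hypothetical topic name and a sink that simply forwards serialized metrics; the commit's actual Kafka sink class is not part of this excerpt:

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** Hypothetical sketch: wire the new ams-site keys into a kafka-clients producer. */
    public class KafkaTeeSketch {
      public static KafkaProducer<String, String> buildProducer(Configuration conf) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            conf.get("timeline.metrics.external.sink.kafka.bootstrap.servers", "localhost:9092"));
        props.put(ProducerConfig.ACKS_CONFIG,
            conf.get("timeline.metrics.external.sink.kafka.acks", "all"));
        props.put(ProducerConfig.RETRIES_CONFIG,
            conf.getInt("timeline.metrics.external.sink.kafka.bootstrap.retries", 0));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,
            conf.getInt("timeline.metrics.external.sink.kafka.batch.size", 16384));
        props.put(ProducerConfig.LINGER_MS_CONFIG,
            conf.getInt("timeline.metrics.external.sink.kafka.linger.ms", 0));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
            conf.getLong("timeline.metrics.external.sink.kafka.buffer.memory", 33554432L));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
      }

      public static void tee(KafkaProducer<String, String> producer, String metricsJson,
                             int timeoutSeconds) throws Exception {
        // "ams-metrics" is a placeholder topic; the real topic name is not in this diff.
        producer.send(new ProducerRecord<>("ams-metrics", metricsJson))
            .get(timeoutSeconds, TimeUnit.SECONDS);  // bounded by kafka.timeout.seconds
      }
    }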
@@ -601,8 +608,24 @@ public boolean isWhitelistingEnabled() {
return false;
}

public int getExternalSinkInterval(SOURCE_NAME sourceName) {
return Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, sourceName), "-1"));
/**
 * Get the sink interval for a metrics source.
 * Determines how often metrics are written to the sink, which in turn
 * determines whether any caching is needed on the collector side. The
 * default interval (-1) disables caching: metrics are written to the
 * sink as soon as they arrive.
 *
 * @param sinkProviderClassName Class name (simple or fully qualified) of the {@link ExternalSinkProvider} implementation
 * @param sourceName {@link SOURCE_NAME}
 * @return interval in seconds
 */
public int getExternalSinkInterval(String sinkProviderClassName,
SOURCE_NAME sourceName) {
String sinkProviderSimpleClassName = sinkProviderClassName.substring(
sinkProviderClassName.lastIndexOf(".") + 1);

return Integer.parseInt(metricsConf.get(
String.format(EXTERNAL_SINK_INTERVAL, sinkProviderSimpleClassName, sourceName), "-1"));
}
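
As a worked example of the key format (the provider simple name and the enum's string form are assumptions for illustration):

    public class SinkIntervalKeySketch {
      public static void main(String[] args) {
        // EXTERNAL_SINK_INTERVAL = "timeline.metrics.external.sink.%s.%s.interval"
        String key = String.format("timeline.metrics.external.sink.%s.%s.interval",
            "KafkaSinkProvider", "RAW_METRICS");  // hypothetical provider simple name
        System.out.println(key);
        // timeline.metrics.external.sink.KafkaSinkProvider.RAW_METRICS.interval
        // -1 (the default) makes PhoenixHBaseAccessor fall back to cacheCommitInterval.
      }
    }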

public InternalSourceProvider getInternalSourceProvider() {
@@ -612,12 +635,18 @@ public InternalSourceProvider getInternalSourceProvider() {
return ReflectionUtils.newInstance(providerClass, metricsConf);
}

public ExternalSinkProvider getExternalSinkProvider() {
Class<?> providerClass = metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
if (providerClass != null) {
return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf);
/**
 * Returns the configured external sink providers. Provider class names
 * are given as a comma-separated list.
 */
public List<ExternalSinkProvider> getExternalSinkProviderList() {
Class<?>[] providerClasses = metricsConf.getClasses(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
List<ExternalSinkProvider> providerList = new ArrayList<>();
if (providerClasses != null) {
for (Class<?> providerClass : providerClasses) {
providerList.add((ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, metricsConf));
}
}
return null;
return providerList;
}
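
Registering multiple providers is then a matter of a comma-separated class list. A sketch, using stand-in JDK class names so it runs as-is; in ams-site.xml the values would be ExternalSinkProvider implementations, and the real key is the value of TIMELINE_METRICS_SINK_PROVIDER_CLASS (not shown in this excerpt):

    import org.apache.hadoop.conf.Configuration;

    public class SinkProviderListSketch {
      public static void main(String[] args) {
        Configuration metricsConf = new Configuration(false);
        // Placeholder key and stand-in classes; a real deployment would set the
        // TIMELINE_METRICS_SINK_PROVIDER_CLASS property to sink provider classes.
        metricsConf.set("sink.provider.classes.placeholder",
            "java.util.ArrayList,java.util.LinkedList");
        for (Class<?> clazz : metricsConf.getClasses("sink.provider.classes.placeholder")) {
          // The accessor instantiates each entry via ReflectionUtils.newInstance(...)
          System.out.println(clazz.getName());
        }
      }
    }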

public String getInternalCacheHeapPercent(String instanceName) {
