From 1336d891fbb8a04c312af5b91b19e004bb3c3f12 Mon Sep 17 00:00:00 2001 From: Aravindan Vijayan Date: Tue, 14 Nov 2017 09:46:00 -0800 Subject: [PATCH] AMBARI-22437 : Create an 'AD Manager' component in Ambari Metrics Service stack side. (avijayan) --- .../conf/unix/ambari-metrics-admanager.sh | 194 ++++++++++++++++++ .../conf/unix/log4j.properties | 31 +++ .../pom.xml | 14 +- .../src/main/resources/config.yml | 6 +- .../app/AnomalyDetectionAppConfig.scala | 7 +- .../app/AnomalyDetectionAppModule.scala | 9 +- .../configuration/HBaseConfiguration.scala | 2 + .../MetricCollectorConfiguration.scala | 16 +- .../MetricDefinitionDBConfiguration.scala | 6 +- .../adservice/db/LevelDbStoreAccessor.scala | 56 +++++ .../adservice/leveldb/LevelDBDatasource.scala | 17 +- .../metadata/ADMetadataProvider.scala | 17 +- .../MetricDefinitionServiceImpl.scala | 32 +-- .../adservice/resource/AnomalyResource.scala | 2 +- .../resource/MetricDefinitionResource.scala | 24 ++- .../trend/TrendAnomalyInstance.scala | 17 ++ .../app/AnomalyDetectionAppConfigTest.scala | 14 +- .../app/DefaultADResourceSpecTest.scala | 4 +- .../leveldb/LevelDBDataSourceTest.scala | 4 +- 19 files changed, 422 insertions(+), 50 deletions(-) create mode 100644 ambari-metrics-anomaly-detection-service/conf/unix/ambari-metrics-admanager.sh create mode 100644 ambari-metrics-anomaly-detection-service/conf/unix/log4j.properties create mode 100644 ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala diff --git a/ambari-metrics-anomaly-detection-service/conf/unix/ambari-metrics-admanager.sh b/ambari-metrics-anomaly-detection-service/conf/unix/ambari-metrics-admanager.sh new file mode 100644 index 00000000..f1a1ae3f --- /dev/null +++ b/ambari-metrics-anomaly-detection-service/conf/unix/ambari-metrics-admanager.sh @@ -0,0 +1,194 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific + +PIDFILE=/var/run//var/run/ambari-metrics-anomaly-detection/ambari-metrics-admanager.pid +OUTFILE=/var/log/ambari-metrics-anomaly-detection/ambari-metrics-admanager.out + +CONF_DIR=/etc/ambari-metrics-anomaly-detection/conf +DAEMON_NAME=ams_admanager + +STOP_TIMEOUT=5 + +function write_pidfile +{ + local pidfile="$1" + echo $! > "${pidfile}" 2>/dev/null + if [[ $? -gt 0 ]]; then + echo "ERROR: Cannot write pid ${pidfile}." | tee -a $STARTUPFILE + exit 1; + fi +} + +function java_setup +{ + # Bail if we did not detect it + if [[ -z "${JAVA_HOME}" ]]; then + echo "ERROR: JAVA_HOME is not set and could not be found." + exit 1 + fi + + if [[ ! -d "${JAVA_HOME}" ]]; then + echo "ERROR: JAVA_HOME ${JAVA_HOME} does not exist." + exit 1 + fi + + JAVA="${JAVA_HOME}/bin/java" + + if [[ ! -x "$JAVA" ]]; then + echo "ERROR: $JAVA is not executable." + exit 1 + fi +} + +function daemon_status() +{ + # + # LSB 4.1.0 compatible status command (1) + # + # 0 = program is running + # 1 = dead, but still a pid (2) + # 2 = (not used by us) + # 3 = not running + # + # 1 - this is not an endorsement of the LSB + # + # 2 - technically, the specification says /var/run/pid, so + # we should never return this value, but we're giving + # them the benefit of a doubt and returning 1 even if + # our pid is not in in /var/run . + # + + local pidfile="$1" + shift + + local pid + + if [[ -f "${pidfile}" ]]; then + pid=$(cat "${pidfile}") + if ps -p "${pid}" > /dev/null 2>&1; then + return 0 + fi + return 1 + fi + return 3 +} + +function start() +{ + java_setup + + daemon_status "${PIDFILE}" + if [[ $? == 0 ]]; then + echo "AMS AD Manager is running as process $(cat "${PIDFILE}"). Exiting" | tee -a $STARTUPFILE + exit 0 + else + # stale pid file, so just remove it and continue on + rm -f "${PIDFILE}" >/dev/null 2>&1 + fi + + nohup "${JAVA}" "-Xms$AMS_AD_HEAPSIZE" "-Xmx$AMS_AD_HEAPSIZE" ${AMS_AD_OPTS} "-Dlog4j.configuration=file://$CONF_DIR/log4j.properties" "-jar" "/usr/lib/ambari-metrics-anomaly-detection/ambari-metrics-anomaly-detection-service.jar" "server" "${CONF_DIR}/config.yaml" "$@" > $OUTFILE 2>&1 & + PID=$! + write_pidfile "${PIDFILE}" + sleep 2 + + echo "Verifying ${DAEMON_NAME} process status..." + if [ -z "`ps ax -o pid | grep ${PID}`" ]; then + if [ -s ${OUTFILE} ]; then + echo "ERROR: ${DAEMON_NAME} start failed. For more details, see ${OUTFILE}:" + echo "====================" + tail -n 10 ${OUTFILE} + echo "====================" + else + echo "ERROR: ${DAEMON_NAME} start failed" + rm -f ${PIDFILE} + fi + echo "Anomaly Detection Manager out at: ${OUTFILE}" + exit -1 + fi + + rm -f $STARTUPFILE #Deleting startup file + echo "Anomaly Detection Manager successfully started." + } + +function stop() +{ + pidfile=${PIDFILE} + + if [[ -f "${pidfile}" ]]; then + pid=$(cat "$pidfile") + + kill "${pid}" >/dev/null 2>&1 + sleep "${STOP_TIMEOUT}" + + if kill -0 "${pid}" > /dev/null 2>&1; then + echo "WARNING: ${DAEMON_NAME} did not stop gracefully after ${STOP_TIMEOUT} seconds: Trying to kill with kill -9" + kill -9 "${pid}" >/dev/null 2>&1 + fi + + if ps -p "${pid}" > /dev/null 2>&1; then + echo "ERROR: Unable to kill ${pid}" + else + rm -f "${pidfile}" >/dev/null 2>&1 + fi + fi +} + +# execute ams-env.sh +if [[ -f "${CONF_DIR}/ams-admanager-env.sh" ]]; then + . "${CONF_DIR}/ams-admanager-env.sh" +else + echo "ERROR: Cannot execute ${CONF_DIR}/ams-admanager-env.sh." 2>&1 + exit 1 +fi + +# set these env variables only if they were not set by ams-env.sh +: ${AMS_AD_LOG_DIR:=/var/log/ambari-metrics-anomaly-detection} + +# set pid dir path +if [[ -n "${AMS_AD_PID_DIR}" ]]; then + PIDFILE=${AMS_AD_PID_DIR}/admanager.pid +fi + +# set out file path +if [[ -n "${AMS_AD_LOG_DIR}" ]]; then + OUTFILE=${AMS_AD_LOG_DIR}/ambari-metrics-admanager.out +fi + +#TODO manage 3 hbase daemons for start/stop/status +case "$1" in + + start) + start + + ;; + stop) + stop + + ;; + status) + daemon_status "${PIDFILE}" + if [[ $? == 0 ]]; then + echo "AMS AD Manager is running as process $(cat "${PIDFILE}")." + else + echo "AMS AD Manager is not running." + fi + ;; + restart) + stop + start + ;; + +esac diff --git a/ambari-metrics-anomaly-detection-service/conf/unix/log4j.properties b/ambari-metrics-anomaly-detection-service/conf/unix/log4j.properties new file mode 100644 index 00000000..9dba1daf --- /dev/null +++ b/ambari-metrics-anomaly-detection-service/conf/unix/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Define some default values that can be overridden by system properties +# Root logger option +log4j.rootLogger=INFO,file + +# Direct log messages to a log file +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.File=/var/log/ambari-metrics-anomaly-detection/ambari-metrics-admanager.log +log4j.appender.file.MaxFileSize=80MB +log4j.appender.file.MaxBackupIndex=60 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n + + diff --git a/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics-anomaly-detection-service/pom.xml index cfa8124b..142f02fa 100644 --- a/ambari-metrics-anomaly-detection-service/pom.xml +++ b/ambari-metrics-anomaly-detection-service/pom.xml @@ -135,7 +135,7 @@ 3.1.0 false - true + *:* @@ -231,6 +231,12 @@ org.apache.kafka connect-json 0.10.1.0 + + + jackson-databind + com.fasterxml.jackson.core + + org.apache.spark @@ -262,6 +268,10 @@ jersey-json com.sun.jersey + + com.fasterxml.jackson.core + jackson-databind + @@ -307,7 +317,6 @@ org.apache.hadoop hadoop-common ${hadoop.version} - provided commons-el @@ -446,7 +455,6 @@ com.google.guava guava 21.0 - test io.dropwizard.metrics diff --git a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml index 299a4729..9402f6e0 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml +++ b/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml @@ -21,10 +21,12 @@ logging: type: external metricDefinitionService: - inputDefinitionDirectory: /etc/ambari-metrics-anomaly-detection/conf + inputDefinitionDirectory: /etc/ambari-metrics-anomaly-detection/conf/definitionDirectory metricsCollector: - hostPortList: host1:6188,host2:6188 + hosts: host1,host2 + port: 6188 + protocol: http metadataEndpoint: /v1/timeline/metrics/metadata/keys adQueryService: diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala index aa20223b..93f6b284 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala @@ -20,14 +20,16 @@ package org.apache.ambari.metrics.adservice.app import javax.validation.Valid -import org.apache.ambari.metrics.adservice.configuration._ +import org.apache.ambari.metrics.adservice.configuration.{HBaseConfiguration, _} + +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonIgnoreProperties, JsonProperty} -import com.fasterxml.jackson.annotation.JsonProperty import io.dropwizard.Configuration /** * Top Level AD System Manager config items. */ +@JsonIgnoreProperties(ignoreUnknown=true) class AnomalyDetectionAppConfig extends Configuration { /* @@ -54,6 +56,7 @@ class AnomalyDetectionAppConfig extends Configuration { /* HBase Conf */ + @JsonIgnore def getHBaseConf : org.apache.hadoop.conf.Configuration = { HBaseConfiguration.getHBaseConf } diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala index 28b28800..a896563c 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala @@ -17,14 +17,16 @@ */ package org.apache.ambari.metrics.adservice.app -import org.apache.ambari.metrics.adservice.db.MetadataDatasource +import org.apache.ambari.metrics.adservice.db.{AdMetadataStoreAccessor, LevelDbStoreAccessor, MetadataDatasource} import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource -import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, RootResource} +import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl} +import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource} import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl} import com.codahale.metrics.health.HealthCheck import com.google.inject.AbstractModule import com.google.inject.multibindings.Multibinder + import io.dropwizard.setup.Environment class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule { @@ -34,8 +36,11 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm val healthCheckBinder = Multibinder.newSetBinder(binder(), classOf[HealthCheck]) healthCheckBinder.addBinding().to(classOf[DefaultHealthCheck]) bind(classOf[AnomalyResource]) + bind(classOf[MetricDefinitionResource]) bind(classOf[RootResource]) + bind(classOf[AdMetadataStoreAccessor]).to(classOf[LevelDbStoreAccessor]) bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl]) + bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl]) bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource]) } } diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala index a7bbc662..a51a959e 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala @@ -19,12 +19,14 @@ package org.apache.ambari.metrics.adservice.configuration import java.net.{MalformedURLException, URISyntaxException} import org.apache.hadoop.conf.Configuration +import org.slf4j.{Logger, LoggerFactory} object HBaseConfiguration { val HBASE_SITE_CONFIGURATION_FILE: String = "hbase-site.xml" val hbaseConf: org.apache.hadoop.conf.Configuration = new Configuration(true) var isInitialized: Boolean = false + val LOG : Logger = LoggerFactory.getLogger("HBaseConfiguration") def initConfigs(): Unit = { if (!isInitialized) { diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala index 94188970..25307300 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricCollectorConfiguration.scala @@ -28,13 +28,25 @@ import com.fasterxml.jackson.annotation.JsonProperty class MetricCollectorConfiguration { @NotNull - private var hostPortList: String = _ + private var hosts: String = _ + + @NotNull + private var port: String = _ + + @NotNull + private var protocol: String = _ @NotNull private var metadataEndpoint: String = _ @JsonProperty - def getHostPortList: String = hostPortList + def getHosts: String = hosts + + @JsonProperty + def getPort: String = port + + @JsonProperty + def getProtocol: String = protocol @JsonProperty def getMetadataEndpoint: String = metadataEndpoint diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala index 79a350c9..ef4e00c8 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala @@ -26,12 +26,14 @@ class MetricDefinitionDBConfiguration { @NotNull private var dbDirPath: String = _ + private var verifyChecksums: Boolean = true + private var performParanoidChecks: Boolean = false @JsonProperty("verifyChecksums") - def verifyChecksums: Boolean = true + def getVerifyChecksums: Boolean = verifyChecksums @JsonProperty("performParanoidChecks") - def performParanoidChecks: Boolean = false + def getPerformParanoidChecks: Boolean = performParanoidChecks @JsonProperty("dbDirPath") def getDbDirPath: String = dbDirPath diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala new file mode 100644 index 00000000..baad57df --- /dev/null +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala @@ -0,0 +1,56 @@ +package org.apache.ambari.metrics.adservice.db + +import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition + +import com.google.inject.Inject + +class LevelDbStoreAccessor extends AdMetadataStoreAccessor{ + + @Inject + var levelDbDataSource : MetadataDatasource = _ + + @Inject + def this(levelDbDataSource: MetadataDatasource) = { + this + this.levelDbDataSource = levelDbDataSource + } + + /** + * Return all saved component definitions from DB. + * + * @return + */ + override def getSavedInputDefinitions: List[MetricSourceDefinition] = { + List.empty[MetricSourceDefinition] + } + + /** + * Save a set of component definitions + * + * @param metricSourceDefinitions Set of component definitions + * @return Success / Failure + */ +override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = { + true +} + + /** + * Save a component definition + * + * @param metricSourceDefinition component definition + * @return Success / Failure + */ + override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = { + true + } + + /** + * Delete a component definition + * + * @param definitionName component definition + * @return + */ + override def removeInputDefinition(definitionName: String): Boolean = { + true + } +} diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala index 6d185bfe..a34a60ac 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala @@ -20,6 +20,8 @@ package org.apache.ambari.metrics.adservice.leveldb import java.io.File +import javax.inject.Inject + import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration import org.apache.ambari.metrics.adservice.db.MetadataDatasource @@ -29,11 +31,20 @@ import org.iq80.leveldb.impl.Iq80DBFactory import com.google.inject.Singleton @Singleton -class LevelDBDataSource(appConfig: AnomalyDetectionAppConfig) extends MetadataDatasource { +class LevelDBDataSource() extends MetadataDatasource { private var db: DB = _ @volatile var isInitialized: Boolean = false + var appConfig: AnomalyDetectionAppConfig = _ + + @Inject + def this(appConfig: AnomalyDetectionAppConfig) = { + this + this.appConfig = appConfig + initialize() + } + override def initialize(): Unit = { if (isInitialized) return @@ -41,8 +52,8 @@ class LevelDBDataSource(appConfig: AnomalyDetectionAppConfig) extends MetadataDa db = createDB(new LevelDbConfig { override val createIfMissing: Boolean = true - override val verifyChecksums: Boolean = configuration.verifyChecksums - override val paranoidChecks: Boolean = configuration.performParanoidChecks + override val verifyChecksums: Boolean = configuration.getVerifyChecksums + override val paranoidChecks: Boolean = configuration.getPerformParanoidChecks override val path: String = configuration.getDbDirPath }) isInitialized = true diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala index 3bcf4b07..95b1b639 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala @@ -32,7 +32,9 @@ import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper */ class ADMetadataProvider extends MetricMetadataProvider { - var metricCollectorHostPorts: Array[String] = Array.empty[String] + var metricCollectorHosts: Array[String] = Array.empty[String] + var metricCollectorPort: String = _ + var metricCollectorProtocol: String = _ var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys" val connectTimeout: Int = 10000 @@ -42,9 +44,11 @@ class ADMetadataProvider extends MetricMetadataProvider { def this(configuration: MetricCollectorConfiguration) { this - if (StringUtils.isNotEmpty(configuration.getHostPortList)) { - metricCollectorHostPorts = configuration.getHostPortList.split(",") + if (StringUtils.isNotEmpty(configuration.getHosts)) { + metricCollectorHosts = configuration.getHosts.split(",") } + metricCollectorPort = configuration.getPort + metricCollectorProtocol = configuration.getProtocol metricMetadataPath = configuration.getMetadataEndpoint } @@ -57,8 +61,8 @@ class ADMetadataProvider extends MetricMetadataProvider { for (metricDef <- metricSourceDefinition.metricDefinitions) { if (metricDef.isValid) { //Skip requesting metric keys for invalid definitions. - for (hostPort <- metricCollectorHostPorts) { - val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(hostPort + metricMetadataPath, metricDef) + for (host <- metricCollectorHosts) { + val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(metricCollectorProtocol, host, metricCollectorPort, metricMetadataPath, metricDef) if (metricKeys != null) { keysMap += (metricDef -> metricKeys) metricKeySet.++(metricKeys) @@ -76,8 +80,9 @@ class ADMetadataProvider extends MetricMetadataProvider { * @param metricDefinition * @return */ - def getKeysFromMetricsCollector(url: String, metricDefinition: MetricDefinition): Set[MetricKey] = { + def getKeysFromMetricsCollector(protocol: String, host: String, port: String, path: String, metricDefinition: MetricDefinition): Set[MetricKey] = { + val url: String = protocol + "://" + host + port + "/" + path val mapper = new ObjectMapper() with ScalaObjectMapper try { val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection] diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala index ffa9944a..c34d2dd5 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala @@ -19,15 +19,16 @@ package org.apache.ambari.metrics.adservice.metadata import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor +import org.slf4j.{Logger, LoggerFactory} import com.google.inject.{Inject, Singleton} @Singleton class MetricDefinitionServiceImpl extends MetricDefinitionService { - @Inject - var adMetadataStoreAccessor: AdMetadataStoreAccessor = _ + val LOG : Logger = LoggerFactory.getLogger(classOf[MetricDefinitionServiceImpl]) + var adMetadataStoreAccessor: AdMetadataStoreAccessor = _ var configuration: AnomalyDetectionAppConfig = _ var metricMetadataProvider: MetricMetadataProvider = _ @@ -36,18 +37,10 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map() @Inject - def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig) = { - this () - //TODO : Create AD Metadata instance here (or inject) - configuration = anomalyDetectionAppConfig - initializeService() - } - - def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, adMetadataStoreAccessor: AdMetadataStoreAccessor) = { + def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = { this () - //TODO : Create AD Metadata instance here (or inject). Pass in Schema information. + adMetadataStoreAccessor = metadataStoreAccessor configuration = anomalyDetectionAppConfig - this.adMetadataStoreAccessor = adMetadataStoreAccessor initializeService() } @@ -67,13 +60,13 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { //Load definitions from metadata store val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions for (definition <- definitionsFromStore) { - validateAndSanitizeMetricSourceDefinition(definition) + sanitizeMetricSourceDefinition(definition) } //Load definitions from configs val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig for (definition <- definitionsFromConfig) { - validateAndSanitizeMetricSourceDefinition(definition) + sanitizeMetricSourceDefinition(definition) } //Union the 2 sources, with DB taking precedence. @@ -100,6 +93,9 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { @Override def getDefinitionByName(name: String): MetricSourceDefinition = { + if (!metricSourceDefinitionMap.contains(name)) { + LOG.warn("Metric Source Definition with name " + name + " not found") + } metricSourceDefinitionMap.apply(name) } @@ -187,7 +183,13 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService { this.adMetadataStoreAccessor = adMetadataStoreAccessor } - def validateAndSanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = { + + /** + * Look into the Metric Definitions inside a Metric Source definition, and push down source level appId & + * hosts to Metric definition if they do not have an override. + * @param metricSourceDefinition Input Metric Source Definition + */ + def sanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = { val sourceLevelAppId: String = metricSourceDefinition.appId val sourceLevelHostList: List[String] = metricSourceDefinition.hosts diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala index c941ac3d..98ce0c4b 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala @@ -23,7 +23,7 @@ import javax.ws.rs.{GET, Path, Produces} import org.joda.time.DateTime -@Path("/topNAnomalies") +@Path("/anomaly") class AnomalyResource { @GET diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala index aacea792..16125fae 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala @@ -17,12 +17,24 @@ package org.apache.ambari.metrics.adservice.resource +import javax.ws.rs.{GET, Path, Produces} +import javax.ws.rs.core.MediaType.APPLICATION_JSON + +import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricSourceDefinition} +import org.apache.commons.lang.StringUtils + +import com.google.inject.Inject + +@Path("/metric-definition") class MetricDefinitionResource { - /* - GET component definition - POST component definition - DELETE component definition - PUT component definition - */ + @Inject + var metricDefinitionService: MetricDefinitionService = _ + + @GET + @Produces(Array(APPLICATION_JSON)) + def getMetricDefinition (definitionName: String) : MetricSourceDefinition = { + null + } + } diff --git a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala index 125da345..3fc0d6fa 100644 --- a/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala +++ b/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ambari.metrics.adservice.subsystem.trend import org.apache.ambari.metrics.adservice.common.{Season, TimeRange} diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala index 104cceae..989ba219 100644 --- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala +++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfigTest.scala @@ -44,11 +44,21 @@ class AnomalyDetectionAppConfigTest extends FunSuite { assert(config.isInstanceOf[AnomalyDetectionAppConfig]) - assert(config.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory == "/etc/ambari-metrics-anomaly-detection/conf") + assert(config.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory == + "/etc/ambari-metrics-anomaly-detection/conf/definitionDirectory") - assert(config.getMetricCollectorConfiguration.getHostPortList == "host1:6188,host2:6188") + assert(config.getMetricCollectorConfiguration.getHosts == "host1,host2") + + assert(config.getMetricCollectorConfiguration.getPort == "6188") assert(config.getAdServiceConfiguration.getAnomalyDataTtl == 604800) + + assert(config.getMetricDefinitionDBConfiguration.getDbDirPath == "/var/lib/ambari-metrics-anomaly-detection/") + + assert(config.getMetricDefinitionDBConfiguration.getVerifyChecksums) + + assert(!config.getMetricDefinitionDBConfiguration.getPerformParanoidChecks) + } } diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala index 65cf609e..2a4999c1 100644 --- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala +++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala @@ -32,10 +32,10 @@ import com.google.common.io.Resources class DefaultADResourceSpecTest extends FunSpec with Matchers { - describe("/topNAnomalies") { + describe("/anomaly") { it("Must return default message") { withAppRunning(classOf[AnomalyDetectionApp], Resources.getResource("config.yml").getPath) { rule => - val json = client.target(s"http://localhost:${rule.getLocalPort}/topNAnomalies") + val json = client.target(s"http://localhost:${rule.getLocalPort}/anomaly") .request().accept(APPLICATION_JSON).buildGet().invoke(classOf[String]) val now = DateTime.now.toString("MM-dd-yyyy hh:mm") assert(json == "{\"message\":\"Anomaly Detection Service!\"," + "\"today\":\"" + now + "\"}") diff --git a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala index 2ddb7b86..9757d762 100644 --- a/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala +++ b/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala @@ -36,8 +36,8 @@ class LevelDBDataSourceTest extends FunSuite with BeforeAndAfter with Matchers w val mdConfig : MetricDefinitionDBConfiguration = mock[MetricDefinitionDBConfiguration] when(appConfig.getMetricDefinitionDBConfiguration).thenReturn(mdConfig) - when(mdConfig.verifyChecksums).thenReturn(true) - when(mdConfig.performParanoidChecks).thenReturn(false) + when(mdConfig.getVerifyChecksums).thenReturn(true) + when(mdConfig.getPerformParanoidChecks).thenReturn(false) when(mdConfig.getDbDirPath).thenReturn(file.getAbsolutePath) db = new LevelDBDataSource(appConfig)