Skip to content
Permalink
Browse files
AMBARI-22359 : Fix Serialization issues in Metric Definition Service …
…(avijayan).
  • Loading branch information
Aravindan Vijayan committed Apr 1, 2018
1 parent 778409e commit 112d2b1be2107b795f3bc5e2b46edb4881c17da4
Showing 15 changed files with 126 additions and 101 deletions.
@@ -20,8 +20,8 @@ server:
logging:
type: external

metricManagerService:
inputDefinitionDirectory: /etc/adservice/conf/input-definitions-directory
metricDefinitionService:
inputDefinitionDirectory: /etc/ambari-metrics-anomaly-detection/conf

metricsCollector:
hostPortList: host1:6188,host2:6188
@@ -20,7 +20,7 @@ package org.apache.ambari.metrics.adservice.app

import javax.validation.Valid

import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricManagerServiceConfiguration}
import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricDefinitionServiceConfiguration}

import com.fasterxml.jackson.annotation.JsonProperty

@@ -35,7 +35,7 @@ class AnomalyDetectionAppConfig extends Configuration {
Metric Definition Service configuration
*/
@Valid
private val metricManagerServiceConfiguration = new MetricManagerServiceConfiguration
private val metricDefinitionServiceConfiguration = new MetricDefinitionServiceConfiguration

@Valid
private val metricCollectorConfiguration = new MetricCollectorConfiguration
@@ -53,9 +53,9 @@ class AnomalyDetectionAppConfig extends Configuration {
HBaseConfiguration.getHBaseConf
}

@JsonProperty("metricManagerService")
def getMetricManagerServiceConfiguration: MetricManagerServiceConfiguration = {
metricManagerServiceConfiguration
@JsonProperty("metricDefinitionService")
def getMetricDefinitionServiceConfiguration: MetricDefinitionServiceConfiguration = {
metricDefinitionServiceConfiguration
}

@JsonProperty("adQueryService")
@@ -112,11 +112,11 @@ object Season {
validSeasons.toList
}

def serialize(season: Season) : String = {
def toJson(season: Season) : String = {
mapper.writeValueAsString(season)
}

def deserialize(seasonString: String) : Season = {
def fromJson(seasonString: String) : Season = {
mapper.readValue[Season](seasonString)
}
}
@@ -39,14 +39,4 @@ class MetricCollectorConfiguration {
@JsonProperty
def getMetadataEndpoint: String = metadataEndpoint

@JsonProperty
def setHostPortList(hostPortList: String): Unit = {
this.hostPortList = hostPortList
}

@JsonProperty
def setMetadataEndpoint(metadataEndpoint: String): Unit = {
this.metadataEndpoint = metadataEndpoint
}

}
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
/**
* Class to capture the Metric Definition Service configuration.
*/
class MetricManagerServiceConfiguration {
class MetricDefinitionServiceConfiguration {

@NotNull
private val inputDefinitionDirectory: String = ""
@@ -94,11 +94,11 @@ object PhoenixAnomalyStoreAccessor {
val timestamp: Long = rs.getLong("ANOMALY_TIMESTAMP")
val metricValue: Double = rs.getDouble("METRIC_VALUE")
val methodType: AnomalyDetectionMethod = AnomalyDetectionMethod.withName(rs.getString("METHOD_NAME"))
val season: Season = Season.deserialize(rs.getString("SEASONAL_INFO"))
val season: Season = Season.fromJson(rs.getString("SEASONAL_INFO"))
val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")

val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid)
val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
val anomalyInstance: SingleMetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
metricValue, methodType, anomalyScore, season, modelSnapshot)
anomalies.+=(anomalyInstance)
@@ -111,11 +111,11 @@ object PhoenixAnomalyStoreAccessor {
val referenceStart: Long = rs.getLong("TEST_PERIOD_START")
val referenceEnd: Long = rs.getLong("TEST_PERIOD_END")
val methodType: AnomalyDetectionMethod = AnomalyDetectionMethod.withName(rs.getString("METHOD_NAME"))
val season: Season = Season.deserialize(rs.getString("SEASONAL_INFO"))
val season: Season = Season.fromJson(rs.getString("SEASONAL_INFO"))
val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")

val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid)
val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
val anomalyInstance: SingleMetricAnomalyInstance = TrendAnomalyInstance(metricKey,
TimeRange(anomalyStart, anomalyEnd),
TimeRange(referenceStart, referenceEnd),
@@ -56,11 +56,13 @@ class ADMetadataProvider extends MetricMetadataProvider {
val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]

for (metricDef <- metricSourceDefinition.metricDefinitions) {
for (hostPort <- metricCollectorHostPorts) {
val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(hostPort + metricMetadataPath, metricDef)
if (metricKeys != null) {
keysMap += (metricDef -> metricKeys)
metricKeySet.++(metricKeys)
if (metricDef.isValid) { //Skip requesting metric keys for invalid definitions.
for (hostPort <- metricCollectorHostPorts) {
val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(hostPort + metricMetadataPath, metricDef)
if (metricKeys != null) {
keysMap += (metricDef -> metricKeys)
metricKeySet.++(metricKeys)
}
}
}
}
@@ -18,22 +18,46 @@

package org.apache.ambari.metrics.adservice.metadata

import org.apache.commons.lang3.StringUtils
/*
{
"metric-name": "mem_free",
"appId" : "HOST",
"hosts" : ["h1","h2"],
"metric-description" : "Free memory on a Host.",
"troubleshooting-info" : "Sudden drop / hike in free memory on a host.",
"static-threshold" : 10,
“app-id” : “HOST”
}
*/

case class MetricDefinition (metricName: String,
appId: String,
hosts: List[String],
metricDescription: String,
troubleshootingInfo: String,
staticThreshold: Double) {
@SerialVersionUID(1002L)
class MetricDefinition extends Serializable {

var metricName: String = _
var appId: String = _
var hosts: List[String] = List.empty[String]
var metricDescription: String = ""
var troubleshootingInfo: String = ""
var staticThreshold: Double = _

//A Metric definition is valid if we can resolve a metricName and appId (defined or inherited) at runtime)
private var valid : Boolean = true

def this(metricName: String,
appId: String,
hosts: List[String],
metricDescription: String,
troubleshootingInfo: String,
staticThreshold: Double) = {
this
this.metricName = metricName
this.appId = appId
this.hosts = hosts
this.metricDescription = metricDescription
this.troubleshootingInfo = troubleshootingInfo
this.staticThreshold = staticThreshold
}

@Override
override def equals(obj: scala.Any): Boolean = {
@@ -46,10 +70,20 @@ case class MetricDefinition (metricName: String,
if (!(metricName == that.metricName))
return false

if (!(appId == that.appId))
return false
if (StringUtils.isNotEmpty(appId)) {
appId == that.appId
}
else {
StringUtils.isEmpty(that.appId)
}
}

def isValid: Boolean = {
valid
}

true
def makeInvalid() : Unit = {
valid = false
}
}

@@ -17,7 +17,7 @@

package org.apache.ambari.metrics.adservice.metadata

trait MetricManagerService {
trait MetricDefinitionService {

/**
* Given a 'UUID', return the metric key associated with it.
@@ -23,7 +23,7 @@ import org.apache.ambari.metrics.adservice.db.AdMetadataStoreAccessor
import com.google.inject.{Inject, Singleton}

@Singleton
class MetricManagerServiceImpl extends MetricManagerService {
class MetricDefinitionServiceImpl extends MetricDefinitionService {

@Inject
var adMetadataStoreAccessor: AdMetadataStoreAccessor = _
@@ -66,15 +66,21 @@ class MetricManagerServiceImpl extends MetricManagerService {

//Load definitions from metadata store
val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
for (definition <- definitionsFromStore) {
validateAndSanitizeMetricSourceDefinition(definition)
}

//Load definitions from configs
val definitionsFromConfig: List[MetricSourceDefinition] = getInputDefinitionsFromConfig
for (definition <- definitionsFromConfig) {
validateAndSanitizeMetricSourceDefinition(definition)
}

//Union the 2 sources, with DB taking precedence.
//Save new definition list to DB.
metricSourceDefinitionMap = metricSourceDefinitionMap.++(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))

//Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back Map<MD,Set<MK>>
//Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back (Map<MD,Set<MK>>, Set<MK>)
for (definition <- metricSourceDefinitionMap.values) {
val (definitionKeyMap: Map[MetricDefinition, Set[MetricKey]], keys: Set[MetricKey])= metricMetadataProvider.getMetricKeysForDefinitions(definition)
metricDefinitionMetricKeyMap = metricDefinitionMetricKeyMap.++(definitionKeyMap)
@@ -173,11 +179,33 @@ class MetricManagerServiceImpl extends MetricManagerService {
}

def getInputDefinitionsFromConfig: List[MetricSourceDefinition] = {
val configDirectory = configuration.getMetricManagerServiceConfiguration.getInputDefinitionDirectory
val configDirectory = configuration.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory
InputMetricDefinitionParser.parseInputDefinitionsFromDirectory(configDirectory)
}

def setAdMetadataStoreAccessor (adMetadataStoreAccessor: AdMetadataStoreAccessor) : Unit = {
this.adMetadataStoreAccessor = adMetadataStoreAccessor
}

def validateAndSanitizeMetricSourceDefinition(metricSourceDefinition: MetricSourceDefinition): Unit = {
val sourceLevelAppId: String = metricSourceDefinition.appId
val sourceLevelHostList: List[String] = metricSourceDefinition.hosts

for (metricDef <- metricSourceDefinition.metricDefinitions.toList) {
if (metricDef.appId == null) {
if (sourceLevelAppId == null || sourceLevelAppId.isEmpty) {
metricDef.makeInvalid()
} else {
metricDef.appId = sourceLevelAppId
}
}

if (metricDef.isValid && metricDef.hosts.isEmpty) {
if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) {
metricDef.hosts = sourceLevelHostList
}
}
}
}

}
@@ -22,10 +22,6 @@ import javax.xml.bind.annotation.XmlRootElement
import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinitionType.MetricSourceDefinitionType
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

/*
{
"definition-name": "host-memory",
@@ -45,27 +41,10 @@ import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
}
*/

/*
On Startup
Read input definitions directory, parse the JSONs
Create / Update the metric definitions in DB
Convert metric definitions to Map<MetricKey, MetricDefinition>
What to do want to have in memory?
Map of Metric Key -> List<Component Definitions>
What do we use metric definitions for?
Anomaly GET - Associate definition information as well.
Definition CRUD - Get definition given definition name
Get set of metrics that are being tracked
Return definition information for a metric key
Given a metric definition name, return set of metrics.
*/

@SerialVersionUID(10001L)
@XmlRootElement
class MetricSourceDefinition {
class MetricSourceDefinition extends Serializable{

var definitionName: String = _
var appId: String = _
@@ -103,17 +82,4 @@ class MetricSourceDefinition {
val that = obj.asInstanceOf[MetricSourceDefinition]
definitionName.equals(that.definitionName)
}
}

object MetricSourceDefinition {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)

def serialize(definition: MetricSourceDefinition) : String = {
mapper.writeValueAsString(definition)
}

def deserialize(definitionString: String) : MetricSourceDefinition = {
mapper.readValue[MetricSourceDefinition](definitionString)
}
}
@@ -44,7 +44,7 @@ class AnomalyDetectionAppConfigTest extends FunSuite {

assert(config.isInstanceOf[AnomalyDetectionAppConfig])

assert(config.getMetricManagerServiceConfiguration.getInputDefinitionDirectory == "/etc/adservice/conf/input-definitions-directory")
assert(config.getMetricDefinitionServiceConfiguration.getInputDefinitionDirectory == "/etc/ambari-metrics-anomaly-detection/conf")

assert(config.getMetricCollectorConfiguration.getHostPortList == "host1:6188,host2:6188")

@@ -78,9 +78,9 @@ class SeasonTest extends FunSuite {
test("testSerialize") {
val season1 : Season = Season(Range(Calendar.MONDAY,Calendar.FRIDAY), Range(9,17))

val seasonString = Season.serialize(season1)
val seasonString = Season.toJson(season1)

val season2 : Season = Season.deserialize(seasonString)
val season2 : Season = Season.fromJson(seasonString)
assert(season1 == season2)

val season3 : Season = Season(Range(Calendar.MONDAY,Calendar.THURSDAY), Range(9,17))

0 comments on commit 112d2b1

Please sign in to comment.