Skip to content
Permalink
Browse files
AMBARI-22365. Add storage support for storing metric definitions usin…
…g LevelDB. (swagle)
  • Loading branch information
swagle authored and avijayanhwx committed Apr 1, 2018
1 parent 112d2b1 commit b8661693b1e9be70c221443ab3da14fbf8896add
Showing 8 changed files with 306 additions and 6 deletions.
@@ -424,11 +424,18 @@
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>org.iq80.leveldb</groupId>
<artifactId>leveldb</artifactId>
<version>0.9</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -452,5 +459,11 @@
<version>2.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.4</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -30,6 +30,14 @@ metricsCollector:
adQueryService:
anomalyDataTtl: 604800

metricDefinitionDB:
# force checksum verification of all data that is read from the file system on behalf of a particular read
verifyChecksums: true
# raise an error as soon as it detects an internal corruption
performParanoidChecks: false
# Path to Level DB directory
dbDirPath: /var/lib/ambari-metrics-anomaly-detection/

#subsystemService:
# spark:
# pointInTime:
@@ -20,10 +20,9 @@ package org.apache.ambari.metrics.adservice.app

import javax.validation.Valid

import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricDefinitionServiceConfiguration}
import org.apache.ambari.metrics.adservice.configuration._

import com.fasterxml.jackson.annotation.JsonProperty

import io.dropwizard.Configuration

/**
@@ -46,6 +45,12 @@ class AnomalyDetectionAppConfig extends Configuration {
@Valid
private val adServiceConfiguration = new AdServiceConfiguration

/**
* LevelDB settings for metrics definitions
*/
@Valid
private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration

/*
HBase Conf
*/
@@ -66,4 +71,6 @@ class AnomalyDetectionAppConfig extends Configuration {
@JsonProperty("metricsCollector")
def getMetricCollectorConfiguration: MetricCollectorConfiguration = metricCollectorConfiguration

@JsonProperty("metricDefinitionDB")
def getMetricDefinitionDBConfiguration: MetricDefinitionDBConfiguration = metricDefinitionDBConfiguration
}
@@ -17,13 +17,14 @@
*/
package org.apache.ambari.metrics.adservice.app

import org.apache.ambari.metrics.adservice.db.MetadataDatasource
import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, RootResource}
import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl}

import com.codahale.metrics.health.HealthCheck
import com.google.inject.AbstractModule
import com.google.inject.multibindings.Multibinder

import io.dropwizard.setup.Environment

class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule {
@@ -35,5 +36,6 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm
bind(classOf[AnomalyResource])
bind(classOf[RootResource])
bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
}
}
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.configuration

import javax.validation.constraints.NotNull

import com.fasterxml.jackson.annotation.JsonProperty

class MetricDefinitionDBConfiguration {

@NotNull
private var dbDirPath: String = _

@JsonProperty("verifyChecksums")
def verifyChecksums: Boolean = true

@JsonProperty("performParanoidChecks")
def performParanoidChecks: Boolean = false

@JsonProperty("dbDirPath")
def getDbDirPath: String = dbDirPath
}
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.db

trait MetadataDatasource {

type Key = Array[Byte]
type Value = Array[Byte]

/**
* Idempotent call at the start of the application to initialize db
*/
def initialize(): Unit

/**
* This function obtains the associated value to a key. It requires the (key-value) pair to be in the DataSource
*
* @param key
* @return the value associated with the passed key.
*/
def apply(key: Key): Value = get(key).get

/**
* This function obtains the associated value to a key, if there exists one.
*
* @param key
* @return the value associated with the passed key.
*/
def get(key: Key): Option[Value]


/**
* This function associates a key to a value, overwriting if necessary
*/
def put(key: Key, value: Value): Unit

/**
* Delete key from the db
*/
def delete(key: Key): Unit

/**
* This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
*
* @param toRemove which includes all the keys to be removed from the DataSource.
* @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
* If a key is already in the DataSource its value will be updated.
* @return the new DataSource after the removals and insertions were done.
*/
def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit

/**
* This function closes the DataSource, without deleting the files used by it.
*/
def close(): Unit

}
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.leveldb

import java.io.File

import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
import org.apache.ambari.metrics.adservice.db.MetadataDatasource
import org.iq80.leveldb.{DB, Options, WriteOptions}
import org.iq80.leveldb.impl.Iq80DBFactory

import com.google.inject.Singleton

@Singleton
class LevelDBDataSource(appConfig: AnomalyDetectionAppConfig) extends MetadataDatasource {

private var db: DB = _
@volatile var isInitialized: Boolean = false

override def initialize(): Unit = {
if (isInitialized) return

val configuration: MetricDefinitionDBConfiguration = appConfig.getMetricDefinitionDBConfiguration

db = createDB(new LevelDbConfig {
override val createIfMissing: Boolean = true
override val verifyChecksums: Boolean = configuration.verifyChecksums
override val paranoidChecks: Boolean = configuration.performParanoidChecks
override val path: String = configuration.getDbDirPath
})
isInitialized = true
}

private def createDB(levelDbConfig: LevelDbConfig): DB = {
import levelDbConfig._

val options = new Options()
.createIfMissing(createIfMissing)
.paranoidChecks(paranoidChecks) // raise an error as soon as it detects an internal corruption
.verifyChecksums(verifyChecksums) // force checksum verification of all data that is read from the file system on behalf of a particular read

Iq80DBFactory.factory.open(new File(path), options)
}

override def close(): Unit = {
db.close()
}

/**
* This function obtains the associated value to a key, if there exists one.
*
* @param key
* @return the value associated with the passed key.
*/
override def get(key: Key): Option[Value] = Option(db.get(key))

/**
* This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
*
* @param toRemove which includes all the keys to be removed from the DataSource.
* @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
* If a key is already in the DataSource its value will be updated.
*/
override def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit = {
val batch = db.createWriteBatch()
toRemove.foreach { key => batch.delete(key) }
toUpsert.foreach { item => batch.put(item._1, item._2) }
db.write(batch, new WriteOptions())
}

override def put(key: Key, value: Value): Unit = {
db.put(key, value)
}

override def delete(key: Key): Unit = {
db.delete(key)
}
}

trait LevelDbConfig {
val createIfMissing: Boolean
val paranoidChecks: Boolean
val verifyChecksums: Boolean
val path: String
}
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.adservice.leveldb

import java.io.File

import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
import org.iq80.leveldb.util.FileUtils
import org.mockito.Mockito.when
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.mockito.MockitoSugar

class LevelDBDataSourceTest extends FunSuite with BeforeAndAfter with Matchers with MockitoSugar {

var db: LevelDBDataSource = _
var file : File = FileUtils.createTempDir("adservice-leveldb-test")

before {
val appConfig: AnomalyDetectionAppConfig = mock[AnomalyDetectionAppConfig]
val mdConfig : MetricDefinitionDBConfiguration = mock[MetricDefinitionDBConfiguration]

when(appConfig.getMetricDefinitionDBConfiguration).thenReturn(mdConfig)
when(mdConfig.verifyChecksums).thenReturn(true)
when(mdConfig.performParanoidChecks).thenReturn(false)
when(mdConfig.getDbDirPath).thenReturn(file.getAbsolutePath)

db = new LevelDBDataSource(appConfig)
db.initialize()
}

test("testOperations") {
db.put("Hello".getBytes(), "World".getBytes())
assert(db.get("Hello".getBytes()).get.sameElements("World".getBytes()))
db.update(Seq("Hello".getBytes()), Seq(("Hello".getBytes(), "Mars".getBytes())))
assert(db.get("Hello".getBytes()).get.sameElements("Mars".getBytes()))
}

after {
FileUtils.deleteRecursively(file)
}
}

0 comments on commit b866169

Please sign in to comment.