Skip to content
Permalink
Browse files
AMBARI-22567 : Integrate Spark lifecycle management into AMS AD Manag…
…er. (avijayan)
  • Loading branch information
Aravindan Vijayan committed Apr 1, 2018
1 parent 38defd5 commit 61f08113a0a43a7f80ef123690a29a88dce8e7e3
Showing 23 changed files with 469 additions and 76 deletions.
@@ -14,13 +14,44 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific

PIDFILE=/var/run//var/run/ambari-metrics-anomaly-detection/ambari-metrics-admanager.pid
PIDFILE=/var/run/ambari-metrics-anomaly-detection/ambari-metrics-admanager.pid
OUTFILE=/var/log/ambari-metrics-anomaly-detection/ambari-metrics-admanager.out

CONF_DIR=/etc/ambari-metrics-anomaly-detection/conf
DAEMON_NAME=ams_admanager
SPARK_HOME=/usr/lib/ambari-metrics-anomaly-detection/spark

STOP_TIMEOUT=5
SPARK_MASTER_PID=/var/run/ambari-metrics-anomaly-detection/spark-ams-org.apache.spark.deploy.master.Master.pid

STOP_TIMEOUT=10

function spark_daemon
{
local cmd=$1
local pid

if [[ "${cmd}" == "start" ]]
then

${SPARK_HOME}/sbin/start-master.sh
sleep 2
master_pid=$(cat "$SPARK_MASTER_PID")
if [ -z "`ps ax | grep -w ${master_pid} | grep org.apache.spark.deploy.master.Master`" ]; then
echo "ERROR: Spark Master start failed. For more details, see outfile in log directory."
exit -1
fi

${SPARK_HOME}/sbin/start-slave.sh spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}
elif [[ "${cmd}" == "stop" ]]
then
${SPARK_HOME}/sbin/stop-slave.sh
${SPARK_HOME}/sbin/stop-master.sh
else
pid=${SPARK_MASTER_PID}
daemon_status "${pid}"
fi

}

function write_pidfile
{
@@ -55,22 +86,6 @@ function java_setup

function daemon_status()
{
#
# LSB 4.1.0 compatible status command (1)
#
# 0 = program is running
# 1 = dead, but still a pid (2)
# 2 = (not used by us)
# 3 = not running
#
# 1 - this is not an endorsement of the LSB
#
# 2 - technically, the specification says /var/run/pid, so
# we should never return this value, but we're giving
# them the benefit of a doubt and returning 1 even if
# our pid is not in in /var/run .
#

local pidfile="$1"
shift

@@ -90,6 +105,12 @@ function start()
{
java_setup


if [[ "${AMS_AD_STANDALONE_SPARK_ENABLED}" == "true" || "${AMS_AD_STANDALONE_SPARK_ENABLED}" == "True" ]]
then
spark_daemon "start"
fi

daemon_status "${PIDFILE}"
if [[ $? == 0 ]]; then
echo "AMS AD Manager is running as process $(cat "${PIDFILE}"). Exiting" | tee -a $STARTUPFILE
@@ -144,22 +165,34 @@ function stop()
rm -f "${pidfile}" >/dev/null 2>&1
fi
fi

#Let's try to stop spark always since if the user has flipped the spark mode to 'yarn', the enabled flag becomes obsolete.
spark_daemon "stop"
}

# execute ams-env.sh
# execute ams-admanager-env.sh
if [[ -f "${CONF_DIR}/ams-admanager-env.sh" ]]; then
. "${CONF_DIR}/ams-admanager-env.sh"
else
echo "ERROR: Cannot execute ${CONF_DIR}/ams-admanager-env.sh." 2>&1
exit 1
fi

# set these env variables only if they were not set by ams-env.sh
if [[ -f "${CONF_DIR}/ams-admanager-spark-env.sh" ]]; then
. "${CONF_DIR}/ams-admanager-spark-env.sh"
else
echo "ERROR: Cannot execute ${CONF_DIR}/ams-admanager-spark-env.sh." 2>&1
exit 1
fi

# set these env variables only if they were not set by ams-admanager-env.sh
: ${AMS_AD_LOG_DIR:=/var/log/ambari-metrics-anomaly-detection}
: ${AMS_AD_STANDALONE_SPARK_ENABLED:=true}

# set pid dir path
if [[ -n "${AMS_AD_PID_DIR}" ]]; then
PIDFILE=${AMS_AD_PID_DIR}/admanager.pid
PIDFILE=${AMS_AD_PID_DIR}/ambari-metrics-admanager.pid
SPARK_MASTER_PID=${AMS_AD_PID_DIR}/spark-${USER}-org.apache.spark.deploy.master.Master-1.pid
fi

# set out file path
@@ -38,9 +38,8 @@ metricDefinitionDB:
# raise an error as soon as it detects an internal corruption
performParanoidChecks: false
# Path to Level DB directory
dbDirPath: /var/lib/ambari-metrics-anomaly-detection/
dbDirPath: /tmp/ambari-metrics-anomaly-detection/db

#subsystemService:
# spark:
# pointInTime:
# trend:
spark:
mode: standalone
masterHostPort: localhost:7077
@@ -36,7 +36,7 @@
<scala.binary.version>2.11</scala.binary.version>
<jackson.version>2.9.1</jackson.version>
<dropwizard.version>1.2.0</dropwizard.version>
<spark.version>2.2.0</spark.version>
<spark.version>2.1.1</spark.version>
<hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
<hbase.version>1.1.2.2.6.0.3-8</hbase.version>
<phoenix.version>4.7.0.2.6.0.3-8</phoenix.version>
@@ -59,7 +59,7 @@
</pluginRepositories>

<build>
<finalName>${project.artifactId}</finalName>
<finalName>${project.artifactId}-${project.version}</finalName>
<resources>
<resource>
<filtering>true</filtering>
@@ -156,14 +156,6 @@
<exclude>com/google/common/collect/**</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.phoenix:phoenix-core</artifact>
<excludes>
<exclude>org/joda/time/**</exclude>
<exclude>com/codahale/metrics/**</exclude>
<exclude>com/google/common/collect/**</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
@@ -191,6 +183,38 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target name="Download Spark">
<mkdir dir="${project.build.directory}/embedded"/>
<get
src="${spark.tar}"
dest="${project.build.directory}/embedded/spark.tar.gz"
usetimestamp="true"
/>
<untar
src="${project.build.directory}/embedded/spark.tar.gz"
dest="${project.build.directory}/embedded"
compression="gzip"
/>
<move
todir="${project.build.directory}/embedded/spark" >
<fileset dir="${project.build.directory}/embedded/${spark.folder}" includes="**"/>
</move>
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

@@ -0,0 +1,21 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<assembly>
<id>empty</id>
<formats/>
</assembly>
@@ -53,6 +53,12 @@ class AnomalyDetectionAppConfig extends Configuration {
@Valid
private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration

/**
* Spark configurations
*/
@Valid
private val sparkConfiguration = new SparkConfiguration

/*
AMS HBase Conf
*/
@@ -76,4 +82,8 @@ class AnomalyDetectionAppConfig extends Configuration {

@JsonProperty("metricDefinitionDB")
def getMetricDefinitionDBConfiguration: MetricDefinitionDBConfiguration = metricDefinitionDBConfiguration

@JsonProperty("spark")
def getSparkConfiguration: SparkConfiguration = sparkConfiguration

}
@@ -17,16 +17,13 @@

package org.apache.ambari.metrics.adservice.configuration

import javax.validation.constraints.NotNull

import com.fasterxml.jackson.annotation.JsonProperty

/**
* Class to capture the Metric Definition Service configuration.
*/
class MetricDefinitionServiceConfiguration {

@NotNull
private val inputDefinitionDirectory: String = ""

@JsonProperty
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.configuration

import javax.validation.constraints.NotNull

import com.fasterxml.jackson.annotation.JsonProperty

class SparkConfiguration {

@NotNull
private var mode: String = _

@NotNull
private var masterHostPort: String = _

@JsonProperty
def getMode: String = mode

@JsonProperty
def getMasterHostPort: String = masterHostPort

}
@@ -21,14 +21,11 @@ import java.sql.{Connection, PreparedStatement, ResultSet, SQLException}
import java.util.concurrent.TimeUnit.SECONDS

import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.common._
import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration
import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey}
import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, MetricAnomalyInstance}
import org.apache.ambari.metrics.adservice.subsystem.pointintime.PointInTimeAnomalyInstance
import org.apache.ambari.metrics.adservice.subsystem.trend.TrendAnomalyInstance
import org.apache.ambari.metrics.adservice.model._
import org.apache.hadoop.hbase.util.RetryCounterFactory
import org.slf4j.{Logger, LoggerFactory}

@@ -15,15 +15,13 @@
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.subsystem.pointintime
package org.apache.ambari.metrics.adservice.model

import java.util.Date

import org.apache.ambari.metrics.adservice.common.Season
import org.apache.ambari.metrics.adservice.metadata.MetricKey
import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance}

class PointInTimeAnomalyInstance(val metricKey: MetricKey,
val timestamp: Long,
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.common
package org.apache.ambari.metrics.adservice.model

/**
* Class to capture a Range in a Season.
@@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.common
package org.apache.ambari.metrics.adservice.model

import java.time.DayOfWeek
import java.util.Calendar

import javax.xml.bind.annotation.XmlRootElement

import org.apache.ambari.metrics.adservice.common.SeasonType.SeasonType
import org.apache.ambari.metrics.adservice.model.SeasonType.SeasonType

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.common
package org.apache.ambari.metrics.adservice.model

object SeasonType extends Enumeration{

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ambari.metrics.adservice.common
package org.apache.ambari.metrics.adservice.model

import java.util.Date

0 comments on commit 61f0811

Please sign in to comment.