Skip to content

Commit

Permalink
spline #328 Add template resource files to runable modules
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Dec 31, 2019
1 parent c74accc commit 10f2a08
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 84 deletions.
2 changes: 1 addition & 1 deletion client-web/src/main/webapp/META-INF/context.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<Context>

<!--
Spline REST Client API URL.
Spline Consumer REST API endpoint URL.
-->
<Environment name="spline/consumer/url" type="java.lang.String" override="false"/>

Expand Down
18 changes: 10 additions & 8 deletions examples/src/main/resources/spline.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
# limitations under the License.
#
#
# Spline properties placeholder.
# Uncomment the following lines to override corresponding Hadoop environment configuration properties.
# ===========================================================================
# THIS FILE OVERRIDES VALUES FROM spline.default.properties
# ===========================================================================
# You can override these properties further in the following config sources,
# that would take precedence (in the corresponding order) over this file:
# - Hadoop config
# - Spark config (must be prefixed with 'spark.')
# - JVM options
# ===========================================================================
#
#
# Spline mode (default is BEST_EFFORT)
#
# spline.mode=DISABLED|REQUIRED|BEST_EFFORT
#
#
# spline.producer.url=http://localhost:8080/spline

# spline.producer.url=http://localhost:8080/spline/producer
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import za.co.absa.spline.common.Version.VersionOrdering._
import za.co.absa.spline.common.Version._
import za.co.absa.spline.harvester.SparkLineageInitializer._
import za.co.absa.spline.harvester.SparkLineageInitializerSpec._
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer
import za.co.absa.spline.harvester.conf.DefaultSplineConfigurer.ConfProperty._
import za.co.absa.spline.harvester.conf.SplineConfigurer.SplineMode._
import za.co.absa.spline.harvester.conf.{DefaultSplineConfiguration, DefaultSplineConfigurer}
import za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher.producerUrlProperty
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
Expand Down Expand Up @@ -108,7 +108,7 @@ class SparkLineageInitializerSpec extends AnyFunSpec with BeforeAndAfterEach wit
// skip setting spline prop as it's already hardcoded in spline.properties
}

val splineConfiguration = sparkSession.defaultSplineConfiguration
val splineConfiguration = new DefaultSplineConfiguration(sparkSession)

splineConfiguration getString keyDefinedEverywhere shouldEqual valueFromHadoop
splineConfiguration getString keyDefinedInJVMAndSpline shouldEqual valueFromJVM
Expand Down
10 changes: 10 additions & 0 deletions rest-gateway/src/main/webapp/META-INF/context.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,14 @@
-->
<Environment name="spline/database/connectionUrl" type="java.lang.String" override="false"/>

<!--
REST adaptive timeout config.
It's not actually used at the moment. In the future, it will either be used or deprecated.
See:
- https://github.com/AbsaOSS/spline/issues/474
- https://github.com/AbsaOSS/spline/issues/485
-->
<Environment name="spline/adaptive_timeout/min" type="java.lang.Long" override="false"/>
<Environment name="spline/adaptive_timeout/duration_factor" type="java.lang.Double" override="false"/>

</Context>
26 changes: 26 additions & 0 deletions spark/agent/src/main/resources/spline.default.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright 2017 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ===================================================================================================
# THIS FILE CONTAINS DEFAULT SPLINE SPARK AGENT PROPERTIES.
# ===================================================================================================
#
# Spline mode: DISABLED|REQUIRED|BEST_EFFORT
spline.mode=BEST_EFFORT
# Fully specified class name implementing {LineageDispatcher} trait.
# The class must have a constructor with a single parameter of type {org.apache.commons.configuration.Configuration}
spline.lineage_dispatcher.className=za.co.absa.spline.harvester.dispatcher.HttpLineageDispatcher
# Required for {HttpLineageDispatcher}. Base URL of the Spline Producer REST API endpoint
spline.producer.url=
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@

package za.co.absa.spline.harvester

import org.apache.commons.configuration._
import org.apache.spark
import org.apache.spark.sql.SparkSession
import org.slf4s.Logging
import za.co.absa.spline.common.SplineBuildInfo
import za.co.absa.spline.harvester.conf.SplineConfigurer.SplineMode._
import za.co.absa.spline.harvester.conf.{DefaultSplineConfigurer, HadoopConfiguration, SparkConfiguration, SplineConfigurer}
import za.co.absa.spline.harvester.conf.{DefaultSplineConfiguration, DefaultSplineConfigurer, SplineConfigurer}
import za.co.absa.spline.harvester.listener.SplineQueryExecutionListener

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.util.Try
import scala.util.control.NonFatal

/**
Expand All @@ -44,16 +41,11 @@ object SparkLineageInitializer extends Logging {
def createEventHandler(sparkSession: SparkSession): Option[QueryExecutionEventHandler] =
SparkSessionWrapper(sparkSession).createEventHandler()

/**
* The class is a wrapper around Spark session and performs all necessary registrations and procedures for initialization of the library.
*
* @param sparkSession A Spark session
*/
implicit class SparkSessionWrapper(sparkSession: SparkSession) {

private implicit val executionContext: ExecutionContext = ExecutionContext.global

private def defaultSplineConfigurer = new DefaultSplineConfigurer(defaultSplineConfiguration, sparkSession)
private def defaultSplineConfigurer = new DefaultSplineConfigurer(new DefaultSplineConfiguration(sparkSession), sparkSession)

/**
* The method performs all necessary registrations and procedures for initialization of the library.
Expand Down Expand Up @@ -94,7 +86,7 @@ object SparkLineageInitializer extends Logging {
}

def createEventHandler(): Option[QueryExecutionEventHandler] = {
val configurer = new DefaultSplineConfigurer(defaultSplineConfiguration, sparkSession)
val configurer = defaultSplineConfigurer
if (configurer.splineMode != DISABLED) {
createEventHandler(configurer)
} else {
Expand Down Expand Up @@ -124,23 +116,6 @@ object SparkLineageInitializer extends Logging {
}
}

private[harvester] val defaultSplineConfiguration = {
val splinePropertiesFileName = "spline.properties"

val systemConfOpt = Some(new SystemConfiguration)
val propFileConfOpt = Try(new PropertiesConfiguration(splinePropertiesFileName)).toOption
val hadoopConfOpt = Some(new HadoopConfiguration(sparkSession.sparkContext.hadoopConfiguration))
val sparkConfOpt = Some(new SparkConfiguration(sparkSession.sparkContext.getConf))

new CompositeConfiguration(Seq(
hadoopConfOpt,
sparkConfOpt,
systemConfOpt,
propFileConfOpt
).flatten.asJava)

}

private def getOrSetIsInitialized(): Boolean = sparkSession.synchronized {
val sessionConf = sparkSession.conf
sessionConf getOption initFlagKey match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2019 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.harvester.conf

import org.apache.commons.configuration.{CompositeConfiguration, PropertiesConfiguration, SystemConfiguration}
import org.apache.spark.sql.SparkSession
import za.co.absa.spline.harvester.conf.DefaultSplineConfiguration._

import scala.collection.JavaConverters._
import scala.util.Try

object DefaultSplineConfiguration {
private val propertiesFileName = "spline.properties"
}

class DefaultSplineConfiguration(sparkSession: SparkSession)
extends CompositeConfiguration(Seq(
Some(new HadoopConfiguration(sparkSession.sparkContext.hadoopConfiguration)),
Some(new SparkConfiguration(sparkSession.sparkContext.getConf)),
Some(new SystemConfiguration),
Try(new PropertiesConfiguration(propertiesFileName)).toOption
).flatten.asJava)
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,17 @@

package za.co.absa.spline.harvester.conf

import org.apache.commons.configuration.Configuration
import org.apache.commons.configuration.{CompositeConfiguration, Configuration, PropertiesConfiguration}
import org.apache.spark.sql.SparkSession
import org.slf4s.Logging
import za.co.absa.spline.common.ConfigurationImplicits
import za.co.absa.spline.harvester.conf.SplineConfigurer.SplineMode
import za.co.absa.spline.harvester.conf.SplineConfigurer.SplineMode._
import za.co.absa.spline.harvester.dispatcher.{HttpLineageDispatcher, LineageDispatcher}
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.harvester.{LineageHarvesterFactory, QueryExecutionEventHandler}

import scala.concurrent.ExecutionContext

/**
* The object contains static information about default settings needed for initialization of the library.
*/
object DefaultSplineConfigurer {
private val defaultPropertiesFileName = "spline.default.properties"

//noinspection TypeAnnotation
object ConfProperty {

/**
Expand All @@ -40,29 +35,30 @@ object DefaultSplineConfigurer {
* @see [[SplineMode]]
*/
val MODE = "spline.mode"
val MODE_DEFAULT = BEST_EFFORT.toString

/**
* Which lineage dispatcher should be used to report lineages (to Spline or elsewhere)
*/
* Which lineage dispatcher should be used to report lineages
*/
val LINEAGE_DISPATCHER_CLASS = "spline.lineage_dispatcher.className"
val LINEAGE_DISPATCHER_CLASS_DEFAULT = classOf[HttpLineageDispatcher].getCanonicalName
}
}

/**
* The class represents default settings needed for initialization of the library.
*
* @param configuration A source of settings
*/
class DefaultSplineConfigurer(configuration: Configuration, sparkSession: SparkSession) extends SplineConfigurer with Logging {
class DefaultSplineConfigurer(userConfiguration: Configuration, sparkSession: SparkSession) extends SplineConfigurer with Logging {

import ConfigurationImplicits._
import DefaultSplineConfigurer.ConfProperty._
import DefaultSplineConfigurer._
import SplineMode._

private implicit val executionContext: ExecutionContext = ExecutionContext.global
import collection.JavaConverters._

private lazy val configuration = new CompositeConfiguration(Seq(
userConfiguration,
new PropertiesConfiguration(defaultPropertiesFileName)
).asJava)

lazy val splineMode: SplineMode = {
val modeName = configuration.getString(MODE, MODE_DEFAULT)
val modeName = configuration.getRequiredString(MODE)
try SplineMode withName modeName
catch {
case _: NoSuchElementException => throw new IllegalArgumentException(
Expand All @@ -71,15 +67,12 @@ class DefaultSplineConfigurer(configuration: Configuration, sparkSession: SparkS
}

override lazy val lineageDispatcher: LineageDispatcher = {
configuration.getString(LINEAGE_DISPATCHER_CLASS, LINEAGE_DISPATCHER_CLASS_DEFAULT) match {
case LINEAGE_DISPATCHER_CLASS_DEFAULT => HttpLineageDispatcher(configuration)
case className: String =>
log debug s"Instantiating a lineage dispatcher for class name: $className"
Class.forName(className.trim)
.getConstructor(classOf[Configuration])
.newInstance(configuration)
.asInstanceOf[LineageDispatcher]
}
val className = configuration.getRequiredString(LINEAGE_DISPATCHER_CLASS)
log debug s"Instantiating a lineage dispatcher for class name: $className"
Class.forName(className.trim)
.getConstructor(classOf[Configuration])
.newInstance(configuration)
.asInstanceOf[LineageDispatcher]
}

private lazy val harvesterFactory = new LineageHarvesterFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,22 @@ import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

object HttpLineageDispatcher {
val producerUrlProperty = "spline.producer.url"

object RESTResource {
val ExecutionPlans = "execution-plans"
val ExecutionEvents = "execution-events"
val Status = "status"
}
}

class HttpLineageDispatcher(splineServerRESTEndpointBaseURL: String, http: BaseHttp)
extends LineageDispatcher
with Logging {

def this(configuration: Configuration) = this(configuration.getRequiredString(HttpLineageDispatcher.producerUrlProperty), Http)

val executionPlansUrl = s"$splineServerRESTEndpointBaseURL/${RESTResource.ExecutionPlans}"
val executionEventsUrl = s"$splineServerRESTEndpointBaseURL/${RESTResource.ExecutionEvents}"
val statusUrl = s"$splineServerRESTEndpointBaseURL/${RESTResource.Status}"
Expand Down Expand Up @@ -72,18 +84,3 @@ class HttpLineageDispatcher(splineServerRESTEndpointBaseURL: String, http: BaseH
}
}
}


object HttpLineageDispatcher {
val producerUrlProperty = "spline.producer.url"

object RESTResource {
val ExecutionPlans = "execution-plans"
val ExecutionEvents = "execution-events"
val Status = "status"
}

def apply(configuration: Configuration): LineageDispatcher = {
new HttpLineageDispatcher(configuration.getRequiredString(producerUrlProperty), Http)
}
}

0 comments on commit 10f2a08

Please sign in to comment.