New spline.pluginsEnabledByDefault property
martinf-moodys committed Oct 24, 2023
1 parent 42ec158 commit 4df51ed
Showing 15 changed files with 111 additions and 29 deletions.
5 changes: 4 additions & 1 deletion README.md
@@ -698,10 +698,13 @@ When one of these commands occurs spline will let you know by logging a warning.
### Plugin API

Using a plugin API you can capture lineage from a 3rd party data source provider.
Spline discovers plugins automatically by scanning the classpath, so no special steps are required to register and configure a plugin.
By default, Spline discovers plugins automatically by scanning the classpath, so no special steps are required to register and configure a plugin.
All you need to do is create a class extending the `za.co.absa.spline.harvester.plugin.Plugin` marker trait,
mixed with one or more `*Processing` traits, depending on your intention.

To disable automatic plugin discovery and speed up initialization, set `spline.pluginsEnabledByDefault` to `false` in your configuration file.
Then you will need to register all the necessary plugins one by one, using the `spline.plugins` configuration property, as shown in the sketch below.
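
For illustration, a minimal sketch of such a configuration (the `SQLPlugin` class name is the one enabled in this commit's integration tests; substitute whichever plugins you actually need):

```yaml
spline:
  # Turn off automatic classpath scanning for plugins
  pluginsEnabledByDefault: false
  plugins:
    # Register plugins one by one, keyed by fully qualified class name
    za.co.absa.spline.harvester.plugin.embedded.SQLPlugin:
      enabled: true
```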

There are three general processing traits:

- `DataSourceFormatNameResolving` - returns a name of a data provider/format in use.
3 changes: 3 additions & 0 deletions core/src/main/resources/spline.default.yaml
@@ -259,3 +259,6 @@ spline:
- collect
- collectAsList
- toLocalIterator

# Whether plugins should be enabled by default.
pluginsEnabledByDefault: true # true | false
10 changes: 7 additions & 3 deletions core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
@@ -24,6 +24,7 @@ import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}

import scala.collection.JavaConverters._
@@ -37,7 +38,7 @@ private[spline] trait AgentBOM {
  def lineageDispatcher: LineageDispatcher
  def iwdStrategy: IgnoredWriteDetectionStrategy
  def execPlanUUIDVersion: UUIDVersion
  def pluginsConfig: Configuration
  def pluginsConfig: PluginsConfiguration
}

object AgentBOM {
@@ -62,8 +63,11 @@ object AgentBOM {
    mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
  }

  override def pluginsConfig: Configuration = {
    mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
  override def pluginsConfig: PluginsConfiguration = {
    PluginsConfiguration(
      mergedConfig.getRequiredBoolean(ConfProperty.PluginsEnabledByDefault),
      mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
    )
  }

  override lazy val postProcessingFilter: Option[PostProcessingFilter] = {
15 changes: 15 additions & 0 deletions core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
@@ -81,6 +81,16 @@ object AgentConfig {
      this
    }

    def pluginsEnabledByDefault(enabled: Boolean): this.type = synchronized {
      options += ConfProperty.PluginsEnabledByDefault -> enabled
      this
    }

    def enablePlugin(name: String): this.type = synchronized {
      options += s"${ConfProperty.PluginsConfigNamespace}.$name.enabled" -> true
      this
    }

    def build(): AgentConfig = new AgentConfig {
      options.foreach(tupled(addProperty))
    }
@@ -123,6 +133,11 @@
   */
  val IgnoreWriteDetectionStrategy = "spline.IWDStrategy"

  /**
   * Whether plugins should be enabled by default
   */
  val PluginsEnabledByDefault = "spline.pluginsEnabledByDefault"

  val PluginsConfigNamespace = "spline.plugins"

  def dispatcherClassName(logicalName: String): String = s"$RootLineageDispatcher.$logicalName.${HierarchicalObjectFactory.ClassName}"
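
The two new builder methods make the explicit-registration mode scriptable from code as well. A minimal usage sketch, assuming the agent is configured programmatically via `AgentConfig.builder()` (the plugin class name mirrors this commit's integration tests):

```scala
import za.co.absa.spline.agent.AgentConfig

// Disable classpath auto-discovery, then opt in a single plugin explicitly
val config: AgentConfig = AgentConfig.builder()
  .pluginsEnabledByDefault(false)
  .enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin")
  .build()
```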
@@ -30,6 +30,7 @@ import za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor
import za.co.absa.spline.harvester.converter.{DataConverter, DataTypeConverter}
import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry
import za.co.absa.spline.harvester.postprocessing._
import za.co.absa.spline.harvester.qualifier.HDFSPathQualifier
@@ -54,7 +55,7 @@ object SplineAgent extends Logging {
  )

  def create(
      pluginsConfig: Configuration,
      pluginsConfig: PluginsConfiguration,
      session: SparkSession,
      lineageDispatcher: LineageDispatcher,
      userPostProcessingFilter: Option[PostProcessingFilter],
@@ -25,7 +25,8 @@ class PluggableDataSourceFormatResolver(pluginRegistry: PluginRegistry) extends
  private val processFn =
    pluginRegistry.plugins[DataSourceFormatNameResolving]
      .map(_.formatNameResolver)
      .reduce(_ orElse _)
      .reduceOption(_ orElse _)
      .getOrElse(PartialFunction.empty)
      .orElse[AnyRef, String] {
        case dsr: DataSourceRegister => dsr.shortName
        case o => o.toString
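
The `reduce` → `reduceOption(...).getOrElse(PartialFunction.empty)` change above recurs in the extractors and plugins below. The reason: with explicit registration, the plugin list for a given trait may now legitimately be empty, and `reduce` throws on an empty collection. A self-contained sketch of the pattern (plain Scala with a hypothetical `handlers` list, not Spline's API):

```scala
// Combining any number of partial functions, including zero of them
val handlers: Seq[PartialFunction[Int, String]] = Seq.empty // may legitimately be empty now

val combined: PartialFunction[Int, String] =
  handlers
    .reduceOption(_ orElse _)         // None on empty input, instead of reduce's UnsupportedOperationException
    .getOrElse(PartialFunction.empty) // fall back to a PF that matches nothing

assert(combined.lift(42).isEmpty) // nothing matches, but nothing throws either
```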
@@ -34,12 +34,14 @@ class PluggableReadCommandExtractor(
  private val planProcessFn =
    pluginRegistry.plugins[ReadNodeProcessing]
      .map(_.readNodeProcessor)
      .reduce(_ orElse _)
      .reduceOption(_ orElse _)
      .getOrElse(PartialFunction.empty)

  private val rddProcessFn =
    pluginRegistry.plugins[RddReadNodeProcessing]
      .map(_.rddReadNodeProcessor)
      .reduce(_ orElse _)
      .reduceOption(_ orElse _)
      .getOrElse(PartialFunction.empty)

  override def asReadCommand(planOrRdd: PlanOrRdd): Option[ReadCommand] = {
    val res = planOrRdd match {
@@ -38,7 +38,8 @@ class PluggableWriteCommandExtractor(
  private val processFn: ((FuncName, LogicalPlan)) => Option[WriteNodeInfo] =
    pluginRegistry.plugins[WriteNodeProcessing]
      .map(_.writeNodeProcessor)
      .reduce(_ orElse _)
      .reduceOption(_ orElse _)
      .getOrElse(PartialFunction.empty)
      .lift

  def asWriteCommand(funcName: FuncName, logicalPlan: LogicalPlan): Option[WriteCommand] = {
@@ -0,0 +1,24 @@
/*
 * Copyright 2023 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.spline.harvester.plugin

import org.apache.commons.configuration.Configuration

case class PluginsConfiguration(
  pluginsEnabledByDefault: Boolean,
  plugins: Configuration
)
@@ -30,7 +30,8 @@ class LogicalRelationPlugin(pluginRegistry: PluginRegistry) extends Plugin with
  private lazy val baseRelProcessor =
    pluginRegistry.plugins[BaseRelationProcessing]
      .map(_.baseRelationProcessor)
      .reduce(_ orElse _)
      .reduceOption(_ orElse _)
      .getOrElse(PartialFunction.empty)

  override val readNodeProcessor: PartialFunction[LogicalPlan, ReadNodeInfo] = {
    case lr: LogicalRelation
@@ -39,8 +39,8 @@ class SaveIntoDataSourceCommandPlugin(
  private lazy val rpProcessor =
    pluginRegistry.plugins[RelationProviderProcessing]
      .map(_.relationProviderProcessor)
      .reduce(_ orElse _)
      .reduceOption(_ orElse _)
      .getOrElse(PartialFunction.empty)

  override def writeNodeProcessor: PartialFunction[(FuncName, LogicalPlan), WriteNodeInfo] = {
    case (_, cmd: SaveIntoDataSourceCommand) => cmd match {
@@ -23,7 +23,8 @@ import org.apache.spark.internal.Logging
import za.co.absa.spline.commons.lang.ARM
import za.co.absa.spline.harvester.plugin.Plugin
import za.co.absa.spline.harvester.plugin.Plugin.Precedence
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.{EnabledByDefault, EnabledConfProperty, PluginClasses, getOnlyOrThrow}
import za.co.absa.spline.harvester.plugin.PluginsConfiguration
import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.{EnabledConfProperty, PluginClasses, getOnlyOrThrow}

import javax.annotation.Priority
import scala.collection.JavaConverters._
@@ -32,7 +33,7 @@ import scala.util.Try
import scala.util.control.NonFatal

class AutoDiscoveryPluginRegistry(
  conf: Configuration,
  conf: PluginsConfiguration,
  injectables: AnyRef*
) extends PluginRegistry
  with Logging {
@@ -47,13 +48,29 @@
    typedInjectables.groupBy(_._1).mapValues(_.map(_._2))
  }

  private val allPlugins: Seq[Plugin] =
    for (pc <- PluginClasses if isPluginEnabled(pc)) yield {
  private val allPlugins: Seq[Plugin] = {
    val enabledPlugins = if (conf.pluginsEnabledByDefault) {
      PluginClasses.filter(pc => isPluginEnabled(pc.getName))
    } else {
      conf.plugins.getKeys.asScala
        .filter(_.endsWith(s".$EnabledConfProperty")) // Looking for keys ending with ".enabled", since plugins must be explicitly enabled
        .map(_.dropRight(EnabledConfProperty.length + 1)) // Dropping ".enabled" to get plugin class name
        .filter(isPluginEnabled)
        .map(Class.forName)
        .toSeq
    }

    if (enabledPlugins.isEmpty) {
      throw new RuntimeException("No plugins enabled")
    }

    for (pc <- enabledPlugins) yield {
      logInfo(s"Loading plugin: $pc")
      instantiatePlugin(pc)
        .recover({ case NonFatal(e) => throw new RuntimeException(s"Plugin instantiation failure: $pc", e) })
        .get
    }
  }

  override def plugins[A: ClassTag]: Seq[Plugin with A] = {
    val ct = implicitly[ClassTag[A]]
@@ -65,19 +82,19 @@ class AutoDiscoveryPluginRegistry(
    val constr = getOnlyOrThrow(constrs, s"Plugin class must have a single public constructor: ${constrs.mkString(", ")}")
    val args = constr.getParameterTypes.map {
      case ct if classOf[Configuration].isAssignableFrom(ct) =>
        conf.subset(pluginClass.getName)
        conf.plugins.subset(pluginClass.getName)
      case pt =>
        val candidates = injectablesByType.getOrElse(pt, sys.error(s"Cannot bind $pt. No value found"))
        getOnlyOrThrow(candidates, s"Ambiguous constructor parameter binding. Multiple values found for $pt: ${candidates.length}")
    }
    constr.newInstance(args: _*).asInstanceOf[Plugin]
  }

  private def isPluginEnabled(pc: Class[Plugin]): Boolean = {
    val pluginConf = conf.subset(pc.getName)
    val isEnabled = pluginConf.getBoolean(EnabledConfProperty, EnabledByDefault)
  private def isPluginEnabled(pcn: String): Boolean = {
    val pluginConf = conf.plugins.subset(pcn)
    val isEnabled = pluginConf.getBoolean(EnabledConfProperty, conf.pluginsEnabledByDefault)
    if (!isEnabled) {
      logWarning(s"Plugin ${pc.getName} is disabled in the configuration.")
      logWarning(s"Plugin $pcn is disabled in the configuration.")
    }
    isEnabled
  }
@@ -87,7 +104,6 @@ object AutoDiscoveryPluginRegistry extends Logging {
object AutoDiscoveryPluginRegistry extends Logging {

  private val EnabledConfProperty = "enabled"
  private val EnabledByDefault = true

  private val PluginClasses: Seq[Class[Plugin]] = {
    logDebug("Scanning for plugins")
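
One detail of the explicit mode above worth spelling out: plugin class names are derived purely from configuration keys ending in `.enabled` within the `spline.plugins` namespace. A standalone sketch of that derivation (plain Scala with a hypothetical key set, not Spline's API):

```scala
// Keys as they appear inside the "spline.plugins" subset of the configuration
val keys = Seq(
  "za.co.absa.spline.harvester.plugin.embedded.SQLPlugin.enabled",
  "com.example.MyPlugin.enabled",   // hypothetical third-party plugin
  "com.example.MyPlugin.someOption" // an ordinary plugin option; not a nomination
)

val suffix = ".enabled"
val candidateClassNames = keys
  .filter(_.endsWith(suffix))      // only explicitly nominated plugins are considered
  .map(_.dropRight(suffix.length)) // strip ".enabled" to get the class name
// candidateClassNames == Seq(
//   "za.co.absa.spline.harvester.plugin.embedded.SQLPlugin",
//   "com.example.MyPlugin"
// )
```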
@@ -111,13 +111,15 @@ class AgentConfigSpec
      .lineageDispatcher(mockDispatcher)
      .postProcessingFilter(mockFilter)
      .ignoredWriteDetectionStrategy(mockIwdStrategy)
      .pluginsEnabledByDefault(false)
      .build()

    config should not be empty
    config.getKeys.asScala should have length 3
    config.getKeys.asScala should have length 4
    config.getProperty(ConfProperty.RootLineageDispatcher) should be theSameInstanceAs mockDispatcher
    config.getProperty(ConfProperty.RootPostProcessingFilter) should be theSameInstanceAs mockFilter
    config.getProperty(ConfProperty.IgnoreWriteDetectionStrategy) should be theSameInstanceAs mockIwdStrategy
    config.getProperty(ConfProperty.PluginsEnabledByDefault) shouldBe false
  }

}
@@ -16,11 +16,12 @@
package za.co.absa.spline.test.fixture.spline

import org.apache.spark.sql.SparkSession
import za.co.absa.spline.agent.AgentConfig


trait SplineFixture {
  def withLineageTracking[T](testBody: LineageCaptor => T)(implicit session: SparkSession): T = {
    testBody(new LineageCaptor)
  def withLineageTracking[T](testBody: LineageCaptor => T, builderCustomizer: AgentConfig.Builder => AgentConfig.Builder = identity)(implicit session: SparkSession): T = {
    testBody(new LineageCaptor(builderCustomizer))
  }
}

@@ -35,7 +35,7 @@ class BasicIntegrationTests extends AsyncFlatSpec

  "saveAsTable" should "process all operations" in
    withNewSparkSession(implicit spark =>
      withLineageTracking { captor =>
      withLineageTracking({ captor =>
        import spark.implicits._

        withNewSparkSession {
@@ -52,12 +52,16 @@ class BasicIntegrationTests extends AsyncFlatSpec
          plan.operations.other should have length 2
          plan.operations.write should not be null
        }
      }
      }, {
        // To enable the SQL plugin only
        _.pluginsEnabledByDefault(false)
          .enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin")
      })
    )

  "save_to_fs" should "process all operations" in
    withNewSparkSession(implicit spark =>
      withLineageTracking { captor =>
      withLineageTracking({ captor =>
        import spark.implicits._
        val path = TempDirectory("spline_", ".parquet", pathOnly = true).deleteOnExit().path.toString

@@ -71,7 +75,11 @@ class BasicIntegrationTests extends AsyncFlatSpec
          plan.operations.other should have length 2
          plan.operations.write should not be null
        }
      }
      }, {
        // To enable the SQL plugin only
        _.pluginsEnabledByDefault(false)
          .enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin")
      })
    )

  "saveAsTable" should "produce URIs compatible with filesystem write" in
