From 4df51edf2c5617b0c3a8f7b4258911e6ebe54c91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fran=C3=A7ois=20MARTIN?= <55987818+martinf-moodys@users.noreply.github.com>
Date: Tue, 24 Oct 2023 10:41:03 +0200
Subject: [PATCH] New spline.pluginsEnabledByDefault property

---
 README.md                                     |  5 ++-
 core/src/main/resources/spline.default.yaml   |  3 ++
 .../za/co/absa/spline/agent/AgentBOM.scala    | 10 ++++--
 .../za/co/absa/spline/agent/AgentConfig.scala | 15 ++++++++
 .../za/co/absa/spline/agent/SplineAgent.scala |  3 +-
 .../PluggableDataSourceFormatResolver.scala   |  3 +-
 .../read/PluggableReadCommandExtractor.scala  |  6 ++--
 .../PluggableWriteCommandExtractor.scala      |  3 +-
 .../plugin/PluginsConfiguration.scala         | 24 +++++++++++++
 .../composite/LogicalRelationPlugin.scala     |  3 +-
 .../SaveIntoDataSourceCommandPlugin.scala     |  4 +--
 .../AutoDiscoveryPluginRegistry.scala         | 36 +++++++++++++------
 .../absa/spline/agent/AgentConfigSpec.scala   |  4 ++-
 .../test/fixture/spline/SplineFixture.scala   |  5 +--
 .../absa/spline/BasicIntegrationTests.scala   | 16 ++++++---
 15 files changed, 111 insertions(+), 29 deletions(-)
 create mode 100644 core/src/main/scala/za/co/absa/spline/harvester/plugin/PluginsConfiguration.scala

diff --git a/README.md b/README.md
index 6e2a1453..0f82acab 100644
--- a/README.md
+++ b/README.md
@@ -698,10 +698,13 @@ When one of these commands occurs spline will let you know by logging a warning.
 
 ### Plugin API
 
 Using a plugin API you can capture lineage from a 3rd party data source provider.
-Spline discover plugins automatically by scanning a classpath, so no special steps required to register and configure a plugin.
+By default, Spline discovers plugins automatically by scanning the classpath, so no special steps are required to register and configure a plugin.
 All you need is to create a class extending the `za.co.absa.spline.harvester.plugin.Plugin` marker trait
 mixed with one or more `*Processing` traits, depending on your intention.
 
+To disable automatic plugin discovery and speed up initialization, set `spline.pluginsEnabledByDefault` to `false` in your configuration file.
+Then, you will need to register all necessary plugins one by one, using the `spline.plugins` configuration property.
+
 There are three general processing traits:
 
 - `DataSourceFormatNameResolving` - returns a name of a data provider/format in use.

diff --git a/core/src/main/resources/spline.default.yaml b/core/src/main/resources/spline.default.yaml
index 5115eb72..1ee60ac6 100644
--- a/core/src/main/resources/spline.default.yaml
+++ b/core/src/main/resources/spline.default.yaml
@@ -259,3 +259,6 @@ spline:
     - collect
     - collectAsList
    - toLocalIterator
+
+  # Should plugins be enabled by default?
+  pluginsEnabledByDefault: true # true | false
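For illustration, a user configuration that combines the two properties above, turning off classpath scanning and re-enabling a single plugin, could look roughly like this; `com.example.MyPlugin` is a hypothetical class name, and the nesting assumes the same commons-configuration key mapping as `spline.default.yaml`:

```yaml
spline:
  # Disable automatic discovery; only explicitly enabled plugins are loaded.
  pluginsEnabledByDefault: false
  plugins:
    com.example.MyPlugin:
      enabled: true # maps to the flat key spline.plugins.com.example.MyPlugin.enabled
```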
diff --git a/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala b/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
index bc7da129..2208dd8d 100644
--- a/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
+++ b/core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
@@ -24,6 +24,7 @@ import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
 import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
 import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
 import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
+import za.co.absa.spline.harvester.plugin.PluginsConfiguration
 import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}
 
 import scala.collection.JavaConverters._
@@ -37,7 +38,7 @@ private[spline] trait AgentBOM {
   def lineageDispatcher: LineageDispatcher
   def iwdStrategy: IgnoredWriteDetectionStrategy
   def execPlanUUIDVersion: UUIDVersion
-  def pluginsConfig: Configuration
+  def pluginsConfig: PluginsConfiguration
 }
 
 object AgentBOM {
@@ -62,8 +63,11 @@ object AgentBOM {
     mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
   }
 
-  override def pluginsConfig: Configuration = {
-    mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
+  override def pluginsConfig: PluginsConfiguration = {
+    PluginsConfiguration(
+      mergedConfig.getRequiredBoolean(ConfProperty.PluginsEnabledByDefault),
+      mergedConfig.subset(ConfProperty.PluginsConfigNamespace)
+    )
   }
 
   override lazy val postProcessingFilter: Option[PostProcessingFilter] = {

diff --git a/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala b/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
index 1e5b992e..8e965dde 100644
--- a/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
+++ b/core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
@@ -81,6 +81,16 @@ object AgentConfig {
       this
     }
 
+    def pluginsEnabledByDefault(enabled: Boolean): this.type = synchronized {
+      options += ConfProperty.PluginsEnabledByDefault -> enabled
+      this
+    }
+
+    def enablePlugin(name: String): this.type = synchronized {
+      options += s"${ConfProperty.PluginsConfigNamespace}.$name.enabled" -> true
+      this
+    }
+
     def build(): AgentConfig = new AgentConfig {
       options.foreach(tupled(addProperty))
     }
@@ -123,6 +133,11 @@ object AgentConfig {
      */
     val IgnoreWriteDetectionStrategy = "spline.IWDStrategy"
 
+    /**
+     * Should plugins be enabled by default
+     */
+    val PluginsEnabledByDefault = "spline.pluginsEnabledByDefault"
+
     val PluginsConfigNamespace = "spline.plugins"
 
     def dispatcherClassName(logicalName: String): String = s"$RootLineageDispatcher.$logicalName.${HierarchicalObjectFactory.ClassName}"

diff --git a/core/src/main/scala/za/co/absa/spline/agent/SplineAgent.scala b/core/src/main/scala/za/co/absa/spline/agent/SplineAgent.scala
index d95f227b..3da040f9 100644
--- a/core/src/main/scala/za/co/absa/spline/agent/SplineAgent.scala
+++ b/core/src/main/scala/za/co/absa/spline/agent/SplineAgent.scala
@@ -30,6 +30,7 @@ import za.co.absa.spline.harvester.builder.write.PluggableWriteCommandExtractor
 import za.co.absa.spline.harvester.converter.{DataConverter, DataTypeConverter}
 import za.co.absa.spline.harvester.dispatcher.LineageDispatcher
 import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
+import za.co.absa.spline.harvester.plugin.PluginsConfiguration
 import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry
 import za.co.absa.spline.harvester.postprocessing._
 import za.co.absa.spline.harvester.qualifier.HDFSPathQualifier
@@ -54,7 +55,7 @@ object SplineAgent extends Logging {
   )
 
   def create(
-      pluginsConfig: Configuration,
+      pluginsConfig: PluginsConfiguration,
       session: SparkSession,
       lineageDispatcher: LineageDispatcher,
       userPostProcessingFilter: Option[PostProcessingFilter],
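The two new builder methods above make the same setup available programmatically. A minimal sketch, assuming the usual `AgentConfig.builder()` entry point; the plugin class name is the embedded `SQLPlugin` also used by the integration tests further down:

```scala
import za.co.absa.spline.agent.AgentConfig

// Disable classpath scanning and enable exactly one plugin.
val config: AgentConfig = AgentConfig.builder()
  .pluginsEnabledByDefault(false)
  .enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin") // sets spline.plugins.<name>.enabled = true
  .build()
```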
diff --git a/core/src/main/scala/za/co/absa/spline/harvester/builder/dsformat/PluggableDataSourceFormatResolver.scala b/core/src/main/scala/za/co/absa/spline/harvester/builder/dsformat/PluggableDataSourceFormatResolver.scala
index 5032a0c9..37bc579b 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/builder/dsformat/PluggableDataSourceFormatResolver.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/builder/dsformat/PluggableDataSourceFormatResolver.scala
@@ -25,7 +25,8 @@ class PluggableDataSourceFormatResolver(pluginRegistry: PluginRegistry) extends
   private val processFn =
     pluginRegistry.plugins[DataSourceFormatNameResolving]
       .map(_.formatNameResolver)
-      .reduce(_ orElse _)
+      .reduceOption(_ orElse _)
+      .getOrElse(PartialFunction.empty)
       .orElse[AnyRef, String] {
         case dsr: DataSourceRegister => dsr.shortName
         case o => o.toString

diff --git a/core/src/main/scala/za/co/absa/spline/harvester/builder/read/PluggableReadCommandExtractor.scala b/core/src/main/scala/za/co/absa/spline/harvester/builder/read/PluggableReadCommandExtractor.scala
index cab16afa..9820abe7 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/builder/read/PluggableReadCommandExtractor.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/builder/read/PluggableReadCommandExtractor.scala
@@ -34,12 +34,14 @@ class PluggableReadCommandExtractor(
   private val planProcessFn =
     pluginRegistry.plugins[ReadNodeProcessing]
       .map(_.readNodeProcessor)
-      .reduce(_ orElse _)
+      .reduceOption(_ orElse _)
+      .getOrElse(PartialFunction.empty)
 
   private val rddProcessFn =
     pluginRegistry.plugins[RddReadNodeProcessing]
       .map(_.rddReadNodeProcessor)
-      .reduce(_ orElse _)
+      .reduceOption(_ orElse _)
+      .getOrElse(PartialFunction.empty)
 
   override def asReadCommand(planOrRdd: PlanOrRdd): Option[ReadCommand] = {
     val res = planOrRdd match {

diff --git a/core/src/main/scala/za/co/absa/spline/harvester/builder/write/PluggableWriteCommandExtractor.scala b/core/src/main/scala/za/co/absa/spline/harvester/builder/write/PluggableWriteCommandExtractor.scala
index d4197312..de190482 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/builder/write/PluggableWriteCommandExtractor.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/builder/write/PluggableWriteCommandExtractor.scala
@@ -38,7 +38,8 @@ class PluggableWriteCommandExtractor(
   private val processFn: ((FuncName, LogicalPlan)) => Option[WriteNodeInfo] =
     pluginRegistry.plugins[WriteNodeProcessing]
      .map(_.writeNodeProcessor)
-      .reduce(_ orElse _)
+      .reduceOption(_ orElse _)
+      .getOrElse(PartialFunction.empty)
       .lift
 
   def asWriteCommand(funcName: FuncName, logicalPlan: LogicalPlan): Option[WriteCommand] = {
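The recurring `reduce` to `reduceOption` change in these extractors matters because, once plugins can be disabled, the plugin list may be empty, and `Seq.empty.reduce` throws `UnsupportedOperationException`. A standalone illustration of the pattern:

```scala
// On an empty Seq, reduce(_ orElse _) would throw UnsupportedOperationException;
// reduceOption(...).getOrElse(PartialFunction.empty) degrades gracefully instead.
val handlers: Seq[PartialFunction[String, Int]] = Seq.empty

val combined: PartialFunction[String, Int] =
  handlers
    .reduceOption(_ orElse _)
    .getOrElse(PartialFunction.empty)

assert(combined.lift("anything").isEmpty) // the empty PF is defined nowhere
```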
diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/PluginsConfiguration.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/PluginsConfiguration.scala
new file mode 100644
index 00000000..e2a346f6
--- /dev/null
+++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/PluginsConfiguration.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2023 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.spline.harvester.plugin
+
+import org.apache.commons.configuration.Configuration
+
+case class PluginsConfiguration(
+  pluginsEnabledByDefault: Boolean,
+  plugins: Configuration
+)

diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/LogicalRelationPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/LogicalRelationPlugin.scala
index 01a45370..932b5073 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/LogicalRelationPlugin.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/LogicalRelationPlugin.scala
@@ -30,7 +30,8 @@ class LogicalRelationPlugin(pluginRegistry: PluginRegistry) extends Plugin with
   private lazy val baseRelProcessor =
     pluginRegistry.plugins[BaseRelationProcessing]
       .map(_.baseRelationProcessor)
-      .reduce(_ orElse _)
+      .reduceOption(_ orElse _)
+      .getOrElse(PartialFunction.empty)
 
   override val readNodeProcessor: PartialFunction[LogicalPlan, ReadNodeInfo] = {
     case lr: LogicalRelation

diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/SaveIntoDataSourceCommandPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/SaveIntoDataSourceCommandPlugin.scala
index 082fb04b..69cc48fd 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/SaveIntoDataSourceCommandPlugin.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/composite/SaveIntoDataSourceCommandPlugin.scala
@@ -39,8 +39,8 @@ class SaveIntoDataSourceCommandPlugin(
   private lazy val rpProcessor =
     pluginRegistry.plugins[RelationProviderProcessing]
       .map(_.relationProviderProcessor)
-      .reduce(_ orElse _)
-
+      .reduceOption(_ orElse _)
+      .getOrElse(PartialFunction.empty)
 
   override def writeNodeProcessor: PartialFunction[(FuncName, LogicalPlan), WriteNodeInfo] = {
     case (_, cmd: SaveIntoDataSourceCommand) => cmd match {
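The `PluginsConfiguration` case class introduced above is plain data, so it can also be built by hand, for example in a unit test. A sketch using the in-memory `BaseConfiguration` from commons-configuration; the plugin key is made up:

```scala
import org.apache.commons.configuration.BaseConfiguration
import za.co.absa.spline.harvester.plugin.PluginsConfiguration

// Hand-built plugins namespace, as it would look after subsetting "spline.plugins".
val plugins = new BaseConfiguration()
plugins.addProperty("com.example.MyPlugin.enabled", true) // hypothetical plugin class

val conf = PluginsConfiguration(
  pluginsEnabledByDefault = false,
  plugins = plugins
)
```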
diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/registry/AutoDiscoveryPluginRegistry.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/registry/AutoDiscoveryPluginRegistry.scala
index 989c1248..550945de 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/registry/AutoDiscoveryPluginRegistry.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/registry/AutoDiscoveryPluginRegistry.scala
@@ -23,7 +23,8 @@ import org.apache.spark.internal.Logging
 import za.co.absa.spline.commons.lang.ARM
 import za.co.absa.spline.harvester.plugin.Plugin
 import za.co.absa.spline.harvester.plugin.Plugin.Precedence
-import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.{EnabledByDefault, EnabledConfProperty, PluginClasses, getOnlyOrThrow}
+import za.co.absa.spline.harvester.plugin.PluginsConfiguration
+import za.co.absa.spline.harvester.plugin.registry.AutoDiscoveryPluginRegistry.{EnabledConfProperty, PluginClasses, getOnlyOrThrow}
 
 import javax.annotation.Priority
 import scala.collection.JavaConverters._
@@ -32,7 +33,7 @@ import scala.util.Try
 import scala.util.control.NonFatal
 
 class AutoDiscoveryPluginRegistry(
-  conf: Configuration,
+  conf: PluginsConfiguration,
   injectables: AnyRef*
 ) extends PluginRegistry with Logging {
@@ -47,13 +48,29 @@ class AutoDiscoveryPluginRegistry(
     typedInjectables.groupBy(_._1).mapValues(_.map(_._2))
   }
 
-  private val allPlugins: Seq[Plugin] =
-    for (pc <- PluginClasses if isPluginEnabled(pc)) yield {
+  private val allPlugins: Seq[Plugin] = {
+    val enabledPlugins = if (conf.pluginsEnabledByDefault) {
+      PluginClasses.filter(pc => isPluginEnabled(pc.getName))
+    } else {
+      conf.plugins.getKeys.asScala
+        .filter(_.endsWith(s".$EnabledConfProperty")) // Looking for keys ending with ".enabled", since plugins must be explicitly enabled
+        .map(_.dropRight(EnabledConfProperty.length + 1)) // Dropping ".enabled" to get the plugin class name
+        .filter(isPluginEnabled)
+        .map(Class.forName)
+        .toSeq
+    }
+
+    if (enabledPlugins.isEmpty) {
+      throw new RuntimeException("No plugins enabled")
+    }
+
+    for (pc <- enabledPlugins) yield {
       logInfo(s"Loading plugin: $pc")
       instantiatePlugin(pc)
         .recover({ case NonFatal(e) => throw new RuntimeException(s"Plugin instantiation failure: $pc", e) })
         .get
     }
+  }
 
   override def plugins[A: ClassTag]: Seq[Plugin with A] = {
     val ct = implicitly[ClassTag[A]]
@@ -65,7 +82,7 @@ class AutoDiscoveryPluginRegistry(
     val constr = getOnlyOrThrow(constrs, s"Plugin class must have a single public constructor: ${constrs.mkString(", ")}")
     val args = constr.getParameterTypes.map {
       case ct if classOf[Configuration].isAssignableFrom(ct) =>
-        conf.subset(pluginClass.getName)
+        conf.plugins.subset(pluginClass.getName)
       case pt =>
         val candidates = injectablesByType.getOrElse(pt, sys.error(s"Cannot bind $pt. No value found"))
         getOnlyOrThrow(candidates, s"Ambiguous constructor parameter binding. Multiple values found for $pt: ${candidates.length}")
@@ -73,11 +90,11 @@ class AutoDiscoveryPluginRegistry(
     constr.newInstance(args: _*).asInstanceOf[Plugin]
   }
 
-  private def isPluginEnabled(pc: Class[Plugin]): Boolean = {
-    val pluginConf = conf.subset(pc.getName)
-    val isEnabled = pluginConf.getBoolean(EnabledConfProperty, EnabledByDefault)
+  private def isPluginEnabled(pcn: String): Boolean = {
+    val pluginConf = conf.plugins.subset(pcn)
+    val isEnabled = pluginConf.getBoolean(EnabledConfProperty, conf.pluginsEnabledByDefault)
     if (!isEnabled) {
-      logWarning(s"Plugin ${pc.getName} is disabled in the configuration.")
+      logWarning(s"Plugin $pcn is disabled in the configuration.")
     }
     isEnabled
   }
@@ -87,7 +104,6 @@ object AutoDiscoveryPluginRegistry extends Logging {
 
   private val EnabledConfProperty = "enabled"
-  private val EnabledByDefault = true
 
   private val PluginClasses: Seq[Class[Plugin]] = {
     logDebug("Scanning for plugins")
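When `pluginsEnabledByDefault` is `false`, the registry above derives plugin class names from `<fqcn>.enabled` configuration keys before loading them via `Class.forName`, and fails fast with "No plugins enabled" if nothing is registered. A standalone sketch of that key handling, with made-up names:

```scala
val Suffix = ".enabled"
val keys = Seq(
  "com.example.MyPlugin.enabled",    // explicitly registered plugin
  "com.example.OtherPlugin.someProp" // ignored: not an ".enabled" key
)

// Keep only ".enabled" keys, then strip the suffix to recover the class name.
val pluginClassNames = keys
  .filter(_.endsWith(Suffix))
  .map(_.dropRight(Suffix.length))

assert(pluginClassNames == Seq("com.example.MyPlugin"))
// Each surviving name is then checked against the config and loaded reflectively.
```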
diff --git a/core/src/test/scala/za/co/absa/spline/agent/AgentConfigSpec.scala b/core/src/test/scala/za/co/absa/spline/agent/AgentConfigSpec.scala
index aad8ba68..dfea419b 100644
--- a/core/src/test/scala/za/co/absa/spline/agent/AgentConfigSpec.scala
+++ b/core/src/test/scala/za/co/absa/spline/agent/AgentConfigSpec.scala
@@ -111,13 +111,15 @@ class AgentConfigSpec
       .lineageDispatcher(mockDispatcher)
       .postProcessingFilter(mockFilter)
       .ignoredWriteDetectionStrategy(mockIwdStrategy)
+      .pluginsEnabledByDefault(false)
       .build()
 
     config should not be empty
-    config.getKeys.asScala should have length 3
+    config.getKeys.asScala should have length 4
 
     config.getProperty(ConfProperty.RootLineageDispatcher) should be theSameInstanceAs mockDispatcher
     config.getProperty(ConfProperty.RootPostProcessingFilter) should be theSameInstanceAs mockFilter
     config.getProperty(ConfProperty.IgnoreWriteDetectionStrategy) should be theSameInstanceAs mockIwdStrategy
+    config.getProperty(ConfProperty.PluginsEnabledByDefault) shouldBe false
   }
 }

diff --git a/integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala b/integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala
index e177662c..c76dc772 100644
--- a/integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala
+++ b/integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala
@@ -16,11 +16,12 @@
 package za.co.absa.spline.test.fixture.spline
 
 import org.apache.spark.sql.SparkSession
+import za.co.absa.spline.agent.AgentConfig
 
 trait SplineFixture {
 
-  def withLineageTracking[T](testBody: LineageCaptor => T)(implicit session: SparkSession): T = {
-    testBody(new LineageCaptor)
+  def withLineageTracking[T](testBody: LineageCaptor => T, builderCustomizer: AgentConfig.Builder => AgentConfig.Builder = identity)(implicit session: SparkSession): T = {
+    testBody(new LineageCaptor(builderCustomizer))
   }
 }

diff --git a/integration-tests/src/test/scala/za/co/absa/spline/BasicIntegrationTests.scala b/integration-tests/src/test/scala/za/co/absa/spline/BasicIntegrationTests.scala
index 27004e9f..dd8a1b94 100644
--- a/integration-tests/src/test/scala/za/co/absa/spline/BasicIntegrationTests.scala
+++ b/integration-tests/src/test/scala/za/co/absa/spline/BasicIntegrationTests.scala
@@ -35,7 +35,7 @@ class BasicIntegrationTests extends AsyncFlatSpec
 
   "saveAsTable" should "process all operations" in
     withNewSparkSession(implicit spark =>
-      withLineageTracking { captor =>
+      withLineageTracking({ captor =>
         import spark.implicits._
 
         withNewSparkSession {
@@ -52,12 +52,16 @@
           plan.operations.other should have length 2
           plan.operations.write should not be null
         }
-      }
+      }, {
+        // To enable the SQL plugin only
+        _.pluginsEnabledByDefault(false)
+          .enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin")
+      })
     )
 
   "save_to_fs" should "process all operations" in
     withNewSparkSession(implicit spark =>
-      withLineageTracking { captor =>
+      withLineageTracking({ captor =>
         import spark.implicits._
 
         val path = TempDirectory("spline_", ".parquet", pathOnly = true).deleteOnExit().path.toString
@@ -71,7 +75,11 @@
           plan.operations.other should have length 2
           plan.operations.write should not be null
         }
-      }
+      }, {
+        // To enable the SQL plugin only
+        _.pluginsEnabledByDefault(false)
+          .enablePlugin("za.co.absa.spline.harvester.plugin.embedded.SQLPlugin")
+      })
     )
 
   "saveAsTable" should "produce URIs compatible with filesystem write" in