Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/distributed_computing_unpacked.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"spark = SparkSession \\\n",
" .builder \\\n",
" .appName(\"Distributed Compute Examples\") \\\n",
" .config(\"spark.jars.packages\", \"io.dataflint:spark_2.12:0.8.6\") \\\n",
" .config(\"spark.jars.packages\", \"io.dataflint:spark_2.12:0.8.7\") \\\n",
" .config(\"spark.plugins\", \"io.dataflint.spark.SparkDataflintPlugin\") \\\n",
" .config(\"spark.ui.port\", \"11000\") \\\n",
" .master(\"local[*]\") \\\n",
Expand Down
18 changes: 17 additions & 1 deletion spark-plugin/build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import xerial.sbt.Sonatype._
import sbtassembly.AssemblyPlugin.autoImport._

lazy val versionNum: String = "0.8.6"
lazy val versionNum: String = "0.8.7"
lazy val scala212 = "2.12.20"
lazy val scala213 = "2.13.16"
lazy val supportedScalaVersions = List(scala212, scala213)
Expand Down Expand Up @@ -98,6 +98,22 @@ lazy val pluginspark3 = (project in file("pluginspark3"))
// Include source and resources from plugin directory for tests
Test / unmanagedSourceDirectories += (plugin / Compile / sourceDirectory).value / "scala",

// Fork JVM for tests so javaOptions are applied; required for Spark on Java 9+
Test / fork := true,
// Run test suites sequentially — parallel suites share the SparkSession via getOrCreate()
// and one suite stopping the session causes NPEs in concurrently-running suites
Test / parallelExecution := false,
Test / javaOptions ++= {
// --add-opens is not supported on Java 8 (spec version starts with "1.")
if (sys.props("java.specification.version").startsWith("1.")) Seq.empty
else Seq(
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED",
"--add-opens=java.base/java.io=ALL-UNNAMED",
)
},
)

lazy val pluginspark4 = (project in file("pluginspark4"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ object DataflintSparkUICommonLoader extends Logging {
val INSTRUMENT_MAP_IN_PANDAS_ENABLED = "spark.dataflint.instrument.spark.mapInPandas.enabled"
val INSTRUMENT_MAP_IN_ARROW_ENABLED = "spark.dataflint.instrument.spark.mapInArrow.enabled"
val INSTRUMENT_WINDOW_ENABLED = "spark.dataflint.instrument.spark.window.enabled"
val INSTRUMENT_ARROW_EVAL_PYTHON_ENABLED = "spark.dataflint.instrument.spark.arrowEvalPython.enabled"
val INSTRUMENT_BATCH_EVAL_PYTHON_ENABLED = "spark.dataflint.instrument.spark.batchEvalPython.enabled"
val INSTRUMENT_FLAT_MAP_GROUPS_PANDAS_ENABLED = "spark.dataflint.instrument.spark.flatMapGroupsInPandas.enabled"
val INSTRUMENT_FLAT_MAP_COGROUPS_PANDAS_ENABLED = "spark.dataflint.instrument.spark.flatMapCoGroupsInPandas.enabled"

def install(context: SparkContext, pageFactory: DataflintPageFactory): String = {
new DataflintSparkUICommonInstaller().install(context, pageFactory)
Expand Down Expand Up @@ -166,7 +170,13 @@ object DataflintSparkUICommonLoader extends Logging {
val mapInPandasEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_PANDAS_ENABLED, defaultValue = false)
val mapInArrowEnabled = sc.conf.getBoolean(INSTRUMENT_MAP_IN_ARROW_ENABLED, defaultValue = false)
val windowEnabled = sc.conf.getBoolean(INSTRUMENT_WINDOW_ENABLED, defaultValue = false)
val anyInstrumentationEnabled = instrumentEnabled || mapInPandasEnabled || mapInArrowEnabled || windowEnabled
val arrowEvalPythonEnabled = sc.conf.getBoolean(INSTRUMENT_ARROW_EVAL_PYTHON_ENABLED, defaultValue = false)
val batchEvalPythonEnabled = sc.conf.getBoolean(INSTRUMENT_BATCH_EVAL_PYTHON_ENABLED, defaultValue = false)
val flatMapGroupsPandasEnabled = sc.conf.getBoolean(INSTRUMENT_FLAT_MAP_GROUPS_PANDAS_ENABLED, defaultValue = false)
val flatMapCogroupsPandasEnabled = sc.conf.getBoolean(INSTRUMENT_FLAT_MAP_COGROUPS_PANDAS_ENABLED, defaultValue = false)
val anyInstrumentationEnabled = instrumentEnabled || mapInPandasEnabled || mapInArrowEnabled ||
windowEnabled || arrowEvalPythonEnabled || batchEvalPythonEnabled ||
flatMapGroupsPandasEnabled || flatMapCogroupsPandasEnabled
if (!anyInstrumentationEnabled) {
logInfo("DataFlint instrumentation extension is disabled (no instrumentation flags enabled)")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,17 @@ package org.apache.spark.dataflint

import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, WindowFunctionType}
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
import org.apache.spark.sql.catalyst.expressions.WindowFunctionType
import org.apache.spark.sql.catalyst.planning.PhysicalWindow
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Window => LogicalWindow}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
import org.apache.spark.sql.execution.python.{DataFlintMapInPandasExec_3_0, DataFlintMapInPandasExec_3_1, DataFlintMapInPandasExec_3_3, DataFlintMapInPandasExec_3_4, DataFlintMapInPandasExec_3_5, DataFlintPythonMapInArrowExec_3_3, DataFlintPythonMapInArrowExec_3_4, DataFlintPythonMapInArrowExec_3_5, DataFlintWindowInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, BatchEvalPythonExec, DataFlintArrowEvalPythonExec, DataFlintBatchEvalPythonExec, DataFlintFlatMapCoGroupsInPandasExec, DataFlintFlatMapGroupsInPandasExec, DataFlintMapInArrowExec_3_3, DataFlintMapInPandasExec_3_0, DataFlintMapInPandasExec_3_5, DataFlintPythonMapInArrowExec_3_5, DataFlintWindowInPandasExec, FlatMapCoGroupsInPandasExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec}
import org.apache.spark.sql.execution.window.DataFlintWindowExec

/**
* A SparkSessionExtension that injects DataFlint instrumentation into Spark's physical planning phase.
* This extension replaces:
* - MapInPandasExec with the version-appropriate DataFlintMapInPandasExec (adds duration metric)
* - PythonMapInArrowExec with the version-appropriate DataFlintPythonMapInArrowExec (adds duration metric)
*
* Supports Spark 3.0.x through 3.5.x with version-specific implementations that match
* each version's internal API.
*
* Note: mapInArrow instrumentation is only available on Spark 3.3+ (PythonMapInArrowExec was
* introduced in SPARK-37227). On Spark 3.0–3.2, only mapInPandas instrumentation is supported.
*
* The extension is automatically registered by SparkDataflintPlugin when any instrumentation flag is enabled:
* - spark.dataflint.instrument.spark.enabled (global)
* - spark.dataflint.instrument.spark.mapInPandas.enabled
* - spark.dataflint.instrument.spark.mapInArrow.enabled
*
* Can also be manually registered via:
* .config("spark.sql.extensions", "org.apache.spark.dataflint.DataFlintInstrumentationExtension")
* A SparkSessionExtension that injects DataFlint instrumentation into Spark's physical planning phase.
*/
class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit) with Logging {

Expand Down Expand Up @@ -61,6 +44,9 @@ class DataFlintInstrumentationExtension extends (SparkSessionExtensions => Unit)
* The mapInPandas and mapInArrow replacements are performed in separate transforms so that
* PythonMapInArrowExec (which doesn't exist before Spark 3.3) doesn't cause NoClassDefFoundError
* on older Spark versions.
*
* Each replace method guards against double-wrapping with isInstanceOf checks so that
* re-runs (e.g. under AQE prepareForExecution) are idempotent.
*/
case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends ColumnarRule with Logging {

Expand All @@ -83,9 +69,40 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C
globalEnabled || specificEnabled
}

/** Resolves one instrumentation toggle: enabled when either the global flag
 *  (spark.dataflint.instrument.spark.enabled) or the operator-specific flag is set.
 *  Extracted to remove four copy-pasted resolution blocks. */
private def instrumentationFlagEnabled(specificKey: String): Boolean = {
  val conf = session.sparkContext.conf
  val globalEnabled = conf.getBoolean(DataflintSparkUICommonLoader.INSTRUMENT_SPARK_ENABLED, defaultValue = false)
  val specificEnabled = conf.getBoolean(specificKey, defaultValue = false)
  globalEnabled || specificEnabled
}

// Per-operator instrumentation toggles, resolved once when the rule is constructed.
private val arrowEvalPythonEnabled: Boolean =
  instrumentationFlagEnabled(DataflintSparkUICommonLoader.INSTRUMENT_ARROW_EVAL_PYTHON_ENABLED)

private val batchEvalPythonEnabled: Boolean =
  instrumentationFlagEnabled(DataflintSparkUICommonLoader.INSTRUMENT_BATCH_EVAL_PYTHON_ENABLED)

private val flatMapGroupsEnabled: Boolean =
  instrumentationFlagEnabled(DataflintSparkUICommonLoader.INSTRUMENT_FLAT_MAP_GROUPS_PANDAS_ENABLED)

private val flatMapCoGroupsEnabled: Boolean =
  instrumentationFlagEnabled(DataflintSparkUICommonLoader.INSTRUMENT_FLAT_MAP_COGROUPS_PANDAS_ENABLED)

override def preColumnarTransitions: Rule[SparkPlan] = { plan =>
if (!mapInPandasEnabled && !mapInArrowEnabled) plan
if (!mapInPandasEnabled && !mapInArrowEnabled && !arrowEvalPythonEnabled && !batchEvalPythonEnabled && !flatMapGroupsEnabled && !flatMapCoGroupsEnabled) plan
else {
val inputNodes = plan.collect { case n => n.getClass.getSimpleName }.mkString(", ")
logWarning(s"DataFlint preColumnarTransitions ENTER — plan nodes: [$inputNodes]")

var result = plan

if (mapInPandasEnabled) {
Expand All @@ -96,51 +113,122 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C
result = replaceMapInArrow(result)
}

if (arrowEvalPythonEnabled) {
result = replaceArrowEvalPython(result)
}

if (batchEvalPythonEnabled) {
result = replaceBatchEvalPython(result)
}

if (flatMapGroupsEnabled) {
result = replaceFlatMapGroupsInPandas(result)
}

if (flatMapCoGroupsEnabled) {
result = replaceFlatMapCoGroupsInPandas(result)
}

val outputNodes = result.collect { case n => n.getClass.getSimpleName }.mkString(", ")
logWarning(s"DataFlint preColumnarTransitions EXIT — plan nodes: [$outputNodes]")
result
}
}

private def replaceMapInPandas(plan: SparkPlan): SparkPlan = {
plan.transformUp {
case mapInPandas: MapInPandasExec =>
logInfo(s"Replacing MapInPandasExec with DataFlint version for Spark $sparkMinorVersion")
logWarning(s"Replacing MapInPandasExec with DataFlint version for Spark $sparkMinorVersion")
sparkMinorVersion match {
case "3.0" =>
DataFlintMapInPandasExec_3_0(
func = mapInPandas.func,
output = mapInPandas.output,
child = mapInPandas.child
)
case "3.1" | "3.2" =>
DataFlintMapInPandasExec_3_1(
func = mapInPandas.func,
output = mapInPandas.output,
child = mapInPandas.child
)
case "3.3" =>
DataFlintMapInPandasExec_3_3(
func = mapInPandas.func,
output = mapInPandas.output,
child = mapInPandas.child
)
case "3.4" =>
DataFlintMapInPandasExec_3_4(
case "3.5" =>
DataFlintMapInPandasExec_3_5(
func = mapInPandas.func,
output = mapInPandas.output,
child = mapInPandas.child
child = mapInPandas.child,
isBarrier = mapInPandas.isBarrier
)
case _ =>
// Default to 3.5 implementation for 3.5.x and any future 3.x
DataFlintMapInPandasExec_3_5(
// 3.0–3.4: all share the same 3-arg constructor, handled via reflection
DataFlintMapInPandasExec_3_0(
func = mapInPandas.func,
output = mapInPandas.output,
child = mapInPandas.child,
isBarrier = mapInPandas.isBarrier
child = mapInPandas.child
)
}
}
}

/**
 * Replaces ArrowEvalPythonExec nodes (pandas_udf SCALAR) with DataFlint instrumented versions.
 *
 * Only supported on Spark 3.2+ which uses the 4-param constructor (udfs, resultAttrs, child,
 * evalType). Spark 3.0–3.1 used a 3-param constructor incompatible with our wrapper.
 *
 * The isInstanceOf guard makes the rule idempotent — safe to re-run under AQE.
 */
private def replaceArrowEvalPython(plan: SparkPlan): SparkPlan = {
  if (sparkMinorVersion == "3.0" || sparkMinorVersion == "3.1") {
    // Unsupported version is a one-time, actionable condition: WARN is appropriate.
    logWarning(s"ArrowEvalPython instrumentation requires Spark 3.2+ — skipping on Spark $sparkMinorVersion")
    plan
  } else {
    try {
      plan.transformUp {
        case arrowEval: ArrowEvalPythonExec if !arrowEval.isInstanceOf[DataFlintArrowEvalPythonExec] =>
          // Routine, expected replacement: log at INFO (WARN here spams production logs
          // on every planned query).
          logInfo(s"Replacing ArrowEvalPythonExec with DataFlint version for Spark $sparkMinorVersion")
          DataFlintArrowEvalPythonExec(
            udfs = arrowEval.udfs,
            resultAttrs = arrowEval.resultAttrs,
            child = arrowEval.child,
            evalType = arrowEval.evalType
          )
      }
    } catch {
      // Defensive: the ArrowEvalPythonExec class may be absent on some Spark builds;
      // degrade gracefully instead of failing planning.
      case _: NoClassDefFoundError =>
        logWarning("ArrowEvalPythonExec not available in this Spark version — arrowEvalPython instrumentation disabled")
        plan
    }
  }
}

/**
 * Replaces BatchEvalPythonExec nodes with DataFlint instrumented versions.
 * The isInstanceOf guard keeps the rule idempotent when the plan is re-planned
 * (e.g. under AQE), preventing double-wrapping.
 */
private def replaceBatchEvalPython(plan: SparkPlan): SparkPlan = {
  plan.transformUp {
    case exec: BatchEvalPythonExec if !exec.isInstanceOf[DataFlintBatchEvalPythonExec] =>
      // Routine replacement: INFO level — WARN here is leftover debug logging that
      // fires on every planned query.
      logInfo(s"Replacing BatchEvalPythonExec with DataFlint version for Spark $sparkMinorVersion")
      DataFlintBatchEvalPythonExec(
        udfs = exec.udfs,
        resultAttrs = exec.resultAttrs,
        child = exec.child
      )
  }
}

/**
 * Replaces FlatMapGroupsInPandasExec nodes with DataFlint instrumented versions.
 * Idempotent via the isInstanceOf guard — safe to re-run on an already-rewritten plan.
 */
private def replaceFlatMapGroupsInPandas(plan: SparkPlan): SparkPlan = {
  plan.transformUp {
    case exec: FlatMapGroupsInPandasExec if !exec.isInstanceOf[DataFlintFlatMapGroupsInPandasExec] =>
      // Routine replacement: INFO level, not WARN, consistent with the other replace* rules.
      logInfo(s"Replacing FlatMapGroupsInPandasExec with DataFlint version for Spark $sparkMinorVersion")
      DataFlintFlatMapGroupsInPandasExec(
        groupingAttributes = exec.groupingAttributes,
        func = exec.func,
        output = exec.output,
        child = exec.child
      )
  }
}

/**
 * Replaces FlatMapCoGroupsInPandasExec (two-sided cogroup) nodes with DataFlint
 * instrumented versions. Idempotent via the isInstanceOf guard.
 */
private def replaceFlatMapCoGroupsInPandas(plan: SparkPlan): SparkPlan = {
  plan.transformUp {
    case exec: FlatMapCoGroupsInPandasExec if !exec.isInstanceOf[DataFlintFlatMapCoGroupsInPandasExec] =>
      // Routine replacement: INFO level, not WARN, consistent with the other replace* rules.
      logInfo(s"Replacing FlatMapCoGroupsInPandasExec with DataFlint version for Spark $sparkMinorVersion")
      DataFlintFlatMapCoGroupsInPandasExec(
        leftGroup = exec.leftGroup,
        rightGroup = exec.rightGroup,
        func = exec.func,
        output = exec.output,
        left = exec.left,
        right = exec.right
      )
  }
}

/**
* Replaces PythonMapInArrowExec nodes with DataFlint instrumented versions.
*
Expand All @@ -155,27 +243,21 @@ case class DataFlintInstrumentationColumnarRule(session: SparkSession) extends C
try {
plan.transformUp {
case mapInArrow: PythonMapInArrowExec =>
logInfo(s"Replacing PythonMapInArrowExec with DataFlint version for Spark $sparkMinorVersion")
logWarning(s"Replacing PythonMapInArrowExec with DataFlint version for Spark $sparkMinorVersion")
sparkMinorVersion match {
case "3.3" =>
DataFlintPythonMapInArrowExec_3_3(
func = mapInArrow.func,
output = mapInArrow.output,
child = mapInArrow.child
)
case "3.4" =>
DataFlintPythonMapInArrowExec_3_4(
case "3.5" =>
DataFlintPythonMapInArrowExec_3_5(
func = mapInArrow.func,
output = mapInArrow.output,
child = mapInArrow.child
child = mapInArrow.child,
isBarrier = mapInArrow.isBarrier
)
case _ =>
// Default to 3.5 implementation for 3.5.x and any future 3.x
DataFlintPythonMapInArrowExec_3_5(
// 3.3–3.4: share the same 3-arg constructor, handled via reflection
DataFlintMapInArrowExec_3_3(
func = mapInArrow.func,
output = mapInArrow.output,
child = mapInArrow.child,
isBarrier = mapInArrow.isBarrier
child = mapInArrow.child
)
}
}
Expand Down Expand Up @@ -208,16 +290,16 @@ case class DataFlintWindowPlannerStrategy(session: SparkSession) extends Strateg
plan match {
case PhysicalWindow(
WindowFunctionType.SQL, windowExprs, partitionSpec, orderSpec, child) =>
logInfo("Replacing logical Window with DataFlintWindowExec")
logWarning("Replacing logical Window with DataFlintWindowExec")
DataFlintWindowExec(
windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil

case PhysicalWindow(
WindowFunctionType.Python, windowExprs, partitionSpec, orderSpec, child) =>
logInfo("Replacing logical Window (Python UDF) with DataFlintWindowInPandasExec")
logWarning("Replacing logical Window (Python UDF) with DataFlintWindowInPandasExec")
DataFlintWindowInPandasExec(
windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
case _ => Nil
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.apache.spark.dataflint

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.SQLMetric

/** Helpers for building DataFlint SQL metrics. */
object MetricsUtils {
  /**
   * Creates a timing SQLMetric called `name` (initial value -1), registers it
   * with the given SparkContext, and returns it paired with its name so the
   * result can be dropped straight into a metrics map.
   */
  def getTimingMetric(name: String)(implicit sparkContext: SparkContext): (String, SQLMetric) = {
    val timingMetric = new SQLMetric("timing", -1L)
    timingMetric.register(sparkContext, Some(name), countFailedValues = false)
    (name, timingMetric)
  }
}
Loading
Loading