From d440eb77ce643c11aa46c4e23ba1a7bff78935f3 Mon Sep 17 00:00:00 2001
From: Adam Cervenka
Date: Wed, 30 Aug 2023 11:39:22 +0200
Subject: [PATCH] spark agent #705 Spark 3.4 support

---
 .../harvester/plugin/embedded/RDDPlugin.scala | 13 ++++++---
 integration-tests/pom.xml                     | 17 +++++++++++-
 .../scala/za/co/absa/spline/DeltaSpec.scala   |  4 +--
 pom.xml                                       | 27 +++++++++++++++++++
 4 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala
index e6c876fb..dddfbd0b 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala
@@ -19,7 +19,7 @@ package za.co.absa.spline.harvester.plugin.embedded
 import org.apache.spark.Partition
 import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.datasources.{FileScanRDD, PartitionedFile}
 import za.co.absa.spline.commons.reflect.ReflectionUtils
 import za.co.absa.spline.harvester.builder._
 import za.co.absa.spline.harvester.plugin.Plugin.{Precedence, ReadNodeInfo}
@@ -39,7 +39,8 @@ class RDDPlugin(
 
   override def rddReadNodeProcessor: PartialFunction[RDD[_], ReadNodeInfo] = {
     case fsr: FileScanRDD =>
-      val uris = fsr.filePartitions.flatMap(_.files.map(_.filePath))
+      val files = fsr.filePartitions.flatMap(_.files)
+      val uris = files.map(extractPath(_))
       ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
     case hr: HadoopRDD[_, _] =>
       val partitions = ReflectionUtils.extractValue[Array[Partition]](hr, "partitions_")
@@ -47,6 +48,13 @@ class RDDPlugin(
       ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
   }
 
+  private def extractPath(file: PartitionedFile): String = {
+    val path = ReflectionUtils.extractValue[AnyRef](file, "filePath")
+    // for Spark 3.3 and lower path is a String
+    // for Spark 3.4 path is org.apache.spark.paths.SparkPath
+    path.toString
+  }
+
   private def hadoopPartitionToUriString(hadoopPartition: Partition): String = {
     val inputSplit = ReflectionUtils.extractValue[AnyRef](hadoopPartition, "inputSplit")
     val fileSplitT = ReflectionUtils.extractValue[AnyRef](inputSplit, "t")
@@ -56,5 +64,4 @@ class RDDPlugin(
     uri.toString
   }
 
-
 }
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 3dcaadfc..8739193f 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -175,7 +175,7 @@
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch-spark-${elasticsearch.spark.sufix}_${scala.binary.version}</artifactId>
-            <version>8.2.2</version>
+            <version>8.9.1</version>
             <scope>test</scope>
         </dependency>
@@ -267,6 +267,21 @@
         </profile>
+        <profile>
+            <id>spark-3.4</id>
+            <properties>
+                <guava.version>16.0.1</guava.version>
+                <elasticsearch.spark.sufix>30</elasticsearch.spark.sufix>
+            </properties>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.iceberg</groupId>
+                    <artifactId>iceberg-spark-runtime-3.4_${scala.binary.version}</artifactId>
+                    <version>1.3.1</version>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
+        </profile>
     </profiles>
diff --git a/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala b/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala
index 4e1c2b72..d8780c03 100644
--- a/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala
+++ b/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala
@@ -37,7 +37,7 @@ class DeltaSpec extends AsyncFlatSpec
   private val deltaPath = TempDirectory(prefix = "delta", pathOnly = true).deleteOnExit().toURI.toString
 
   it should "support Delta Lake as a source" taggedAs
-    ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
+    ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
     withNewSparkSession { implicit spark =>
       withLineageTracking { captor =>
         val testData: DataFrame = {
@@ -79,7 +79,7 @@
   }
 
   it should "support insert into existing Delta Lake table" taggedAs
-    ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
+    ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
     withNewSparkSession { implicit spark =>
       withLineageTracking { lineageCaptor =>
         val testData: DataFrame = {
diff --git a/pom.xml b/pom.xml
index 2d1c7079..ec8baaad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
         <spark-31.version>3.1.3</spark-31.version>
         <spark-32.version>3.2.3</spark-32.version>
         <spark-33.version>3.3.1</spark-33.version>
+        <spark-34.version>3.4.1</spark-34.version>
@@ -100,6 +101,8 @@
         <delta-10.version>1.0.0</delta-10.version>
         <delta-20.version>2.0.0</delta-20.version>
         <delta-21.version>2.1.0</delta-21.version>
+        <delta-24.version>2.4.0</delta-24.version>
+        <cassandra-connector.version>${cassandra-connector-24.version}</cassandra-connector.version>
@@ -108,6 +111,7 @@
         <cassandra-connector-31.version>3.1.0</cassandra-connector-31.version>
         <cassandra-connector-32.version>3.2.0</cassandra-connector-32.version>
         <cassandra-connector-33.version>3.3.0</cassandra-connector-33.version>
+        <cassandra-connector-34.version>3.4.1</cassandra-connector-34.version>
         <spark-excel.version>0.13.7</spark-excel.version>
@@ -815,6 +819,29 @@
         </profile>
+        <profile>
+            <id>spark-3.4</id>
+            <properties>
+                <spark.version>${spark-34.version}</spark.version>
+                <delta.version>${delta-24.version}</delta.version>
+                <spark-excel.version>${spark-34.version}_0.19.0</spark-excel.version>
+                <cassandra-connector.version>${cassandra-connector-34.version}</cassandra-connector.version>
+            </properties>
+            <dependencyManagement>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>spark-parent_${scala.binary.version}</artifactId>
+                        <version>${spark.version}</version>
+                        <type>pom</type>
+                        <scope>import</scope>
+                    </dependency>
+                </dependencies>
+            </dependencyManagement>
+        </profile>
     </profiles>
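
Note on the extractPath change (reviewer sketch, not part of the patch): the point of reading filePath reflectively is that the field's static type is never mentioned in the compiled code, so the same agent core links against Spark <= 3.3, where PartitionedFile.filePath is a String, and Spark 3.4, where it is org.apache.spark.paths.SparkPath. The standalone Scala sketch below illustrates this under stated assumptions: it has no Spark dependency, and PartitionedFile33, PartitionedFile34, and SparkPathLike are hypothetical stand-ins for the two shapes of PartitionedFile, not Spline or Spark API.

    // Stand-ins for the two shapes of PartitionedFile (hypothetical, for illustration only)
    final case class SparkPathLike(urlEncoded: String) {
      // SparkPath.toString is assumed to yield the url-encoded path string
      override def toString: String = urlEncoded
    }
    final case class PartitionedFile33(filePath: String)        // Spark <= 3.3: filePath is a String
    final case class PartitionedFile34(filePath: SparkPathLike) // Spark 3.4: filePath is a SparkPath

    object PathExtractionDemo extends App {

      // Reflective field read, analogous to ReflectionUtils.extractValue(file, "filePath")
      // in the patch: no compile-time reference to the field's type.
      def extractPath(file: AnyRef): String = {
        val field = file.getClass.getDeclaredField("filePath")
        field.setAccessible(true)
        // String.toString is the identity; SparkPathLike.toString returns the path,
        // so both variants collapse to the same URI string.
        field.get(file).toString
      }

      println(extractPath(PathExtractionDemo_file33)) // hdfs:///data/part-0000.parquet
      println(extractPath(PathExtractionDemo_file34)) // hdfs:///data/part-0000.parquet

      private def PathExtractionDemo_file33 = PartitionedFile33("hdfs:///data/part-0000.parquet")
      private def PathExtractionDemo_file34 = PartitionedFile34(SparkPathLike("hdfs:///data/part-0000.parquet"))
    }

Relying on toString of whatever object comes back is what keeps the change to a single small method instead of version-specific builds of RDDPlugin.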