spark agent #705 Spark 3.4 support
cerveada committed Aug 30, 2023
1 parent 0ed969a commit d440eb7
Showing 4 changed files with 55 additions and 6 deletions.
@@ -19,7 +19,7 @@ package za.co.absa.spline.harvester.plugin.embedded
import org.apache.spark.Partition
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.sql.execution.datasources.{FileScanRDD, PartitionedFile}
import za.co.absa.spline.commons.reflect.ReflectionUtils
import za.co.absa.spline.harvester.builder._
import za.co.absa.spline.harvester.plugin.Plugin.{Precedence, ReadNodeInfo}
@@ -39,14 +39,22 @@ class RDDPlugin(

override def rddReadNodeProcessor: PartialFunction[RDD[_], ReadNodeInfo] = {
case fsr: FileScanRDD =>
val uris = fsr.filePartitions.flatMap(_.files.map(_.filePath))
val files = fsr.filePartitions.flatMap(_.files)
val uris = files.map(extractPath(_))
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
case hr: HadoopRDD[_, _] =>
val partitions = ReflectionUtils.extractValue[Array[Partition]](hr, "partitions_")
val uris = partitions.map(hadoopPartitionToUriString)
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
}

private def extractPath(file: PartitionedFile): String = {
val path = ReflectionUtils.extractValue[AnyRef](file, "filePath")
// for Spark 3.3 and lower path is a String
// for Spark 3.4 path is org.apache.spark.paths.SparkPath
path.toString
}

private def hadoopPartitionToUriString(hadoopPartition: Partition): String = {
val inputSplit = ReflectionUtils.extractValue[AnyRef](hadoopPartition, "inputSplit")
val fileSplitT = ReflectionUtils.extractValue[AnyRef](inputSplit, "t")
@@ -56,5 +64,4 @@ class RDDPlugin(

uri.toString
}

}
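
Note on the change above: as the inline comments state, Spark 3.3 and lower expose the reflected "filePath" field of PartitionedFile as a String, while Spark 3.4 changed it to org.apache.spark.paths.SparkPath, so the plugin now reads the field reflectively and stringifies whatever it finds. Below is a minimal illustrative sketch of the same idea written as an explicit type match rather than the commit's single toString call; it is not part of the commit, and the object and method names are placeholders.

    import org.apache.spark.sql.execution.datasources.PartitionedFile
    import za.co.absa.spline.commons.reflect.ReflectionUtils

    object PathExtractionSketch {
      // Sketch only: resolve the file path across Spark versions.
      // Spark <= 3.3: the reflected "filePath" field is a String.
      // Spark >= 3.4: it is org.apache.spark.paths.SparkPath, whose
      // toString yields the path string (what the commit relies on).
      def extractPath(file: PartitionedFile): String =
        ReflectionUtils.extractValue[AnyRef](file, "filePath") match {
          case s: String => s
          case sparkPath => sparkPath.toString
        }
    }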
integration-tests/pom.xml (17 changes: 16 additions & 1 deletion)
@@ -175,7 +175,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-${elasticsearch.spark.sufix}_${scala.binary.version}</artifactId>
<version>8.2.2</version>
<version>8.9.1</version>
<scope>test</scope>
<exclusions>
<exclusion>
@@ -267,6 +267,21 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-3.4</id>
<properties>
<guava.version>16.0.1</guava.version>
<elasticsearch.spark.sufix>30</elasticsearch.spark.sufix>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.4_${scala.binary.version}</artifactId>
<version>1.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
@@ -37,7 +37,7 @@ class DeltaSpec extends AsyncFlatSpec
private val deltaPath = TempDirectory(prefix = "delta", pathOnly = true).deleteOnExit().toURI.toString

it should "support Delta Lake as a source" taggedAs
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
@@ -79,7 +79,7 @@ class DeltaSpec extends AsyncFlatSpec
}

it should "support insert into existing Delta Lake table" taggedAs
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { lineageCaptor =>
val testData: DataFrame = {
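
The widened guard above means the two Delta tests are now skipped on Spark 3.4 as well as on pre-2.4.2 versions, i.e. they run only when the Spark version falls in the half-open window [2.4.2, 3.4.0). A hypothetical sketch of that window using plain version tuples follows; the suite itself compares versions with the project's ver"..." interpolator on SPARK_VERSION, not tuples.

    import scala.math.Ordering.Implicits._

    // Illustration only: the effective version window for the Delta tests
    // after this commit. Tuples stand in for the ver"..." comparisons.
    object DeltaTestWindowSketch {
      val lower = (2, 4, 2)
      val upper = (3, 4, 0)
      def deltaTestsRun(sparkVersion: (Int, Int, Int)): Boolean =
        sparkVersion >= lower && sparkVersion < upper
    }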
pom.xml (27 changes: 27 additions & 0 deletions)
@@ -91,6 +91,7 @@
<spark-31.version>3.1.3</spark-31.version>
<spark-32.version>3.2.3</spark-32.version>
<spark-33.version>3.3.1</spark-33.version>
<spark-34.version>3.4.1</spark-34.version>

<!-- Delta -->

@@ -100,6 +101,8 @@
<delta-10.version>1.0.0</delta-10.version>
<delta-20.version>2.0.0</delta-20.version>
<delta-21.version>2.1.0</delta-21.version>
<delta-24.version>2.4.0</delta-24.version>


<!-- Cassandra -->
<cassandra-connector.version>${cassandra-connector-24.version}</cassandra-connector.version>
@@ -108,6 +111,7 @@
<cassandra-connector-31.version>3.1.0</cassandra-connector-31.version>
<cassandra-connector-32.version>3.2.0</cassandra-connector-32.version>
<cassandra-connector-33.version>3.3.0</cassandra-connector-33.version>
<cassandra-connector-34.version>3.4.1</cassandra-connector-34.version>

<spark-excel.version>0.13.7</spark-excel.version>

@@ -815,6 +819,29 @@
</dependencyManagement>
</profile>

<profile>
<id>spark-3.4</id>
<properties>
<spark.version>${spark-34.version}</spark.version>
<delta.version>${delta-24.version}</delta.version>
<spark-excel.version>${spark-34.version}_0.19.0</spark-excel.version>
<cassandra-connector.version>${cassandra-connector-34.version}</cassandra-connector.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</profile>



<!-- Binary compatibility checking profile -->

<profile>
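
Both pom.xml files now declare a profile with the id spark-3.4, mirroring the existing per-Spark-version profiles: the root profile pins spark.version to 3.4.1, delta.version to 2.4.0, spark-excel to 3.4.1_0.19.0 and the Cassandra connector to 3.4.1, while the integration-tests profile adds the Iceberg 1.3.1 runtime for Spark 3.4. Presumably the new dependency set is selected the same way as the older ones, by activating the profile on the Maven command line (for example with -Pspark-3.4); the project's own build documentation is authoritative for the exact invocation.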
