From 38d309ae88b01816f6cd7ac4517312929ba076f7 Mon Sep 17 00:00:00 2001 From: zhangyizhong03 Date: Thu, 4 Jun 2026 23:03:34 +0800 Subject: [PATCH] perf: use NativePartition wrapper and @transient rddPartitions to reduce serialization overhead Introduce a NativePartition[P] wrapper that carries each original input partition as a payload through NativeRDD, so plan closures no longer re-index into the input RDD's partitions array at execution time, which avoids serializing the full partitions array. Key changes: - Add @transient to rddPartitions in NativeRDD - Add NativePartition case class for typed partition payload access - Wrap input partitions in NativePartition before NativeRDD creation - Extract original partition via .asInstanceOf[NativePartition[P]].payload (an unchecked cast: P is erased at runtime) - Mark several plan-node constructor parameters (child, mode, cmd, basedFileScan, outputPartitioning) @transient - Read file sizes from PartitionedFile.fileSize instead of a precomputed fileSizes map in NativeFileSourceScanBase --- .../apache/spark/sql/auron/NativeRDD.scala | 13 ++++++---- .../auron/plan/ConvertToNativeBase.scala | 2 +- .../execution/auron/plan/NativeAggBase.scala | 10 ++++++-- .../plan/NativeBroadcastExchangeBase.scala | 6 +++-- .../auron/plan/NativeBroadcastJoinBase.scala | 17 ++++++------- .../auron/plan/NativeCollectLimitBase.scala | 11 +++++---- .../auron/plan/NativeExpandBase.scala | 9 +++++-- .../auron/plan/NativeFileSourceScanBase.scala | 13 +++------- .../auron/plan/NativeFilterBase.scala | 9 +++++-- .../auron/plan/NativeGenerateBase.scala | 9 +++++-- .../auron/plan/NativeGlobalLimitBase.scala | 17 ++++++------- .../auron/plan/NativeLocalLimitBase.scala | 13 +++++----- .../auron/plan/NativeOrcScanBase.scala | 3 +-- ...NativeParquetInsertIntoHiveTableBase.scala | 4 ++-- .../auron/plan/NativeParquetScanBase.scala | 1 - .../auron/plan/NativeParquetSinkBase.scala | 13 +++++----- .../auron/plan/NativeProjectBase.scala | 9 +++++-- .../auron/plan/NativeRenameColumnsBase.scala | 14 ++++++----- .../plan/NativeShuffleExchangeBase.scala | 24 ++++++++++--------- .../plan/NativeShuffledHashJoinBase.scala | 14 ++++++++--- .../execution/auron/plan/NativeSortBase.scala | 9 +++++-- .../auron/plan/NativeSortMergeJoinBase.scala | 14 ++++++++--- 
.../auron/plan/NativeTakeOrderedBase.scala | 13 ++++++---- .../auron/plan/NativeUnionBase.scala | 5 ++-- .../auron/plan/NativeWindowBase.scala | 9 +++++-- .../auron/plan/NativeHiveTableScanBase.scala | 2 +- 26 files changed, 163 insertions(+), 100 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala index f2629241a..905b396fe 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala @@ -36,7 +36,7 @@ import org.apache.auron.util.SparkVersionUtil class NativeRDD( @transient private val rddSparkContext: SparkContext, val metrics: SparkMetricNode, - private val rddPartitions: Array[Partition], + @transient private val rddPartitions: Array[Partition], private val rddPartitioner: Option[Partitioner], private val rddDependencies: Seq[Dependency[_]], private val rddShuffleReadFull: Boolean, @@ -46,13 +46,13 @@ class NativeRDD( with Logging with Serializable { - // use serializable wrapper to avoid serializing nativePlan - val nativePlanWrapper = new NativePlanWrapper(nativePlan) - if (friendlyName != null) { setName(friendlyName) } + // use serializable wrapper to avoid serializing nativePlan + val nativePlanWrapper = new NativePlanWrapper(nativePlan) + def nativePlan(p: Partition, tc: TaskContext): PhysicalPlanNode = { nativePlanWrapper.plan(p, tc) } @@ -60,9 +60,10 @@ class NativeRDD( def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this) Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull) + override val partitioner: Option[Partitioner] = rddPartitioner + override protected def getPartitions: Array[Partition] = rddPartitions override protected def getDependencies: Seq[Dependency[_]] = rddDependencies - override val partitioner: Option[Partitioner] = rddPartitioner override def compute(split: Partition, context: 
TaskContext): Iterator[InternalRow] = { val computingNativePlan = nativePlanWrapper.plan(split, context) @@ -101,6 +102,8 @@ class EmptyNativeRDD(@transient private val rddSparkContext: SparkContext) } +case class NativePartition[P](override val index: Int, payload: P) extends Partition {} + class NativePlanWrapper(var p: (Partition, TaskContext) => PhysicalPlanNode) extends Serializable { def plan(split: Partition, context: TaskContext): PhysicalPlanNode = { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala index 9ccad26a5..a4e00ae81 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeBase.scala @@ -43,7 +43,7 @@ import org.apache.auron.protobuf.FFIReaderExecNode import org.apache.auron.protobuf.PhysicalPlanNode import org.apache.auron.protobuf.Schema -abstract class ConvertToNativeBase(override val child: SparkPlan) +abstract class ConvertToNativeBase(@transient override val child: SparkPlan) extends UnaryExecNode with NativeSupports { override val nodeName: String = "ConvertToNative" diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala index 6613791fc..0be19d50a 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala @@ -21,10 +21,12 @@ import scala.collection.immutable.SortedMap import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.internal.Logging import 
org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.auron.Shims import org.apache.spark.sql.catalyst.expressions.Ascending @@ -173,18 +175,22 @@ abstract class NativeAggBase( val nativeAggrModes = this.nativeAggrModes val nativeAggrs = this.nativeAggrs val nativeGroupingExprs = this.nativeGroupingExprs + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload lazy val inputPlan = - inputRDD.nativePlan(inputRDD.partitions(partition.index), taskContext) + inputRDD.nativePlan(inputPartition, taskContext) pb.PhysicalPlanNode .newBuilder() .setAgg( diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala index 24df117e8..2f1107bd1 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala @@ -67,7 +67,9 @@ import org.apache.auron.{protobuf => pb, sparkver} import org.apache.auron.jni.JniBridge import org.apache.auron.metric.SparkMetricNode -abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val child: SparkPlan) +abstract class NativeBroadcastExchangeBase( + 
@transient mode: BroadcastMode, + override val child: SparkPlan) extends BroadcastExchangeLike with NativeSupports { @@ -219,7 +221,7 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi metrics("dataSize") += byteArray.length }) - val input = inputRDD.nativePlan(inputRDD.partitions(split.index), context) + val input = inputRDD.nativePlan(split, context) val nativeIpcWriterExec = pb.PhysicalPlanNode .newBuilder() .setIpcWriter( diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala index f6f929d34..bfa695241 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastJoinBase.scala @@ -21,11 +21,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency import org.apache.spark.Partition -import org.apache.spark.sql.auron.NativeConverters -import org.apache.spark.sql.auron.NativeHelper -import org.apache.spark.sql.auron.NativeRDD -import org.apache.spark.sql.auron.NativeSupports -import org.apache.spark.sql.auron.Shims +import org.apache.spark.sql.auron.{NativeConverters, NativeHelper, NativePartition, NativeRDD, NativeSupports, Shims} import org.apache.spark.sql.auron.join.JoinBuildSides.{JoinBuildLeft, JoinBuildRight, JoinBuildSide} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.FullOuter @@ -145,10 +141,14 @@ abstract class NativeBroadcastJoinBase( Seq(FullOuter, LeftOuter, LeftSemi, LeftAnti).contains(joinType) }) + val nativePartitions = probedRDD.partitions.map { p => + NativePartition[Partition](p.index, p) + } + new NativeRDD( sparkContext, nativeMetrics, - probedRDD.partitions, + rddPartitions = nativePartitions.toArray, 
rddPartitioner = probedRDD.partitioner, rddDependencies = new OneToOneDependency(probedRDD) :: Nil, probedShuffleReadFull, @@ -156,14 +156,15 @@ abstract class NativeBroadcastJoinBase( val partition0 = new Partition() { override def index: Int = 0 } + val probedPartition = partition.asInstanceOf[NativePartition[Partition]].payload val (leftChild, rightChild) = broadcastSide match { case JoinBuildLeft => ( leftRDD.nativePlan(partition0, context), - rightRDD.nativePlan(rightRDD.partitions(partition.index), context)) + rightRDD.nativePlan(probedPartition, context)) case JoinBuildRight => ( - leftRDD.nativePlan(leftRDD.partitions(partition.index), context), + leftRDD.nativePlan(probedPartition, context), rightRDD.nativePlan(partition0, context)) } val cachedBuildHashMapId = s"bhm_stage${context.stageId}_rdd${builtRDD.id}" diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala index d33661030..869697179 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.auron.plan import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.OneToOneDependency -import org.apache.spark.sql.auron.{NativeHelper, NativeRDD, NativeSupports, Shims} +import org.apache.spark.{OneToOneDependency, Partition} +import org.apache.spark.sql.auron.{NativeHelper, NativePartition, NativeRDD, NativeSupports, Shims} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.physical.{SinglePartition, UnknownPartitioning} @@ -65,16 +65,19 @@ abstract class NativeCollectLimitBase(limit: Int, 
offset: Int, override val chil // merge all LocalLimit child partitions into a single partition val shuffled = Shims.get.createNativeShuffleExchangeExec(SinglePartition, partial) val singlePartitionRDD = NativeHelper.executeNative(shuffled) + val nativePartitions = singlePartitionRDD.partitions.map { p => + NativePartition[Partition](p.index, p) + } new NativeRDD( sparkContext, SparkMetricNode(metrics, singlePartitionRDD.metrics :: Nil), - singlePartitionRDD.partitions, + rddPartitions = nativePartitions.toArray, singlePartitionRDD.partitioner, new OneToOneDependency(singlePartitionRDD) :: Nil, rddShuffleReadFull = false, (partition, taskContext) => { - val inputPartition = singlePartitionRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeLimitExec = LimitExecNode .newBuilder() .setInput(singlePartitionRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala index ada3240fe..36ee1c0c5 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandBase.scala @@ -20,9 +20,11 @@ import scala.collection.immutable.SortedMap import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.Partitioning @@ -75,16 +77,19 @@ abstract class 
NativeExpandBase( val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) val nativeSchema = this.nativeSchema val nativeProjections = this.nativeProjections + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeExpandExec = ExpandExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala index 84e886060..8ac289a2a 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFileSourceScanBase.scala @@ -55,6 +55,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec) override val output: Seq[Attribute] = basedFileScan.output override val outputPartitioning: Partitioning = basedFileScan.outputPartitioning + @transient protected val inputFileScanRDD: FileScanRDD = { MethodUtils.invokeMethod(basedFileScan, true, "prepare") MethodUtils.invokeMethod(basedFileScan, true, "waitForSubqueries") @@ -77,13 +78,6 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec) private val partitionSchema = basedFileScan.relation.partitionSchema - private val fileSizes = inputFileScanRDD.filePartitions - .flatMap(_.files) - 
.groupBy(_.filePath) - .mapValues(_.foldLeft(0L)(_ + _.length)) - .map(identity) // make this map serializable - .toMap - // predicate pruning is buggy for decimal type, so we need to // temporarily disable predicate pruning for decimal type // see https://github.com/apache/auron/issues/1032 @@ -104,7 +98,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec) protected def nativePartitionSchema: pb.Schema = NativeConverters.convertSchema(partitionSchema) - protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => { + protected def nativeFileGroups(partition: FilePartition): pb.FileGroup = { // list input file statuses val nativePartitionedFile = (file: PartitionedFile) => { val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => @@ -115,7 +109,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec) pb.PartitionedFile .newBuilder() .setPath(s"${file.filePath}") - .setSize(fileSizes(file.filePath)) + .setSize(file.fileSize) .addAllPartitionValues(nativePartitionValues.asJava) .setLastModifiedNs(0) .setRange( @@ -136,7 +130,6 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec) nativePruningPredicateFilters nativeFileSchema nativePartitionSchema - nativeFileGroups protected def putJniBridgeResource( resourceId: String, diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala index 0c29a6ddd..ccf7ca83d 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala @@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import 
org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.catalyst.expressions.And import org.apache.spark.sql.catalyst.expressions.Attribute @@ -91,15 +93,18 @@ abstract class NativeFilterBase(condition: Expression, override val child: Spark val inputRDD = NativeHelper.executeNative(child) val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) val nativeFilterExprs = this.nativeFilterExprs + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeFilterExec = FilterExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala index cea3f1294..ba21e5e85 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateBase.scala @@ -22,9 +22,11 @@ import scala.jdk.CollectionConverters._ import com.google.protobuf.ByteString import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import 
org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -134,16 +136,19 @@ abstract class NativeGenerateBase( val nativeGenerator = this.nativeGenerator val nativeGeneratorOutput = this.nativeGeneratorOutput val nativeRequiredChildOutput = this.nativeRequiredChildOutput + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeGenerateExec = pb.GenerateExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala index e8c54d47e..314b8adcd 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala @@ -18,10 +18,8 @@ package org.apache.spark.sql.execution.auron.plan import scala.collection.immutable.SortedMap -import org.apache.spark.OneToOneDependency -import org.apache.spark.sql.auron.NativeHelper -import org.apache.spark.sql.auron.NativeRDD -import 
org.apache.spark.sql.auron.NativeSupports +import org.apache.spark.{OneToOneDependency, Partition} +import org.apache.spark.sql.auron.{NativeHelper, NativePartition, NativeRDD, NativeSupports} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.physical.AllTuples @@ -53,16 +51,19 @@ abstract class NativeGlobalLimitBase(limit: Int, offset: Int, override val child override def doExecuteNative(): NativeRDD = { val inputRDD = NativeHelper.executeNative(child) val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - inputRDD.partitions, - inputRDD.partitioner, - new OneToOneDependency(inputRDD) :: Nil, + rddPartitions = nativePartitions.toArray, + rddPartitioner = inputRDD.partitioner, + rddDependencies = new OneToOneDependency(inputRDD) :: Nil, rddShuffleReadFull = false, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeLimitExec = LimitExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala index 101867283..2e589e04d 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala @@ -18,10 +18,8 @@ package org.apache.spark.sql.execution.auron.plan import scala.collection.immutable.SortedMap -import org.apache.spark.OneToOneDependency -import 
org.apache.spark.sql.auron.NativeHelper -import org.apache.spark.sql.auron.NativeRDD -import org.apache.spark.sql.auron.NativeSupports +import org.apache.spark.{OneToOneDependency, Partition} +import org.apache.spark.sql.auron.{NativeHelper, NativePartition, NativeRDD, NativeSupports} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.physical.Partitioning @@ -50,16 +48,19 @@ abstract class NativeLocalLimitBase(limit: Int, override val child: SparkPlan) override def doExecuteNative(): NativeRDD = { val inputRDD = NativeHelper.executeNative(child) val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, rddShuffleReadFull = false, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeLimitExec = LimitExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala index aad943888..38d7b983a 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeOrcScanBase.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.auron.{protobuf => pb} import 
org.apache.auron.metric.SparkMetricNode -abstract class NativeOrcScanBase(basedFileScan: FileSourceScanExec) +abstract class NativeOrcScanBase(@transient basedFileScan: FileSourceScanExec) extends NativeFileSourceScanBase(basedFileScan) { override def doExecuteNative(): NativeRDD = { @@ -49,7 +49,6 @@ abstract class NativeOrcScanBase(basedFileScan: FileSourceScanExec) })) val nativePruningPredicateFilters = this.nativePruningPredicateFilters val nativeFileSchema = this.nativeFileSchema - val nativeFileGroups = this.nativeFileGroups val nativePartitionSchema = this.nativePartitionSchema val projection = schema.map(field => basedFileScan.relation.schema.fieldIndex(field.name)) val broadcastedHadoopConf = this.broadcastedHadoopConf diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index f40e79a4a..09ab9e887 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -56,8 +56,8 @@ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive.execution.InsertIntoHiveTable abstract class NativeParquetInsertIntoHiveTableBase( - cmd: InsertIntoHiveTable, - override val child: SparkPlan) + @transient cmd: InsertIntoHiveTable, + @transient override val child: SparkPlan) extends UnaryExecNode with NativeSupports { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala index e470a4b99..acd5b1742 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala 
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetScanBase.scala @@ -49,7 +49,6 @@ abstract class NativeParquetScanBase(basedFileScan: FileSourceScanExec) })) val nativePruningPredicateFilters = this.nativePruningPredicateFilters val nativeFileSchema = this.nativeFileSchema - val nativeFileGroups = this.nativeFileGroups val nativePartitionSchema = this.nativePartitionSchema val projection = schema.map(field => basedFileScan.relation.schema.fieldIndex(field.name)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala index 689b46e60..3df5685aa 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkBase.scala @@ -36,11 +36,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job -import org.apache.spark.OneToOneDependency +import org.apache.spark.{OneToOneDependency, Partition} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.auron.NativeHelper -import org.apache.spark.sql.auron.NativeRDD -import org.apache.spark.sql.auron.NativeSupports +import org.apache.spark.sql.auron.{NativeHelper, NativePartition, NativeRDD, NativeSupports} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.SortOrder @@ -91,10 +89,13 @@ abstract class NativeParquetSinkBase( val inputRDD = NativeHelper.executeNative(child) val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) val nativeDependencies = new OneToOneDependency(inputRDD) :: Nil + val 
nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkSession.sparkContext, nativeMetrics, - inputRDD.partitions, + rddPartitions = nativePartitions.toArray, inputRDD.partitioner, nativeDependencies, inputRDD.isShuffleReadFull, @@ -146,7 +147,7 @@ abstract class NativeParquetSinkBase( .setValue(entry.getValue) .build()) - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val parquetSink = ParquetSinkExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, context)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala index 109cf0081..fbfdc7900 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectBase.scala @@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.catalyst.analysis.ResolvedStar import org.apache.spark.sql.catalyst.expressions.Alias @@ -71,16 +73,19 @@ abstract class NativeProjectBase(projectList: Seq[NamedExpression], override val val inputRDD = NativeHelper.executeNative(child) val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) val nativeProject = this.nativeProject + val nativePartitions = inputRDD.partitions.map { inputPartition => + 
NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeProjectExec = nativeProject.toBuilder .setInput(inputRDD.nativePlan(inputPartition, taskContext)) .build() diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala index 3d2b21fc9..97638e032 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsBase.scala @@ -18,10 +18,8 @@ package org.apache.spark.sql.execution.auron.plan import scala.jdk.CollectionConverters._ -import org.apache.spark.OneToOneDependency -import org.apache.spark.sql.auron.NativeHelper -import org.apache.spark.sql.auron.NativeRDD -import org.apache.spark.sql.auron.NativeSupports +import org.apache.spark.{OneToOneDependency, Partition} +import org.apache.spark.sql.auron.{NativeHelper, NativePartition, NativeRDD, NativeSupports} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.physical.Partitioning @@ -54,16 +52,20 @@ abstract class NativeRenameColumnsBase( override def doExecuteNative(): NativeRDD = { val inputRDD = NativeHelper.executeNative(child) val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) + val nativePartitions = inputRDD.partitions.map { 
inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { - val inputPlan = inputRDD.nativePlan(inputRDD.partitions(partition.index), taskContext) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload + val inputPlan = inputRDD.nativePlan(inputPartition, taskContext) buildRenameColumnsExec(inputPlan, renamedColumnNames) }, friendlyName = "NativeRDD.RenameColumns") diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala index 76648e8a7..6b6819eb5 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeBase.scala @@ -25,15 +25,11 @@ import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag import scala.util.hashing.byteswap32 -import org.apache.spark.{OneToOneDependency, Partitioner, RangePartitioner, ShuffleDependency, SparkEnv, TaskContext} +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, RangePartitioner, ShuffleDependency, SparkEnv, TaskContext} import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleWriteProcessor -import org.apache.spark.sql.auron.NativeConverters -import org.apache.spark.sql.auron.NativeHelper -import org.apache.spark.sql.auron.NativeRDD -import org.apache.spark.sql.auron.NativeSupports -import org.apache.spark.sql.auron.Shims +import 
org.apache.spark.sql.auron.{NativeConverters, NativeHelper, NativePartition, NativeRDD, NativeSupports, Shims} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, NullsFirst, UnsafeProjection} import org.apache.spark.sql.catalyst.expressions.Literal @@ -54,8 +50,8 @@ import org.apache.auron.metric.SparkMetricNode import org.apache.auron.protobuf.{IpcReaderExecNode, PhysicalExprNode, PhysicalHashRepartition, PhysicalPlanNode, PhysicalRangeRepartition, PhysicalRepartition, PhysicalRoundRobinRepartition, PhysicalSingleRepartition, PhysicalSortExprNode, Schema, SortExecNode} abstract class NativeShuffleExchangeBase( - override val outputPartitioning: Partitioning, - override val child: SparkPlan) + @transient override val outputPartitioning: Partitioning, + @transient override val child: SparkPlan) extends ShuffleExchangeLike with NativeSupports { @@ -125,6 +121,7 @@ abstract class NativeShuffleExchangeBase( override def doExecuteNative(): NativeRDD = { val shuffleHandle = shuffleDependency.shuffleHandle val rdd = doExecuteNonNative() + val nativeSchema = this.nativeSchema val nativeMetrics = SparkMetricNode( Map(), @@ -148,7 +145,6 @@ abstract class NativeShuffleExchangeBase( (partition, taskContext) => { val shuffleReadMetrics = taskContext.taskMetrics().createTempShuffleReadMetrics() val metricReporter = new SQLShuffleReadMetricsReporter(shuffleReadMetrics, metrics) - val nativeSchema = this.nativeSchema // store fetch iterator in jni resource before native compute val jniResourceId = s"NativeShuffleReadExec:${UUID.randomUUID().toString}" @@ -244,15 +240,21 @@ abstract class NativeShuffleExchangeBase( case _ => null } + val nativeInputPartitions = nativeInputRDD.partitions.map { p => + NativePartition[Partition](p.index, p) + } + val nativeHashExprs = this.nativeHashExprs + val nativeSortExecNode = this.nativeSortExecNode + val nativeShuffleRDD = new NativeRDD( 
nativeInputRDD.sparkContext, nativeMetrics, - nativeInputRDD.partitions, + rddPartitions = nativeInputPartitions.toArray, nativeInputRDD.partitioner, new OneToOneDependency(nativeInputRDD) :: Nil, nativeInputRDD.isShuffleReadFull, (partition, taskContext) => { - val nativeInputPartition = nativeInputRDD.partitions(partition.index) + val nativeInputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val repartitionBuilder = PhysicalRepartition.newBuilder() val nativeOutputPartitioning = outputPartitioning match { case SinglePartition => diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala index 593243f91..c3d7f4ebd 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffledHashJoinBase.scala @@ -20,9 +20,11 @@ import scala.collection.immutable.SortedMap import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.auron.join.JoinBuildSides.{JoinBuildLeft, JoinBuildRight, JoinBuildSide} import org.apache.spark.sql.catalyst.expressions.Expression @@ -101,6 +103,7 @@ abstract class NativeShuffledHashJoinBase( val nativeJoinOn = this.nativeJoinOn val nativeJoinType = this.nativeJoinType val nativeBuildSide = this.nativeBuildSide + val nativeSchema = this.nativeSchema val (partitions, partitioner) = if (joinType != RightOuter) { (leftRDD.partitions, leftRDD.partitioner) @@ -108,19 +111,24 @@ abstract class 
NativeShuffledHashJoinBase( (rightRDD.partitions, rightRDD.partitioner) } val dependencies = Seq(new OneToOneDependency(leftRDD), new OneToOneDependency(rightRDD)) + val nativePartitions = partitions.map { p => + NativePartition[(Partition, Partition)]( + p.index, + (leftRDD.partitions(p.index), rightRDD.partitions(p.index))) + } new NativeRDD( sparkContext, nativeMetrics, - partitions, + rddPartitions = nativePartitions.toArray, partitioner, dependencies, leftRDD.isShuffleReadFull && rightRDD.isShuffleReadFull, (partition, taskContext) => { - val leftPartition = leftRDD.partitions(partition.index) + val (leftPartition, rightPartition) = + partition.asInstanceOf[NativePartition[(Partition, Partition)]].payload val leftChild = leftRDD.nativePlan(leftPartition, taskContext) - val rightPartition = rightRDD.partitions(partition.index) val rightChild = rightRDD.nativePlan(rightPartition, taskContext) val hashJoinExec = pb.HashJoinExecNode diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala index 579f3f3d7..879b9f73b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala @@ -20,9 +20,11 @@ import scala.collection.immutable.SortedMap import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.catalyst.expressions.Ascending import org.apache.spark.sql.catalyst.expressions.Attribute @@ -97,16 +99,19 @@ abstract class NativeSortBase( val inputRDD = 
NativeHelper.executeNative(child) val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil) val nativeSortExprs = this.nativeSortExprs + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeSortExec = SortExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala index 1b3518da8..5c6f26b47 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortMergeJoinBase.scala @@ -20,9 +20,11 @@ import scala.collection.immutable.SortedMap import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.catalyst.expressions.Ascending import org.apache.spark.sql.catalyst.expressions.Expression @@ -108,6 +110,7 @@ abstract class NativeSortMergeJoinBase( val nativeSortOptions = this.nativeSortOptions val nativeJoinOn = this.nativeJoinOn val nativeJoinType = 
this.nativeJoinType + val nativeSchema = this.nativeSchema val (partitions, partitioner) = if (joinType != RightOuter) { (leftRDD.partitions, leftRDD.partitioner) @@ -115,6 +118,11 @@ abstract class NativeSortMergeJoinBase( (rightRDD.partitions, rightRDD.partitioner) } val dependencies = Seq(new OneToOneDependency(leftRDD), new OneToOneDependency(rightRDD)) + val nativePartitions = partitions.map { p => + NativePartition[(Partition, Partition)]( + p.index, + (leftRDD.partitions(p.index), rightRDD.partitions(p.index))) + } val isShuffleReadFull = joinType match { case _: InnerLike => logInfo("SortMergeJoin Inner mark shuffleReadFull = false") @@ -131,15 +139,15 @@ abstract class NativeSortMergeJoinBase( new NativeRDD( sparkContext, nativeMetrics, - partitions, + rddPartitions = nativePartitions.toArray, partitioner, dependencies, isShuffleReadFull, (partition, taskContext) => { - val leftPartition = leftRDD.partitions(partition.index) + val (leftPartition, rightPartition) = + partition.asInstanceOf[NativePartition[(Partition, Partition)]].payload val leftChild = leftRDD.nativePlan(leftPartition, taskContext) - val rightPartition = rightRDD.partitions(partition.index) val rightChild = rightRDD.nativePlan(rightPartition, taskContext) val sortMergeJoinExec = SortMergeJoinExecNode diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala index f54821870..4ce477a6b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala @@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters 
import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.auron.Shims import org.apache.spark.sql.catalyst.InternalRow @@ -182,16 +184,19 @@ abstract class NativePartialTakeOrderedBase( override def doExecuteNative(): NativeRDD = { val inputRDD = NativeHelper.executeNative(child) val nativeSortExprs = this.nativeSortExprs + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, metrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil), - inputRDD.partitions, - inputRDD.partitioner, - new OneToOneDependency(inputRDD) :: Nil, + rddPartitions = nativePartitions.toArray, + rddPartitioner = inputRDD.partitioner, + rddDependencies = new OneToOneDependency(inputRDD) :: Nil, rddShuffleReadFull = false, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeTakeOrderedExec = SortExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala index 822b61768..3c5edbe6e 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionBase.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.auron.metric.SparkMetricNode import org.apache.auron.protobuf.EmptyPartitionsExecNode import org.apache.auron.protobuf.PhysicalPlanNode -import org.apache.auron.protobuf.Schema import 
org.apache.auron.protobuf.UnionExecNode import org.apache.auron.protobuf.UnionInput @@ -50,6 +49,8 @@ abstract class NativeUnionBase( .filterKeys(Set("stage_id", "output_rows")) .toSeq: _*) + val nativeSchema = Util.getNativeSchema(output) + override def doExecuteNative(): NativeRDD = { val rdds = children.map(c => NativeHelper.executeNative(c)) val nativeMetrics = @@ -115,6 +116,4 @@ abstract class NativeUnionBase( .setNumPartitions(numPartitions) .build()) .build() - - val nativeSchema: Schema = Util.getNativeSchema(output) } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala index f6e5ea30a..316ab17da 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowBase.scala @@ -20,9 +20,11 @@ import scala.collection.immutable.SortedMap import scala.jdk.CollectionConverters._ import org.apache.spark.OneToOneDependency +import org.apache.spark.Partition import org.apache.spark.sql.auron.NativeConverters import org.apache.spark.sql.auron.NativeHelper import org.apache.spark.sql.auron.NativeRDD +import org.apache.spark.sql.auron.NativePartition import org.apache.spark.sql.auron.NativeSupports import org.apache.spark.sql.catalyst.expressions.Ascending import org.apache.spark.sql.catalyst.expressions.Attribute @@ -272,16 +274,19 @@ abstract class NativeWindowBase( val nativeWindowExprs = this.nativeWindowExprs val nativeOrderSpecExprs = this.nativeOrderSpecExprs val nativePartitionSpecExprs = this.nativePartitionSpecExprs + val nativePartitions = inputRDD.partitions.map { inputPartition => + NativePartition[Partition](inputPartition.index, inputPartition) + } new NativeRDD( sparkContext, nativeMetrics, - rddPartitions = inputRDD.partitions, + rddPartitions = 
nativePartitions.toArray, rddPartitioner = inputRDD.partitioner, rddDependencies = new OneToOneDependency(inputRDD) :: Nil, inputRDD.isShuffleReadFull, (partition, taskContext) => { - val inputPartition = inputRDD.partitions(partition.index) + val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload val nativeWindowExec = pb.WindowExecNode .newBuilder() .setInput(inputRDD.nativePlan(inputPartition, taskContext)) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala index 6dfc8be79..e1d56d9ff 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala @@ -45,7 +45,7 @@ import org.apache.auron.{protobuf => pb} import org.apache.auron.jni.JniBridge import org.apache.auron.sparkver -abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec) +abstract class NativeHiveTableScanBase(@transient basedHiveScan: HiveTableScanExec) extends LeafExecNode with NativeSupports {