Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.auron.util.SparkVersionUtil
class NativeRDD(
@transient private val rddSparkContext: SparkContext,
val metrics: SparkMetricNode,
private val rddPartitions: Array[Partition],
@transient private val rddPartitions: Array[Partition],
private val rddPartitioner: Option[Partitioner],
private val rddDependencies: Seq[Dependency[_]],
private val rddShuffleReadFull: Boolean,
Expand All @@ -46,23 +46,24 @@ class NativeRDD(
with Logging
with Serializable {

// use serializable wrapper to avoid serializing nativePlan
val nativePlanWrapper = new NativePlanWrapper(nativePlan)

if (friendlyName != null) {
setName(friendlyName)
}

// use serializable wrapper to avoid serializing nativePlan
val nativePlanWrapper = new NativePlanWrapper(nativePlan)

/**
 * Builds the native physical plan for one partition by delegating to
 * `nativePlanWrapper`.
 *
 * @param p  partition to build the plan for
 * @param tc task context forwarded to the wrapped plan function
 * @return the plan node produced by `nativePlanWrapper.plan`
 */
def nativePlan(p: Partition, tc: TaskContext): PhysicalPlanNode =
nativePlanWrapper.plan(p, tc)

// Reads the shuffle-read-full flag for this RDD back through the Shims layer.
def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
// Class-body statement: passes the rddShuffleReadFull constructor argument to the Shims layer for this RDD.
Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)

override val partitioner: Option[Partitioner] = rddPartitioner

// Partitions of this RDD: the array passed in as the rddPartitions constructor argument.
override protected def getPartitions: Array[Partition] = rddPartitions
// Dependencies of this RDD: the sequence passed in as the rddDependencies constructor argument.
override protected def getDependencies: Seq[Dependency[_]] = rddDependencies
override val partitioner: Option[Partitioner] = rddPartitioner

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val computingNativePlan = nativePlanWrapper.plan(split, context)
Expand Down Expand Up @@ -101,6 +102,8 @@ class EmptyNativeRDD(@transient private val rddSparkContext: SparkContext)

}

/**
 * A [[Partition]] that carries an arbitrary payload next to its index.
 *
 * @param index   partition index reported to Spark
 * @param payload value attached to this partition
 * @tparam P type of the attached payload
 */
case class NativePartition[P](
override val index: Int,
payload: P)
extends Partition

class NativePlanWrapper(var p: (Partition, TaskContext) => PhysicalPlanNode)
extends Serializable {
def plan(split: Partition, context: TaskContext): PhysicalPlanNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.auron.protobuf.FFIReaderExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.protobuf.Schema

abstract class ConvertToNativeBase(override val child: SparkPlan)
abstract class ConvertToNativeBase(@transient override val child: SparkPlan)
extends UnaryExecNode
with NativeSupports {
override val nodeName: String = "ConvertToNative"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import scala.collection.immutable.SortedMap
import scala.jdk.CollectionConverters._

import org.apache.spark.OneToOneDependency
import org.apache.spark.Partition
import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativePartition
import org.apache.spark.sql.auron.NativeSupports
import org.apache.spark.sql.auron.Shims
import org.apache.spark.sql.catalyst.expressions.Ascending
Expand Down Expand Up @@ -173,18 +175,22 @@ abstract class NativeAggBase(
val nativeAggrModes = this.nativeAggrModes
val nativeAggrs = this.nativeAggrs
val nativeGroupingExprs = this.nativeGroupingExprs
val nativePartitions = inputRDD.partitions.map { inputPartition =>
NativePartition[Partition](inputPartition.index, inputPartition)
}

new NativeRDD(
sparkContext,
nativeMetrics,
rddPartitions = inputRDD.partitions,
rddPartitions = nativePartitions.toArray,
rddPartitioner = inputRDD.partitioner,
rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
inputRDD.isShuffleReadFull,
(partition, taskContext) => {
val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload

lazy val inputPlan =
inputRDD.nativePlan(inputRDD.partitions(partition.index), taskContext)
inputRDD.nativePlan(inputPartition, taskContext)
pb.PhysicalPlanNode
.newBuilder()
.setAgg(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ import org.apache.auron.{protobuf => pb, sparkver}
import org.apache.auron.jni.JniBridge
import org.apache.auron.metric.SparkMetricNode

abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val child: SparkPlan)
abstract class NativeBroadcastExchangeBase(
@transient mode: BroadcastMode,
override val child: SparkPlan)
extends BroadcastExchangeLike
with NativeSupports {

Expand Down Expand Up @@ -219,7 +221,7 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi
metrics("dataSize") += byteArray.length
})

val input = inputRDD.nativePlan(inputRDD.partitions(split.index), context)
val input = inputRDD.nativePlan(split, context)
val nativeIpcWriterExec = pb.PhysicalPlanNode
.newBuilder()
.setIpcWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ import scala.jdk.CollectionConverters._

import org.apache.spark.OneToOneDependency
import org.apache.spark.Partition
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativeSupports
import org.apache.spark.sql.auron.Shims
import org.apache.spark.sql.auron.{NativeConverters, NativeHelper, NativePartition, NativeRDD, NativeSupports, Shims}
import org.apache.spark.sql.auron.join.JoinBuildSides.{JoinBuildLeft, JoinBuildRight, JoinBuildSide}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.FullOuter
Expand Down Expand Up @@ -145,25 +141,30 @@ abstract class NativeBroadcastJoinBase(
Seq(FullOuter, LeftOuter, LeftSemi, LeftAnti).contains(joinType)
})

val nativePartitions = probedRDD.partitions.map { p =>
NativePartition[Partition](p.index, p)
}

new NativeRDD(
sparkContext,
nativeMetrics,
probedRDD.partitions,
rddPartitions = nativePartitions.toArray,
rddPartitioner = probedRDD.partitioner,
rddDependencies = new OneToOneDependency(probedRDD) :: Nil,
probedShuffleReadFull,
(partition, context) => {
val partition0 = new Partition() {
override def index: Int = 0
}
val probedPartition = partition.asInstanceOf[NativePartition[Partition]].payload
val (leftChild, rightChild) = broadcastSide match {
case JoinBuildLeft =>
(
leftRDD.nativePlan(partition0, context),
rightRDD.nativePlan(rightRDD.partitions(partition.index), context))
rightRDD.nativePlan(probedPartition, context))
case JoinBuildRight =>
(
leftRDD.nativePlan(leftRDD.partitions(partition.index), context),
leftRDD.nativePlan(probedPartition, context),
rightRDD.nativePlan(partition0, context))
}
val cachedBuildHashMapId = s"bhm_stage${context.stageId}_rdd${builtRDD.id}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.auron.plan
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.OneToOneDependency
import org.apache.spark.sql.auron.{NativeHelper, NativeRDD, NativeSupports, Shims}
import org.apache.spark.{OneToOneDependency, Partition}
import org.apache.spark.sql.auron.{NativeHelper, NativePartition, NativeRDD, NativeSupports, Shims}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{SinglePartition, UnknownPartitioning}
Expand Down Expand Up @@ -65,16 +65,19 @@ abstract class NativeCollectLimitBase(limit: Int, offset: Int, override val chil
// merge all LocalLimit child partitions into a single partition
val shuffled = Shims.get.createNativeShuffleExchangeExec(SinglePartition, partial)
val singlePartitionRDD = NativeHelper.executeNative(shuffled)
val nativePartitions = singlePartitionRDD.partitions.map { p =>
NativePartition[Partition](p.index, p)
}

new NativeRDD(
sparkContext,
SparkMetricNode(metrics, singlePartitionRDD.metrics :: Nil),
singlePartitionRDD.partitions,
rddPartitions = nativePartitions.toArray,
singlePartitionRDD.partitioner,
new OneToOneDependency(singlePartitionRDD) :: Nil,
rddShuffleReadFull = false,
(partition, taskContext) => {
val inputPartition = singlePartitionRDD.partitions(partition.index)
val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload
val nativeLimitExec = LimitExecNode
.newBuilder()
.setInput(singlePartitionRDD.nativePlan(inputPartition, taskContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import scala.collection.immutable.SortedMap
import scala.jdk.CollectionConverters._

import org.apache.spark.OneToOneDependency
import org.apache.spark.Partition
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativePartition
import org.apache.spark.sql.auron.NativeSupports
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
Expand Down Expand Up @@ -75,16 +77,19 @@ abstract class NativeExpandBase(
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeSchema = this.nativeSchema
val nativeProjections = this.nativeProjections
val nativePartitions = inputRDD.partitions.map { inputPartition =>
NativePartition[Partition](inputPartition.index, inputPartition)
}

new NativeRDD(
sparkContext,
nativeMetrics,
rddPartitions = inputRDD.partitions,
rddPartitions = nativePartitions.toArray,
rddPartitioner = inputRDD.partitioner,
rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
inputRDD.isShuffleReadFull,
(partition, taskContext) => {
val inputPartition = inputRDD.partitions(partition.index)
val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload
val nativeExpandExec = ExpandExecNode
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
override val output: Seq[Attribute] = basedFileScan.output
override val outputPartitioning: Partitioning = basedFileScan.outputPartitioning

@transient
protected val inputFileScanRDD: FileScanRDD = {
MethodUtils.invokeMethod(basedFileScan, true, "prepare")
MethodUtils.invokeMethod(basedFileScan, true, "waitForSubqueries")
Expand All @@ -77,13 +78,6 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)

private val partitionSchema = basedFileScan.relation.partitionSchema

private val fileSizes = inputFileScanRDD.filePartitions
.flatMap(_.files)
.groupBy(_.filePath)
.mapValues(_.foldLeft(0L)(_ + _.length))
.map(identity) // make this map serializable
.toMap

// predicate pruning is buggy for decimal type, so we need to
// temporarily disable predicate pruning for decimal type
// see https://github.com/apache/auron/issues/1032
Expand All @@ -104,7 +98,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
protected def nativePartitionSchema: pb.Schema =
NativeConverters.convertSchema(partitionSchema)

protected def nativeFileGroups: FilePartition => pb.FileGroup = (partition: FilePartition) => {
protected def nativeFileGroups(partition: FilePartition): pb.FileGroup = {
// list input file statuses
val nativePartitionedFile = (file: PartitionedFile) => {
val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) =>
Expand All @@ -115,7 +109,7 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
pb.PartitionedFile
.newBuilder()
.setPath(s"${file.filePath}")
.setSize(fileSizes(file.filePath))
.setSize(file.fileSize)
.addAllPartitionValues(nativePartitionValues.asJava)
.setLastModifiedNs(0)
.setRange(
Expand All @@ -136,7 +130,6 @@ abstract class NativeFileSourceScanBase(basedFileScan: FileSourceScanExec)
nativePruningPredicateFilters
nativeFileSchema
nativePartitionSchema
nativeFileGroups

protected def putJniBridgeResource(
resourceId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.spark.OneToOneDependency
import org.apache.spark.Partition
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativePartition
import org.apache.spark.sql.auron.NativeSupports
import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.expressions.Attribute
Expand Down Expand Up @@ -91,15 +93,18 @@ abstract class NativeFilterBase(condition: Expression, override val child: Spark
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativeFilterExprs = this.nativeFilterExprs
val nativePartitions = inputRDD.partitions.map { inputPartition =>
NativePartition[Partition](inputPartition.index, inputPartition)
}
new NativeRDD(
sparkContext,
nativeMetrics,
rddPartitions = inputRDD.partitions,
rddPartitions = nativePartitions.toArray,
rddPartitioner = inputRDD.partitioner,
rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
inputRDD.isShuffleReadFull,
(partition, taskContext) => {
val inputPartition = inputRDD.partitions(partition.index)
val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload
val nativeFilterExec = FilterExecNode
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import scala.jdk.CollectionConverters._

import com.google.protobuf.ByteString
import org.apache.spark.OneToOneDependency
import org.apache.spark.Partition
import org.apache.spark.sql.auron.NativeConverters
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativePartition
import org.apache.spark.sql.auron.NativeSupports
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
Expand Down Expand Up @@ -134,16 +136,19 @@ abstract class NativeGenerateBase(
val nativeGenerator = this.nativeGenerator
val nativeGeneratorOutput = this.nativeGeneratorOutput
val nativeRequiredChildOutput = this.nativeRequiredChildOutput
val nativePartitions = inputRDD.partitions.map { inputPartition =>
NativePartition[Partition](inputPartition.index, inputPartition)
}

new NativeRDD(
sparkContext,
nativeMetrics,
rddPartitions = inputRDD.partitions,
rddPartitions = nativePartitions.toArray,
rddPartitioner = inputRDD.partitioner,
rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
inputRDD.isShuffleReadFull,
(partition, taskContext) => {
val inputPartition = inputRDD.partitions(partition.index)
val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload
val nativeGenerateExec = pb.GenerateExecNode
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ package org.apache.spark.sql.execution.auron.plan

import scala.collection.immutable.SortedMap

import org.apache.spark.OneToOneDependency
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.NativeRDD
import org.apache.spark.sql.auron.NativeSupports
import org.apache.spark.{OneToOneDependency, Partition}
import org.apache.spark.sql.auron.{NativeHelper, NativePartition, NativeRDD, NativeSupports}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.AllTuples
Expand Down Expand Up @@ -53,16 +51,19 @@ abstract class NativeGlobalLimitBase(limit: Int, offset: Int, override val child
override def doExecuteNative(): NativeRDD = {
val inputRDD = NativeHelper.executeNative(child)
val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
val nativePartitions = inputRDD.partitions.map { inputPartition =>
NativePartition[Partition](inputPartition.index, inputPartition)
}

new NativeRDD(
sparkContext,
nativeMetrics,
inputRDD.partitions,
inputRDD.partitioner,
new OneToOneDependency(inputRDD) :: Nil,
rddPartitions = nativePartitions.toArray,
rddPartitioner = inputRDD.partitioner,
rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
rddShuffleReadFull = false,
(partition, taskContext) => {
val inputPartition = inputRDD.partitions(partition.index)
val inputPartition = partition.asInstanceOf[NativePartition[Partition]].payload
val nativeLimitExec = LimitExecNode
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
Expand Down
Loading
Loading