[SPARK-21781][SQL] Modify DataSourceScanExec to use concrete ColumnVector type.

## What changes were proposed in this pull request?

As mentioned at #18680 (comment), when we have more `ColumnVector` implementations, it might have significant performance implications: a call site that sees multiple implementations may no longer be inlined by the JIT and may fall back to virtual dispatch.

On the read path, one of the major code paths is the one generated by `ColumnarBatchScan`. It currently refers to the abstract `ColumnVector` type, so the penalty will grow as more implementations are added, even though the concrete type is often known from the usage, e.g. the vectorized Parquet reader uses `OnHeapColumnVector`. We can use the concrete type directly in the generated code to avoid the penalty.
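To make the inlining concern concrete, here is a minimal, self-contained sketch (the class names are illustrative stand-ins, not Spark's real `ColumnVector` hierarchy) of the two shapes of generated code:

```scala
// Illustrative sketch: why the concrete receiver type matters. When several
// subclasses flow through one call site, the JIT sees a polymorphic
// (potentially megamorphic) call and may stop inlining getInt(); a cast to
// the single known concrete type keeps the call site monomorphic.
abstract class ColumnVectorLike {
  def getInt(rowId: Int): Int
}

final class OnHeapLike(data: Array[Int]) extends ColumnVectorLike {
  def getInt(rowId: Int): Int = data(rowId)
}

object DevirtualizationSketch {
  def main(args: Array[String]): Unit = {
    val column: ColumnVectorLike = new OnHeapLike(Array(1, 2, 3))

    // Before: the generated code only knows the abstract type.
    println(column.getInt(0)) // virtual call if multiple subclasses are live

    // After: the generated code casts to the concrete type it knows is used,
    // so the JIT can devirtualize and inline the accessor.
    val concrete = column.asInstanceOf[OnHeapLike]
    println(concrete.getInt(1))
  }
}
```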

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #18989 from ueshin/issues/SPARK-21781.
ueshin authored and cloud-fan committed Aug 29, 2017
1 parent c7270a4 commit 32fa0b8
Showing 4 changed files with 32 additions and 5 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala

```diff
@@ -33,6 +33,8 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
 
   val inMemoryTableScan: InMemoryTableScanExec = null
 
+  def vectorTypes: Option[Seq[String]] = None
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
@@ -79,17 +81,19 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val scanTimeTotalNs = ctx.freshName("scanTime")
     ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
 
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val columnarBatchClz = classOf[ColumnarBatch].getName
     val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
 
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val idx = ctx.freshName("batchIdx")
     ctx.addMutableState("int", idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
+    val columnVectorClzs = vectorTypes.getOrElse(
+      Seq.fill(colVars.size)(classOf[ColumnVector].getName))
+    val columnAssigns = colVars.zip(columnVectorClzs).zipWithIndex.map {
+      case ((name, columnVectorClz), i) =>
+        ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+        s"$name = ($columnVectorClz) $batch.column($i);"
     }
 
     val nextBatch = ctx.freshName("nextBatch")
```
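The pairing logic above can be read in isolation; a simplified, runnable sketch (plain string class names, outside the real codegen context):

```scala
// Simplified sketch of the columnAssigns pairing above: zip each generated
// column variable with its class name, falling back to the abstract
// ColumnVector when no concrete vector types are supplied.
object ColumnAssignsSketch {
  def columnAssigns(
      colVars: Seq[String],
      vectorTypes: Option[Seq[String]]): Seq[String] = {
    val clzs = vectorTypes.getOrElse(Seq.fill(colVars.size)("ColumnVector"))
    colVars.zip(clzs).zipWithIndex.map {
      case ((name, clz), i) => s"$name = ($clz) batch.column($i);"
    }
  }

  def main(args: Array[String]): Unit = {
    val vars = Seq("colInstance0", "colInstance1")
    columnAssigns(vars, Some(Seq.fill(2)("OnHeapColumnVector"))).foreach(println)
    // Prints:
    //   colInstance0 = (OnHeapColumnVector) batch.column(0);
    //   colInstance1 = (OnHeapColumnVector) batch.column(1);
  }
}
```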
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

```diff
@@ -174,6 +174,11 @@ case class FileSourceScanExec(
     false
   }
 
+  override def vectorTypes: Option[Seq[String]] =
+    relation.fileFormat.vectorTypes(
+      requiredSchema = requiredSchema,
+      partitionSchema = relation.partitionSchema)
+
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
```
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

```diff
@@ -64,6 +64,16 @@ trait FileFormat {
     false
   }
 
+  /**
+   * Returns concrete column vector class names for each column to be used in a columnar batch
+   * if this format supports returning columnar batch.
+   */
+  def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    None
+  }
+
   /**
    * Returns whether a file with `path` could be splitted or not.
    */
```
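The default returns `None`, so existing formats are unaffected; a format that knows which vector class its reader fills can override the hook. A hypothetical sketch (this trait is ours, not part of the commit; it assumes a format whose reader builds `OffHeapColumnVector`s):

```scala
// Hypothetical example (not in this commit): a FileFormat whose reader fills
// OffHeapColumnVector could advertise that concrete class, and the generated
// scan code would then cast to it instead of the abstract base.
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
import org.apache.spark.sql.types.StructType

trait OffHeapBatchFormat extends FileFormat {
  override def vectorTypes(
      requiredSchema: StructType,
      partitionSchema: StructType): Option[Seq[String]] = {
    // One entry per column in the output batch: data columns, then partitions.
    Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
      classOf[OffHeapColumnVector].getName))
  }
}
```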
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

```diff
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -272,6 +273,13 @@ class ParquetFileFormat
     schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
+  override def vectorTypes(
+      requiredSchema: StructType,
+      partitionSchema: StructType): Option[Seq[String]] = {
+    Option(Seq.fill(requiredSchema.fields.length + partitionSchema.fields.length)(
+      classOf[OnHeapColumnVector].getName))
+  }
+
   override def isSplitable(
       sparkSession: SparkSession,
       options: Map[String, String],
```
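Note that the count covers both halves of the output batch: the vectorized Parquet reader appends the partition columns after the required data columns, so `vectorTypes` supplies one class name per column of `requiredSchema` plus one per column of `partitionSchema`. A quick worked sketch:

```scala
// Worked example: two data columns plus one partition column yield three
// OnHeapColumnVector entries, one per column of the resulting ColumnarBatch.
import org.apache.spark.sql.types._

object VectorTypesCount {
  def main(args: Array[String]): Unit = {
    val requiredSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))
    val partitionSchema = StructType(Seq(StructField("dt", StringType)))

    val n = requiredSchema.fields.length + partitionSchema.fields.length
    Seq.fill(n)("org.apache.spark.sql.execution.vectorized.OnHeapColumnVector")
      .foreach(println) // prints the class name three times
  }
}
```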
