[SPARK-26312][SQL] Replace RDDConversions.rowToRowRdd with RowEncoder to improve its conversion performance

## What changes were proposed in this pull request?

`RDDConversions` got disproportionately slower as the number of columns in the query increased, because `converters` was a `scala.collection.immutable.::` (a subtype of `List`), for which indexed access is O(n); converting a single row of n columns therefore costs O(n²). This PR removes `RDDConversions` and uses `RowEncoder` to convert each `Row` to an `InternalRow`.

The `PrunedScanSuite` test with 2000 columns and 20k rows took 409 seconds before this PR and 361 seconds after.
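For illustration, here is a self-contained micro-sketch (not Spark code; all names are hypothetical) of the quadratic cost of indexing into a `List` of converters versus the constant-time indexing of an `Array`:

```scala
// Hypothetical sketch: List#apply(i) walks i cons cells, so indexing every
// column of a wide row through a List costs O(n^2) per row; Array#apply is O(1).
object ListIndexingCost {
  def main(args: Array[String]): Unit = {
    val n = 2000
    val listConverters: List[Int => Int] = List.fill(n)(identity)
    val arrayConverters: Array[Int => Int] = listConverters.toArray

    def time(label: String)(body: => Unit): Unit = {
      val t0 = System.nanoTime()
      body
      println(f"$label: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
    }

    // Simulate converting 100 rows of n columns each.
    time("List indexing ") { for (_ <- 1 to 100; i <- 0 until n) listConverters(i)(i) }
    time("Array indexing") { for (_ <- 1 to 100; i <- 0 until n) arrayConverters(i)(i) }
  }
}
```

On a typical JVM the `List` version is dramatically slower at n = 2000, which matches the per-column slowdown described above.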

## How was this patch tested?

Existing test cases in `PrunedScanSuite`.

Closes #23262 from eatoncys/toarray.

Authored-by: 10129659 <chen.yanshan@zte.com.cn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
eatoncys authored and HyukjinKwon committed Dec 11, 2018
1 parent 05cf81e commit cbe9230
Showing 2 changed files with 7 additions and 44 deletions.
@@ -18,54 +18,14 @@
package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.{Encoder, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType

-object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
-    data.mapPartitions { iterator =>
-      val numColumns = outputTypes.length
-      val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
-      iterator.map { r =>
-        var i = 0
-        while (i < numColumns) {
-          mutableRow(i) = converters(i)(r.productElement(i))
-          i += 1
-        }
-
-        mutableRow
-      }
-    }
-  }
-
-  /**
-   * Convert the objects inside Row into the types Catalyst expected.
-   */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
-    data.mapPartitions { iterator =>
-      val numColumns = outputTypes.length
-      val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
-      iterator.map { r =>
-        var i = 0
-        while (i < numColumns) {
-          mutableRow(i) = converters(i)(r(i))
-          i += 1
-        }
-
-        mutableRow
-      }
-    }
-  }
-}

object ExternalRDD {

@@ -29,11 +29,11 @@
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
@@ -416,7 +416,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
      output: Seq[Attribute],
      rdd: RDD[Row]): RDD[InternalRow] = {
    if (relation.relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+      val converters = RowEncoder(StructType.fromAttributes(output))
+      rdd.mapPartitions { iterator =>
+        iterator.map(converters.toRow)
+      }
    } else {
      rdd.asInstanceOf[RDD[InternalRow]]
    }
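For reference, a minimal standalone sketch of the encoder-based pattern adopted above, assuming the Spark 2.4-era API in which `RowEncoder(schema)` returns an `ExpressionEncoder[Row]` exposing `toRow` (later Spark versions replace `toRow` with `createSerializer()`):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RowEncoderSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // Build the encoder once and reuse it for every row, mirroring the
    // once-per-partition construction inside mapPartitions above.
    val encoder = RowEncoder(schema)
    val internal: InternalRow = encoder.toRow(Row(1, "spark"))
    println(internal.numFields)  // prints 2
  }
}
```

Note the design choice in the diff: `mapPartitions` constructs the encoder once per partition rather than once per row, so its generated projection is reused across the entire iterator.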
