[SPARK-10770] [SQL] SparkPlan.executeCollect/executeTake should return InternalRow rather than external Row.

Author: Reynold Xin <rxin@databricks.com>

Closes #8900 from rxin/SPARK-10770-1.
rxin committed Sep 30, 2015
1 parent c7b29ae commit 03cca5d
Showing 9 changed files with 39 additions and 39 deletions.
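
For context, the gist of the change: SparkPlan.executeCollect()/executeTake() now return Catalyst InternalRow objects, and a new executeCollectPublic() does the one-off conversion to the external Row format that public APIs such as DataFrame.collect() expose. Below is a minimal, self-contained sketch of that conversion; it is not part of the commit, the schema, values, and object name are made up, and it assumes Spark 1.5-era catalyst classes on the classpath.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical demo object, not part of this patch.
object RowConversionSketch {
  def main(args: Array[String]): Unit = {
    // Made-up schema standing in for a plan's output schema.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))

    // Internal rows store strings as UTF8String, not java.lang.String.
    val internal: InternalRow = InternalRow(UTF8String.fromString("alice"), 30)

    // The pattern executeCollectPublic() wraps: build one converter per schema,
    // then map each InternalRow to an external Row.
    val toScala = CatalystTypeConverters.createToScalaConverter(schema)
    val external: Row = toScala(internal).asInstanceOf[Row]

    println(external.getString(0) + ", " + external.getInt(1))
  }
}
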
@@ -1415,7 +1415,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def collect(): Array[Row] = withNewExecutionId {
-    queryExecution.executedPlan.executeCollect()
+    queryExecution.executedPlan.executeCollectPublic()
   }
 
   /**

@@ -34,13 +34,11 @@ private[sql] case class LocalTableScan(
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).toArray
+  override def executeCollect(): Array[InternalRow] = {
+    rows.toArray
   }
 
-  override def executeTake(limit: Int): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = {
+    rows.take(limit).toArray
   }
 }

@@ -170,21 +170,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = {
-    execute().mapPartitions { iter =>
-      val converter = CatalystTypeConverters.createToScalaConverter(schema)
-      iter.map(converter(_).asInstanceOf[Row])
-    }.collect()
+  def executeCollect(): Array[InternalRow] = {
+    execute().map(_.copy()).collect()
   }
 
+  /**
+   * Runs this query returning the result as an array, using external Row format.
+   */
+  def executeCollectPublic(): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    executeCollect().map(converter(_).asInstanceOf[Row])
+  }
+
   /**
    * Runs this query returning the first `n` rows as an array.
    *
    * This is modeled after RDD.take but never runs any job locally on the driver.
    */
-  def executeTake(n: Int): Array[Row] = {
+  def executeTake(n: Int): Array[InternalRow] = {
     if (n == 0) {
-      return new Array[Row](0)
+      return new Array[InternalRow](0)
     }
 
     val childRDD = execute().map(_.copy())
@@ -218,8 +223,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       partsScanned += numPartsToTry
     }
 
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    buf.toArray.map(converter(_).asInstanceOf[Row])
+    buf.toArray
   }
 
   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")

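A side note on the `execute().map(_.copy())` above: many operators reuse a single mutable row object per partition, so collecting the raw references to the driver would yield many aliases of the last row produced. The Spark-free sketch below (made-up class and values, not from this commit) reproduces that hazard and shows why copying before collect() is needed.

// Stand-in for an operator that reuses one mutable buffer per output row.
final class ReusedBuffer(var value: Int) {
  def copyOf(): ReusedBuffer = new ReusedBuffer(value)
}

object CopyBeforeCollect {
  def main(args: Array[String]): Unit = {
    val buffer = new ReusedBuffer(0)
    def scan(): Iterator[ReusedBuffer] =
      (1 to 3).iterator.map { i => buffer.value = i; buffer }

    // Without copying, every array slot aliases the same buffer: 3,3,3.
    val withoutCopy = scan().toArray.map(_.value)
    // Copying each element first snapshots it: 1,2,3.
    val withCopy = scan().map(_.copyOf()).toArray.map(_.value)

    println(withoutCopy.mkString(",") + " vs " + withCopy.mkString(","))
  }
}
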
@@ -204,7 +204,7 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
 
-  override def executeCollect(): Array[Row] = child.executeTake(limit)
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
@@ -258,9 +258,8 @@ case class TakeOrderedAndProject(
     projection.map(data.map(_)).getOrElse(data)
   }
 
-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    collectData().map(converter(_).asInstanceOf[Row])
+  override def executeCollect(): Array[InternalRow] = {
+    collectData()
   }
 
   // TODO: Terminal split should be implemented differently from non-terminal split.

@@ -54,20 +54,21 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
   * so that the command can be executed eagerly right after the command query is created.
   */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
+  }
 
   override def output: Seq[Attribute] = cmd.output
 
   override def children: Seq[SparkPlan] = Nil
 
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
-  override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
-    val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
-    sqlContext.sparkContext.parallelize(converted, 1)
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
   }
 
   override def argString: String = cmd.toString

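The new sideEffectResult above converts in the opposite direction: RunnableCommand.run still returns external Rows, which are now eagerly turned into InternalRows. A minimal sketch of that shape follows (made-up schema and values, not part of the commit; assumes Spark 1.5-era classes on the classpath).

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical demo object, not part of this patch.
object CommandResultSketch {
  def main(args: Array[String]): Unit = {
    // Made-up command output schema; the command itself still produces external Rows.
    val schema = StructType(Seq(StructField("result", StringType)))
    val externalRows: Seq[Row] = Seq(Row("ok"), Row("done"))

    // Same shape as ExecutedCommand.sideEffectResult: one converter per schema,
    // applied to every Row the command produced.
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
    val internalRows: Seq[InternalRow] =
      externalRows.map(toCatalyst(_).asInstanceOf[InternalRow])

    println(internalRows.map(_.getString(0)).mkString(","))
  }
}
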
@@ -121,12 +121,10 @@ object EvaluatePython {
 
   def takeAndServe(df: DataFrame, n: Int): Int = {
     registerPicklers()
-    // This is an annoying hack - we should refactor the code so executeCollect and executeTake
-    // returns InternalRow rather than Row.
-    val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
-    val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
-      EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
-    })
+    val iter = new SerDeUtil.AutoBatchedPickler(
+      df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+        EvaluatePython.toJava(row, df.schema)
+      })
     PythonRDD.serveIterator(iter, s"serve-DataFrame")
   }
 

@@ -245,7 +245,7 @@ object SparkPlanTest {
          }
      }
    )
-    resolvedPlan.executeCollect().toSeq
+    resolvedPlan.executeCollectPublic().toSeq
  }
 }

@@ -563,7 +563,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
 
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
-    extends super.QueryExecution(logicalPlan) {
+    extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
 
   /**
    * Returns the result as a hive compatible sequence of strings. For native commands, the
@@ -581,10 +581,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
             .mkString("\t")
         }
       case command: ExecutedCommand =>
-        command.executeCollect().map(_(0).toString)
+        command.executeCollect().map(_.getString(0))
 
       case other =>
-        val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq
+        val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
         // We need the types so we can output struct field names
         val types = analyzed.output.map(_.dataType)
         // Reformat to match hive tab delimited output.

@@ -124,7 +124,7 @@ case class InsertIntoHiveTable(
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = {
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
@@ -267,10 +267,10 @@ case class InsertIntoHiveTable(
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
     // TODO: implement hive compatibility as rules.
-    Seq.empty[Row]
+    Seq.empty[InternalRow]
   }
 
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
