[SPARK-13609] [SQL] Support Column Pruning for MapPartitions
#### What changes were proposed in this pull request?

This PR prunes unnecessary columns when the operator is `MapPartitions`. The solution is to insert an extra `Project` between `MapPartitions` and its child, so that only the referenced columns flow into the operator.

For the other two operators, `AppendColumns` and `MapGroups`, the same pruning sounds doable, but more discussion is required. The major reason is that the current implementation of the `inputPlan` of `groupBy` is based on the child of `AppendColumns`, which might be a bug. Thus, that change will be submitted in a separate PR.
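
As a sketch of the kind of query this targets (modeled on the new `DatasetSuite` test below; `OtherTuple`, the three-column input, and the `sqlContext.implicits._` import are assumptions borrowed from the Spark test harness, not part of this patch):

```scala
import sqlContext.implicits._  // assumes a SQLContext in scope, as in the test harness

case class OtherTuple(_1: String, _2: Int)

val ds = Seq(("a", 1, 3), ("b", 2, 4), ("c", 3, 5)).toDS()

// OtherTuple's deserializer references only _1 and _2, so with this patch the
// ColumnPruning rule inserts a Project(_1, _2) between MapPartitions and its
// child; the unused third column is never materialized.
val pruned = ds.as[OtherTuple].mapPartitions(iter => iter)
```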

#### How was this patch tested?

Added a test case in `ColumnPruningSuite` to verify the rule, and another in `DatasetSuite.scala` to verify the data.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #11460 from gatorsmile/datasetPruningNew.
gatorsmile authored and davies committed Mar 2, 2016
1 parent d8afd45 commit 8f8d8a2
Showing 3 changed files with 28 additions and 2 deletions.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (4 additions, 1 deletion)

@@ -331,7 +331,10 @@ object ColumnPruning extends Rule[LogicalPlan] {
         }.unzip._1
       }
       a.copy(child = Expand(newProjects, newOutput, grandChild))
-    // TODO: support some logical plan for Dataset
+
+    // Prunes the unused columns from child of MapPartitions
+    case mp @ MapPartitions(_, _, _, child) if (child.outputSet -- mp.references).nonEmpty =>
+      mp.copy(child = prunedChild(child, mp.references))
 
     // Prunes the unused columns from child of Aggregate/Window/Expand/Generate
     case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
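
For context, the `prunedChild` helper called by the new case is pre-existing in the `ColumnPruning` rule and is not part of this diff; a rough sketch of its behavior (reconstructed, so treat the exact body as an assumption):

```scala
// Sketch of the pre-existing helper: wrap the child in a Project that keeps
// only the attributes the parent operator actually references.
private def prunedChild(c: LogicalPlan, allReferences: AttributeSet): LogicalPlan =
  if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
    Project(c.output.filter(allReferences.contains), c)
  } else {
    c
  }
```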

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala (14 additions)

@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -249,5 +252,16 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(query), expected)
   }
 
+  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
+  private val func = identity[Iterator[OtherTuple]] _
+
+  test("Column pruning on MapPartitions") {
+    val input = LocalRelation('_1.int, '_2.int, 'c.int)
+    val plan1 = MapPartitions(func, input)
+    val correctAnswer1 =
+      MapPartitions(func, Project(Seq('_1, '_2), input)).analyze
+    comparePlans(Optimize.execute(plan1.analyze), correctAnswer1)
+  }
+
   // todo: add more tests for column pruning
 }
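
The `Optimize` executor invoked by the new test is defined elsewhere in the suite and is untouched by this patch; as a rough sketch of its usual shape (the exact batch contents here are an assumption):

```scala
// Sketch only: the suite's rule executor typically wraps the rule under test
// in a fixed-point batch; the precise rule list is an assumption.
object Optimize extends RuleExecutor[LogicalPlan] {
  val batches = Batch("Column pruning", FixedPoint(100), ColumnPruning) :: Nil
}
```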

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala (10 additions, 1 deletion)

@@ -113,7 +113,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("a", 2), ("b", 3), ("c", 4))
   }
 
-  test("map with type change") {
+  test("map with type change with the exact matched number of attributes") {
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
 
     checkAnswer(
@@ -123,6 +123,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3))
   }
 
+  test("map with type change with less attributes") {
+    val ds = Seq(("a", 1, 3), ("b", 2, 4), ("c", 3, 5)).toDS()
+
+    checkAnswer(
+      ds.as[OtherTuple]
+        .map(identity[OtherTuple]),
+      OtherTuple("a", 1), OtherTuple("b", 2), OtherTuple("c", 3))
+  }
+
   test("map and group by with class data") {
     // We inject a group by here to make sure this test case is future proof
     // when we implement better pipelining and local execution mode.
