Skip to content

Commit

Permalink
config the number of projections in each Expand
Browse files Browse the repository at this point in the history
  • Loading branch information
DonnyZone committed Sep 26, 2017
1 parent 0809700 commit 9448371
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1297,17 +1297,21 @@ object SplitAggregateWithExpand extends Rule[LogicalPlan] {
/**
* Split [[Expand]] operator to a number of [[Expand]] operators
*/
private def splitExpand(expand: Expand): Seq[Expand] = {
val expands: Seq[Expand] = expand.projections.map { projection =>
Expand(Seq(projection), expand.output, expand.child)
private def splitExpand(expand: Expand, num: Int): Seq[Expand] = {
val groupedProjections = expand.projections.grouped(num).toList
val expands: Seq[Expand] = groupedProjections.map { projectionSeq =>
Expand(projectionSeq, expand.output, expand.child)
}
expands
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
if (SQLConf.get.groupingWithUnion && projections.length > 1) {
val aggregates: Seq[Aggregate] = splitExpand(e).map { expand =>
if (SQLConf.get.groupingWithUnion
&& projections.length > SQLConf.get.groupingExpandProjections) {
val num = SQLConf.get.groupingExpandProjections
val subExpands = splitExpand(e, num)
val aggregates: Seq[Aggregate] = subExpands.map { expand =>
Aggregate(a.groupingExpressions, a.aggregateExpressions, expand)
}
Union(aggregates)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val GROUPING_EXPAND_PROJECTIONS = buildConf("spark.sql.grouping.projectionsPerExpand")
.doc("The number of projections in each Expand operator when the grouping analytics is " +
"implemented using a union with aggregates for each group."
.intConf
.checkValue(projections => projections >= 1, "The number of projections for each Expand " +
"operator must be positive.")
.createWithDefault(1)

// The output committer class used by data sources. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
val OUTPUT_COMMITTER_CLASS = buildConf("spark.sql.sources.outputCommitterClass")
Expand Down Expand Up @@ -1166,6 +1174,8 @@ class SQLConf extends Serializable with Logging {

def groupingWithUnion: Boolean = getConf(GROUPING_WITH_UNION)

def groupingExpandProjections: Int = getConf(GROUPING_EXPAND_PROJECTIONS)

def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ class AggregateOptimizeSuite extends PlanTest {

val query = GroupingSets(Seq(Seq(), Seq(a), Seq(a, b)), Seq(a, b), testRelation,
Seq(a, b, count(c).as("count(c)")))
val optimized = SplitAggregateWithExpand(analyzer.execute(query))

val correctAnswer = Union(
val optimized1 = SplitAggregateWithExpand(analyzer.execute(query))
val correctAnswer1 = Union(
Seq(
Aggregate(Seq(a, b, gid), Seq(a, b, count(c).as("count(c)")),
Expand(Seq(Seq(a, b, c, nulInt, nulInt, 3)),
Expand All @@ -110,8 +110,28 @@ class AggregateOptimizeSuite extends PlanTest {
)
)
)

comparePlans(optimized, correctAnswer, false)
comparePlans(optimized1, correctAnswer1, false)

withSQLConf(GROUPING_EXPAND_PROJECTIONS.key -> "2") {
val optimized2 = SplitAggregateWithExpand(analyzer.execute(query))
val correctAnswer2 = Union(
Seq(
Aggregate(Seq(a, b, gid), Seq(a, b, count(c).as("count(c)")),
Expand(Seq(Seq(a, b, c, nulInt, nulInt, 3), Seq(a, b, c, a, nulInt, 1)),
Seq(a, b, c, a, b, gid),
Project(Seq(a, b, c, a.as("a"), b.as("b")), testRelation)
)
),
Aggregate(Seq(a, b, gid), Seq(a, b, count(c).as("count(c)")),
Expand(Seq(Seq(a, b, c, a, b, 0)),
Seq(a, b, c, a, b, gid),
Project(Seq(a, b, c, a.as("a"), b.as("b")), testRelation)
)
)
)
)
comparePlans(optimized2, correctAnswer2, false)
}
}
}
}

0 comments on commit 9448371

Please sign in to comment.