
Commit

renaming
Davies Liu committed Jan 15, 2016
1 parent c9741ea commit 7c05703
Showing 2 changed files with 15 additions and 17 deletions.
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 /**
  * An interface for those physical operators that support codegen.
  */
-trait SupportCodegen extends SparkPlan {
+trait CodegenSupport extends SparkPlan {
 
   /**
    * Whether this SparkPlan support whole stage codegen or not.
@@ -39,12 +39,12 @@ trait SupportCodegen extends SparkPlan {
   /**
    * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
    */
-  private var parent: SupportCodegen = null
+  private var parent: CodegenSupport = null
 
   /**
    * Returns an input RDD of InternalRow and Java source code to process them.
    */
-  def produce(ctx: CodegenContext, parent: SupportCodegen): (RDD[InternalRow], String) = {
+  def produce(ctx: CodegenContext, parent: CodegenSupport): (RDD[InternalRow], String) = {
     this.parent = parent
     doProduce(ctx)
   }
@@ -99,7 +99,7 @@ trait SupportCodegen extends SparkPlan {
  * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
  * an RDD iterator of InternalRow.
  */
-case class InputAdapter(child: SparkPlan) extends LeafNode with SupportCodegen {
+case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
 
@@ -159,8 +159,8 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with SupportCodegen {
  * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
  * used to generated code for BoundReference.
  */
-case class WholeStageCodegen(plan: SupportCodegen, children: Seq[SparkPlan])
-  extends SparkPlan with SupportCodegen {
+case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
+  extends SparkPlan with CodegenSupport {
 
   override def output: Seq[Attribute] = plan.output
 
@@ -266,7 +266,7 @@ case class WholeStageCodegen(plan: SupportCodegen, children: Seq[SparkPlan])
 private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Rule[SparkPlan] {
 
   private def supportCodegen(plan: SparkPlan): Boolean = plan match {
-    case plan: SupportCodegen if plan.supportCodegen =>
+    case plan: CodegenSupport if plan.supportCodegen =>
       // Non-leaf with CodegenFallback does not work with whole stage codegen
       val willFallback = plan.expressions.exists(
         _.find(e => e.isInstanceOf[CodegenFallback] && !e.isInstanceOf[LeafExpression]).isDefined
@@ -279,8 +279,8 @@ private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
     if (sqlContext.conf.wholeStageEnabled) {
-      plan.transformUp {
-        case plan: SupportCodegen if supportCodegen(plan) &&
+      plan.transform {
+        case plan: CodegenSupport if supportCodegen(plan) &&
           // Whole stage codegen is only useful when there are at least two levels of operators that
           // support it (save at least one projection/iterator).
           plan.children.exists(supportCodegen) =>
@@ -290,7 +290,7 @@ private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Rule[SparkPlan] {
             case p if !supportCodegen(p) =>
               inputs += p
               InputAdapter(p)
-          }.asInstanceOf[SupportCodegen]
+          }.asInstanceOf[CodegenSupport]
           WholeStageCodegen(combined, inputs)
       }
     } else {
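
The rule touched above, CollapseCodegenStages, is the one that decides where whole-stage codegen applies. As a rough illustration of the collapse it performs, here is a minimal stand-alone sketch; Plan, Codegen, Scan, WholeStage and collapse below are toy stand-ins invented for this note, not the Spark classes in the diff:

// Toy stand-ins (not the Spark classes) just to show the shape of the rewrite.
object CollapseSketch {
  sealed trait Plan { def children: Seq[Plan] }
  trait Codegen extends Plan                                          // plays the role of CodegenSupport
  case class Scan(table: String) extends Plan { def children = Nil }  // no codegen support
  case class Filter(child: Plan) extends Codegen { def children = Seq(child) }
  case class Project(child: Plan) extends Codegen { def children = Seq(child) }
  case class InputAdapter(child: Plan) extends Codegen { def children = Seq(child) }
  case class WholeStage(fused: Codegen, inputs: Seq[Plan]) extends Plan { def children = inputs }

  /** Collapse a chain of codegen operators into one WholeStage node, hiding the first
    * non-codegen plan behind an InputAdapter: the pattern of the rule in the diff above. */
  def collapse(plan: Plan): Plan = plan match {
    case top: Codegen if top.children.exists(_.isInstanceOf[Codegen]) =>
      val inputs = scala.collection.mutable.ArrayBuffer[Plan]()
      def fuse(p: Plan): Plan = p match {
        case Filter(c)  => Filter(fuse(c))
        case Project(c) => Project(fuse(c))
        case other if !other.isInstanceOf[Codegen] =>
          inputs += other                        // this subtree still runs as a row iterator
          InputAdapter(other)
        case other => other
      }
      WholeStage(fuse(top).asInstanceOf[Codegen], inputs.toSeq)
    case other => other
  }

  // collapse(Project(Filter(Scan("t"))))
  //   == WholeStage(Project(Filter(InputAdapter(Scan("t")))), Seq(Scan("t")))
}

In this toy version, as in the rule above, operators that support codegen are fused under a single node, while the first plan that does not is recorded as an input and wrapped in an InputAdapter so the generated code can still read its rows through a plain iterator.
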
@@ -30,15 +30,15 @@ import org.apache.spark.util.MutablePair
 import org.apache.spark.util.random.PoissonSampler
 
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
-  extends UnaryNode with SupportCodegen {
+  extends UnaryNode with CodegenSupport {
 
   override private[sql] lazy val metrics = Map(
     "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
 
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
   protected override def doProduce(ctx: CodegenContext): (RDD[InternalRow], String) = {
-    child.asInstanceOf[SupportCodegen].produce(ctx, this)
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
   override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
@@ -69,17 +69,15 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
   }
 
 
-case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode with SupportCodegen {
+case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode with CodegenSupport {
   override def output: Seq[Attribute] = child.output
 
   private[sql] override lazy val metrics = Map(
     "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
-  override def supportCodegen: Boolean = true
-
   protected override def doProduce(ctx: CodegenContext): (RDD[InternalRow], String) = {
-    child.asInstanceOf[SupportCodegen].produce(ctx, this)
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
   override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
@@ -153,7 +151,7 @@ case class Range(
     numSlices: Int,
     numElements: BigInt,
     output: Seq[Attribute])
-  extends LeafNode with SupportCodegen {
+  extends LeafNode with CodegenSupport {
 
   protected override def doProduce(ctx: CodegenContext): (RDD[InternalRow], String) = {
     val initTerm = ctx.freshName("range_initRange")
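
The pattern shared by Project, Filter and Range in this file is that doProduce() simply delegates to the child's produce() and registers itself as the parent that will consume each generated row. Here is a minimal stand-alone sketch of that push-style handshake; Ctx, Op, FakeScan, FakeFilter and Sink are simplified stand-ins assumed for illustration, not the Spark API:

// Minimal sketch of the produce()/doProduce() handshake used above.
// Ctx and Op are simplified stand-ins for CodegenContext and CodegenSupport.
object ProduceConsumeSketch {
  class Ctx { private var i = 0; def freshName(p: String): String = { i += 1; s"${p}_$i" } }

  trait Op {
    protected var parent: Op = null
    // produce() records who asked, then lets the concrete operator emit its code.
    final def produce(ctx: Ctx, parent: Op): String = { this.parent = parent; doProduce(ctx) }
    protected def doProduce(ctx: Ctx): String
    def doConsume(ctx: Ctx, row: String): String
  }

  // Leaf: drives the loop and pushes every row up to its parent's doConsume().
  class FakeScan extends Op {
    protected def doProduce(ctx: Ctx): String = {
      val row = ctx.freshName("row")
      s"while (input.hasNext()) { InternalRow $row = input.next(); ${parent.doConsume(ctx, row)} }"
    }
    def doConsume(ctx: Ctx, row: String): String = ""   // leaves never consume
  }

  // Unary operator: doProduce() just delegates to the child, the pattern in the diff.
  class FakeFilter(child: Op) extends Op {
    protected def doProduce(ctx: Ctx): String = child.produce(ctx, this)
    def doConsume(ctx: Ctx, row: String): String =
      s"if (predicate($row)) { ${parent.doConsume(ctx, row)} }"
  }

  // Top of the fused stage: collects whatever the pipeline pushes to it.
  class Sink extends Op {
    protected def doProduce(ctx: Ctx): String = ""
    def doConsume(ctx: Ctx, row: String): String = s"append($row);"
  }

  // new FakeFilter(new FakeScan).produce(new Ctx, new Sink) yields one nested Java-like
  // snippet: a scan loop with the filter condition and append() inlined inside it.
}

Because each doConsume() splices its parent's code into its own, the fused stage in this sketch ends up as one nested loop rather than a chain of iterators, which is what the "save at least one projection/iterator" comment in the rule above refers to.
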
