
[SPARK-12796] [SQL] Whole stage codegen #10735

Closed · wants to merge 24 commits

Conversation

@davies (Contributor) commented Jan 13, 2016

This is the initial work for whole-stage codegen. It supports Project/Filter/Range; we will continue working on this to support more physical operators.

A micro benchmark shows that a query with range, filter, and projection could be 3X faster than before.

It's turned on by default. For a tree that has at least two chained plans, a WholeStageCodegen will be inserted into it. For example, the following plan

Limit 10
+- Project [(id#5L + 1) AS (id + 1)#6L]
   +- Filter ((id#5L & 1) = 1)
      +- Range 0, 1, 4, 10, [id#5L]

will be translated into

Limit 10
+- WholeStageCodegen
      +- Project [(id#1L + 1) AS (id + 1)#2L]
         +- Filter ((id#1L & 1) = 1)
            +- Range 0, 1, 4, 10, [id#1L]
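
For reference, a plan like the one above could be produced with something like the following (a hypothetical repro; the exact DataFrame calls are an assumption, only the operator names come from this PR):

    // Hypothetical repro for the plans above.
    val df = sqlContext.range(0, 10)   // Range: start 0, step 1, 10 elements
      .filter("(id & 1) = 1")          // Filter ((id & 1) = 1)
      .selectExpr("id + 1")            // Project [(id + 1)]
      .limit(10)                       // Limit 10
    df.explain()                       // shows the inserted WholeStageCodegen node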

Here is the call graph for generating the Java source for A and B (A supports codegen, but B does not):

  *   WholeStageCodegen       Plan A               FakeInput        Plan B
  * =========================================================================
  *
  * -> execute()
  *     |
  *  doExecute() -------->   produce()
  *                             |
  *                          doProduce()  -------> produce()
  *                                                   |
  *                                                doProduce() ---> execute()
  *                                                   |
  *                                                consume()
  *                          doConsume()  ------------|
  *                             |
  *  doConsume()  <-----    consume()

A SparkPlan that supports codegen needs to implement doProduce() and doConsume():

def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
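
As a minimal sketch, a Filter-like operator could plug into these two hooks roughly as follows. Everything beyond the two signatures above (consume, produce, gen, the class names) is an assumption about the PR's supporting API, not code from this PR:

    // Minimal sketch of a Filter-like operator built on the two hooks above.
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

    case class SketchFilter(condition: Expression, child: SparkPlan)
      extends UnaryNode with CodegenSupport {

      override def output: Seq[Attribute] = child.output

      // Let the child drive the row loop; Filter only injects per-row logic.
      override def doProduce(ctx: CodegenContext): (RDD[InternalRow], String) =
        child.asInstanceOf[CodegenSupport].produce(ctx, this)

      // Guard the parent's code with the compiled predicate, so rows that
      // fail the condition never reach the downstream operators.
      override def doConsume(
          ctx: CodegenContext,
          child: SparkPlan,
          input: Seq[ExprCode]): String = {
        val eval = condition.gen(ctx)  // assumed: compiles predicate to Java source
        s"""
           |${eval.code}
           |if (!${eval.isNull} && ${eval.value}) {
           |  ${consume(ctx, input)}
           |}
         """.stripMargin
      }
    }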

@SparkQA commented Jan 13, 2016

Test build #49298 has finished for PR 10735 at commit f05524c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def testWholeStage(values: Int): Unit = {
  val benchmark = new Benchmark("Single Int Column Scan", values)

  benchmark.addCase("Without whole stage codegen") { iter =>
Contributor:

We should also consider switching the order and seeing if the results change.

Contributor Author:

Benchmark will run each case 5 times in a row, so I think the order should not matter; otherwise, all the results we have seen should be revisited. cc @nongli

Contributor:

I tried it and it didn't make any difference.

FYI, on my machine over multiple runs, the speedup is around 2.5X. I have fewer cores but a slightly higher frequency.

Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz

Contributor Author:

I also got 2.5X for the first run; once I increased the number of rows, it became 3.0X, because there is some fixed overhead for both queries (Catalyst and the Spark job).

Contributor:

I don't think the order should matter, or at least the benchmark should be written so that it doesn't.
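
To make the order-insensitivity check explicit, a sketch along these lines could register both cases in both orders (runWithCodegen/runWithoutCodegen are placeholders; only Benchmark, addCase, and run follow the utility used in the PR's test above):

    // Hypothetical order check for the benchmark discussed above.
    import org.apache.spark.util.Benchmark

    def runWithCodegen(values: Int): Unit = { /* placeholder: query with codegen on */ }
    def runWithoutCodegen(values: Int): Unit = { /* placeholder: codegen off */ }

    def checkOrderSensitivity(values: Int): Unit = {
      val benchmark = new Benchmark("Single Int Column Scan (order check)", values)
      // Same two cases, registered in both orders; if the timings agree,
      // case order is not biasing the results.
      benchmark.addCase("without codegen, first")  { _ => runWithoutCodegen(values) }
      benchmark.addCase("with codegen, second")    { _ => runWithCodegen(values) }
      benchmark.addCase("with codegen, first")     { _ => runWithCodegen(values) }
      benchmark.addCase("without codegen, second") { _ => runWithoutCodegen(values) }
      benchmark.run()  // each case itself runs several times in a row
    }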

@nongli (Contributor) commented Jan 13, 2016

@davies can you include the generated code for the benchmark?

@@ -106,7 +106,27 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
  final def execute(): RDD[InternalRow] = {
    RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
      prepare()
      doExecute()

      if (sqlContext.conf.wholeStageEnabled && supportCodeGen
Contributor:

It might be nice to actually make this a first-class concept in the planner. Perhaps a phase that collects valid nodes and replaces them with an explicit operator called WholeStageCodeGen that holds all the operators that are getting compiled together. I'm not sure how hard that would be, but it would make debugging this a lot easier if it was visible from explain.

Contributor Author:

If WholeStageCodeGen held a list/tree of operators, we would still want to see the details of the tree.

Right now, we can have a special flag for the operators that support it.

Contributor:

I would push back here. The whole goal of this initial effort is to put some infrastructure in place so that we can iterate quickly moving forward. Right now it's really hard, given any query plan, to understand what is happening under the covers, because the control flow isn't explicitly visible anymore.

Contributor:

Here's some code I wrote a while ago that collapses operators by stages when I wanted to refactor the operators to have the local execution mode: https://github.com/apache/spark/compare/master...rxin:local-iter3?expand=1

It is not complete (e.g. it doesn't collapse inputs or handle joins completely), but it could be a good start.
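
In that spirit, such a planner phase might look roughly like the following. This is an illustrative sketch, not this PR's or the linked branch's code; the one-argument WholeStageCodegen wrapper is a simplification of the case class this PR adds:

    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan

    // Sketch: make whole-stage compilation an explicit planner phase,
    // so the fused stage is visible from explain().
    object CollapseCodegenStages extends Rule[SparkPlan] {
      def apply(plan: SparkPlan): SparkPlan = insert(plan, insideStage = false)

      private def insert(plan: SparkPlan, insideStage: Boolean): SparkPlan = plan match {
        case p: CodegenSupport if !insideStage =>
          // Top of a new chain: one explicit node marks the fused stage.
          WholeStageCodegen(p.withNewChildren(p.children.map(insert(_, insideStage = true))))
        case p: CodegenSupport =>
          // Already inside a stage: keep fusing downward.
          p.withNewChildren(p.children.map(insert(_, insideStage = true)))
        case other =>
          // Operators without codegen support end the current stage.
          other.withNewChildren(other.children.map(insert(_, insideStage = false)))
      }
    }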

@SparkQA commented Jan 13, 2016

Test build #49344 has finished for PR 10735 at commit 43139a8.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 13, 2016

Test build #49348 has finished for PR 10735 at commit 67e81d4.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
 * The code generated for this query (some comments and empty lines are removed):
Contributor:

I think @nongli probably means pasting it here in the pull request, not pasting it into the source code...

@davies (Contributor Author) commented Jan 14, 2016

    /* 007 */
    /* 008 */   private Expression[] expressions;
    /* 009 */   private boolean initRange0;
    /* 010 */   private long partitionEnd1;
    /* 011 */   private long number2;
    /* 012 */   private boolean overflow3;
    /* 013 */   private UnsafeRow unsafeRow = new UnsafeRow(0);
    /* 014 */
    /* 015 */   public GeneratedIterator(Expression[] exprs) {
    /* 016 */     expressions = exprs;
    /* 017 */     initRange0 = false;
    /* 018 */     partitionEnd1 = 0L;
    /* 019 */     number2 = 0L;
    /* 020 */     overflow3 = false;
    /* 021 */   }
    /* 022 */
    /* 023 */   protected void processNext() {
    /* 024 */
    /* 025 */     if (!initRange0) {
    /* 026 */       if (input.hasNext()) {
    /* 027 */         // number2 and partitionEnd1
    /* 051 */       } else {
    /* 052 */         return;
    /* 053 */       }
    /* 054 */       initRange0 = true;
    /* 055 */     }
    /* 056 */
    /* 057 */     while (!overflow3 && number2 < partitionEnd1) {
    /* 058 */       long value4 = number2;
    /* 059 */       number2 += 1L;
    /* 060 */       if (number2 < value4 ^ 1L < 0) {
    /* 061 */         overflow3 = true;
    /* 062 */       }
    /* 063 */
    /* 070 */       long primitive8 = -1L;
    /* 071 */       primitive8 = value4 & 1L;
    /* 074 */       boolean primitive6 = false;
    /* 075 */       primitive6 = primitive8 == 1L;
    /* 076 */       if (!false && primitive6) {
    /* 081 */         currentRow = unsafeRow;
    /* 082 */         return;
    /* 085 */       }
    /* 087 */     }
    /* 089 */   }

@SparkQA commented Jan 14, 2016

Test build #49375 has finished for PR 10735 at commit 73fe074.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 14, 2016

Test build #2382 has finished for PR 10735 at commit 73fe074.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 14, 2016

Test build #2381 has finished for PR 10735 at commit 73fe074.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 14, 2016

Test build #49388 has finished for PR 10735 at commit 32309e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

protected InternalRow currentRow;
protected Iterator<InternalRow> input;

public boolean hasNext() {
Contributor:

If we control this API, can we simplify the iterator API?

I think combining hasNext()/next() into a single next() that returns null when there are no more rows is simpler. The current setup is very annoying; for example, it's not valid to call next() without calling hasNext() first.

Contributor:

+1. We talked about using a single next() function when we were discussing local iterators, and we all agreed it was a simpler interface.

Contributor Author:

This is the adapter for the RDD interface; we can't change the API without changing RDD, I think.

We could do this refactoring later, once we have a clearer picture of how everything works.

Contributor:

Yeah, it might be hard to change the iterator interface as part of this PR if it is intertwined with the RDD iterator.
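
For reference, a sketch of the two styles under discussion (RowStream and RowStreamIterator are hypothetical names; only Iterator and InternalRow come from the surrounding code):

    import org.apache.spark.sql.catalyst.InternalRow

    // Hypothetical null-signalled interface: one call, no cross-call state.
    trait RowStream {
      /** Returns the next row, or null once the stream is exhausted. */
      def next(): InternalRow
    }

    // Bridging it back to the RDD's Iterator contract needs a one-row
    // buffer, which is exactly the awkwardness discussed above: hasNext()
    // may have to pull a row early, and next() is invalid without it.
    class RowStreamIterator(stream: RowStream) extends Iterator[InternalRow] {
      private var buffered: InternalRow = null
      override def hasNext: Boolean = {
        if (buffered == null) buffered = stream.next()
        buffered != null
      }
      override def next(): InternalRow = {
        if (!hasNext) throw new NoSuchElementException("end of stream")
        val row = buffered
        buffered = null
        row
      }
    }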

@SparkQA commented Jan 15, 2016

Test build #49463 has finished for PR 10735 at commit 34a0a6f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 15, 2016

Test build #49467 has finished for PR 10735 at commit 908c8cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait SupportCodegen extends SparkPlan
    • case class InputAdapter(child: SparkPlan) extends LeafNode with SupportCodegen
    • case class WholeStageCodegen(plan: SupportCodegen, children: Seq[SparkPlan])
    • case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
    • case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode with SupportCodegen

@SparkQA commented Jan 15, 2016

Test build #49471 has finished for PR 10735 at commit 1feab20.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies force-pushed the whole2 branch 2 times, most recently from 13a422d to be494fb on January 15, 2016 21:54
@davies (Contributor Author) commented Jan 15, 2016

@rxin @nongli @marmbrus I have updated this following yesterday's discussion; could you take another look?

@SparkQA commented Jan 15, 2016

Test build #49487 has finished for PR 10735 at commit 7c05703.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait CodegenSupport extends SparkPlan
    • case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport
    • case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
    • case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode with CodegenSupport

@SparkQA commented Jan 16, 2016

Test build #49493 has finished for PR 10735 at commit 0b40106.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 16, 2016

Test build #49518 has finished for PR 10735 at commit 79a8f25.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 16, 2016

Test build #49519 has finished for PR 10735 at commit 594939f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@SparkQA commented Jan 16, 2016

Test build #2386 has finished for PR 10735 at commit 3e05fec.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 16, 2016

Test build #2387 has finished for PR 10735 at commit d12a8da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 16, 2016

Test build #49521 has finished for PR 10735 at commit d12a8da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor Author) commented Jan 16, 2016

I'm going to merge this into master; any new comments will be addressed in a follow-up PR.

@asfgit closed this in 3c0d236 on Jan 16, 2016