
[SPARK-27945][SQL] Minimal changes to support columnar processing #24795

Closed
revans2 wants to merge 20 commits into apache/spark from revans2:columnar-basic

Conversation

@revans2 (Contributor) commented Jun 4, 2019

What changes were proposed in this pull request?

This is the first part of SPARK-27396. This is the minimum set of changes necessary to support a pluggable back end for columnar processing. Follow on JIRAs would cover removing some of the duplication between functionality in this patch and functionality currently covered by things like ColumnarBatchScan.

How was this patch tested?

I added a new unit test to cover new code that is not really covered in other places.

I also did manual testing by implementing two plugins/extensions that take advantage of the new APIs to allow for columnar processing for some simple queries. One version runs on the CPU. The other version runs on a GPU, but because it has unreleased dependencies I will not include a link to it yet.

I would expect to add the CPU version in as an example, with other documentation, in a follow-on JIRA.

This is contributed on behalf of NVIDIA Corporation.

@tgravescs (Contributor)

ok to test

SparkQA commented Jun 4, 2019

Test build #106161 has finished for PR 24795 at commit 40c8199.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@revans2 (Contributor, Author) commented Jun 4, 2019

Just as a high-level overview of what the changes are:

The classes in vectorized, including ColumnVector and ColumnarBatch, were moved from core to catalyst so that expressions could get access to them. To comply with the conventions around releasing a ColumnVector's resources we also made ColumnVector reference counted, but restricted its use to essentially just BoundReference.

Expression was updated to include optional columnar evaluation that is off by default, as was SparkPlan. A few expressions were updated to also do columnar evaluation, but these are things like literals where there is really only one way to do it.

Columnar.scala is where the core of the changes are. It adds a new rule that runs on the physical plan during the execution stage. The rule looks at the plan, finds sections that should run columnar, and inserts transitions to/from columnar-formatted data so that the processing can happen. It can also take a user-supplied set of rules that modify the plan, so an extension can add in columnar versions of operators and expressions itself. A simplified sketch of the transition-insertion idea follows.
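
The sketch below is not the actual rule in this patch; it assumes a supportsColumnar flag on SparkPlan (the optional columnar support mentioned above) and uses the RowToColumnarExec / ColumnarToRowExec operators that show up later in this review.

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}

object InsertColumnarTransitions extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    val withTransitions = plan.transformUp {
      case p =>
        // Insert a conversion wherever a parent and its child disagree on format.
        p.withNewChildren(p.children.map {
          case c if c.supportsColumnar && !p.supportsColumnar => ColumnarToRowExec(c)
          case c if !c.supportsColumnar && p.supportsColumnar => RowToColumnarExec(c)
          case c => c
        })
    }
    // The rest of Spark expects rows from the root of the plan.
    if (withTransitions.supportsColumnar) ColumnarToRowExec(withTransitions) else withTransitions
  }
}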

I also updated the Extensions API to allow for the rules to be injected.

SparkQA commented Jun 4, 2019

Test build #106162 has finished for PR 24795 at commit 484fb41.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

cc @mengxr @rxin @rednaxelafx

@dongjoon-hyun (Member)

cc @dbtsai

NOTICE-binary (review comment outdated, resolved)
@rxin (Contributor) commented Jun 5, 2019

@revans2 why do we need a ColumnarRule? It seems like you are bifurcating everything.

* inserting stages to build larger batches for more efficient processing, or stages that
* transition the data to/from an accelerator's memory.
*/
@Experimental
@rxin (Contributor), Jun 5, 2019:

All APIs in execution are non-public, so there's no need to document "experimental" or "unstable" APIs here. They only have public visibility for debugging.

@revans2 (Contributor, Author):

So how do we want to handle this situation, where we are exposing some internal implementation details but with no real guarantees of stability? Not marking them at all, I felt, gave the wrong impression that there was some guarantee.

Comment (Contributor):

They are all private. The package.scala file documents that. Also, the execution and catalyst packages are ignored in the public API docs.

Basically they are use-at-your-own-risk internal APIs.

@revans2 (Contributor, Author) commented Jun 5, 2019

@rxin we don't need a ColumnarRule. It just made the code simpler. I see your point and I am happy to change it if you like. I can do separate registration for pre/post rules, or I can just have them be internal to ColumnarRule.

class ColumnarRule {
  def pre: Rule[SparkPlan] = plan => plan
  def post: Rule[SparkPlan] = plan => plan
}
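
For reference, a hedged sketch of how an extension might register such a rule through the Extensions API mentioned above, assuming an injectColumnar hook that takes a SparkSession => ColumnarRule builder (the hook name and the extension class are illustrative, not necessarily what this patch ends up with):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

class MyColumnarExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(e: SparkSessionExtensions): Unit = {
    e.injectColumnar(session => new ColumnarRule {
      // pre: swap in columnar versions of the operators this plugin supports.
      override def pre: Rule[SparkPlan] = plan => plan
      // post: clean up or combine the transitions Spark inserted.
      override def post: Rule[SparkPlan] = plan => plan
    })
  }
}

It would be wired in with spark.sql.extensions=com.example.MyColumnarExtensions (a made-up class name).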

SparkQA commented Jun 5, 2019

Test build #106205 has finished for PR 24795 at commit 52e31ab.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jun 6, 2019

Test build #106242 has finished for PR 24795 at commit 2a3287b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Actually cleans up memory for this column vector. The column vector is really not usable after
* this.
*/
protected abstract void doClose();
@kiszk (Member), Jun 6, 2019:

Can we use a more self-descriptive name? ColumnVector is a public API for developers who want to support their own storage.

* inserting stages to build larger batches for more efficient processing, or stages that
* transition the data to/from an accelerator's memory.
*/
class ColumnarRule {
@kiszk (Member), Jun 6, 2019:

Are pre and post appropriate names, in particular pre? IIUC, pre is a mandatory function for columnar processing and post is an optional function to optimize columnar data or to clean up resources. To me, a pair of pre and post looks dual. (Of course, I am not a native English speaker.)

In addition, can you create a test case using ColumnarRule (for example, a simplified version of this, but using pre and post)? It would help reviewers' understanding and can ensure the behavior of this API.

@revans2 (Contributor, Author):

For me pre and post made sense in relation to the comments, but perhaps not standalone. I'll see if I can make them more descriptive. I'll also add in a test.

SparkQA commented Jun 6, 2019

Test build #106255 has finished for PR 24795 at commit 31d7248.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Jun 7, 2019

@revans2 I still feel this is pretty invasive. I think you can still accomplish your goal but reduce the scope quite significantly.

If I understand your use case clearly, what you want to do is to build some processing logic completely outside Spark. What you really need there is quite small (not very different from an uber UDF interface):

  1. The ability to inspect and transform Catalyst plans, to create your own plans, which already exists (catalyst rules / strategies).
  2. The ability for operators to read columnar input, which somewhat exists at the scan level for Parquet and ORC but is a bit hacky at the moment. It can be generalized to support arbitrary operators.
  3. A conversion operator that generates ColumnarBatches.

I think that's all you need?

There is no need to define column-specific rules, or create a columnar expression interface. Those expressions are entirely outside Spark, and don't really need to be Spark Expressions.

I also don't understand why you added reference counting to these column vectors. Are you doing that because you might have other things outside Spark that want to hold onto these vectors? If that's the case, I think that code outside Spark should just copy, rather than holding onto existing vectors. Otherwise, Spark can no longer reuse these vectors, and it also would create bigger memory management challenges (why is Spark generating so many vectors that it's not releasing?).

@revans2 (Contributor, Author) commented Jun 7, 2019

@rxin

I am confused on exactly what you are proposing.

The ability to inspect and transform Catalyst plans, to create your own plans, which already exists

That I understand and there are some large technical burdens with using the current set of APIs, but I can get into that more if needed.

The ability for operators to read columnar input, which somewhat exists at the scan level for Parquet and ORC but is a bit hacky at the moment. It can be generalized to support arbitrary operators.

This is the one that I don't understand what you are proposing. The only API even remotely associated with SparkPlan where columnar formatted data can leave the plan is

/**
* Returns all the RDDs of InternalRow which generates the input rows.
*
* @note Right now we support up to two RDDs
*/
def inputRDDs(): Seq[RDD[InternalRow]]

This is a part of the CodegenSupport trait. Looking at it, you would have no idea that it could ever actually hold ColumnarBatches instead of InternalRows. This hack is really just a way for a leaf node in the plan that pulls in ColumnarBatchScan to delegate the conversion between batches and rows to code generation. I'm not sure how that in particular can be generalized into something that would allow for columnar input to arbitrary operators.

As such are you suggesting then that we keep the columnar execution path for SparkPlan like in this patch, but not the Expressions? Are you suggesting instead that we don't change SparkPlan and have it so that each operator can output an RDD[ColumnarBatch], but type erase it to an RDD[InternalRow]? We would still need some kind of flag or something else to indicate that this is happening in the child and a Rule of some kind to make sure that the transitions were inserted at the appropriate places.

@rxin (Contributor) commented Jun 7, 2019

I'm suggesting breaking this into two changes: the first is a columnar interface (a physical plan perhaps takes columns and generates columns); the second is to create a converter operator.

You shouldn't need the optimizer changes or the reference counting changes, or the expression changes...

@revans2 (Contributor, Author) commented Jun 7, 2019

Yes, the reference counting and the expression changes could technically be removed. The reference counting is there to be able to support columnar expressions.

In reality I believe that none of this patch is technically required to make columnar processing work as a plugin. The reason I picked this level of abstraction, and added new rules that modify the physical plan, was to try to reduce the amount of redundant code between a plugin and Spark SQL itself, to the point that it would be preferable to use the plugin API over forking Spark. Every other implementation of columnar processing on Spark has gone the route of forking because the existing extension APIs, though technically sufficient, require any plugin to re-implement large parts of query planning and execution.

For example, user-defined rules for mapping a logical plan to a physical plan run prior to the built-in rules. This means that if I add a rule to replace a projection with a columnar-enabled version, the built-in rule that creates the FileSourceScanExec and does predicate push down into Parquet or ORC will not match the projection I just mapped. Predicate push down is then silently disabled. We ran into this issue when we tried to use the existing extension APIs.

Another example is ShuffleExchangeExec. The best performance optimization is to not do something, so if we can keep the data columnar through an exchange we have eliminated one columnar-to-row translation and one row-to-columnar translation. This is part of the requirements in the SPIP. Replacing that operator during the logical-to-physical plan translation is possible, although difficult. But it also has the unintended consequence that the ReuseExchange rule will not match our alternative ShuffleExchangeExec, and without full access to the physical plan I don't think we could re-implement that in the logical-to-physical plan translation.

I picked a very specific place in the life cycle of the plan to inject these rules because it was as close to the final physical plan as possible, so there are the fewest transformations afterwards. Even then, the code generation rule that runs afterwards still required some modifications.

I think dropping the changes to the Expressions is doable, but not having a way to modify the physical plan very late in the process would require us to really rethink the direction we are taking.

SparkQA commented Jun 7, 2019

Test build #106279 has finished for PR 24795 at commit a388060.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@revans2 (Contributor, Author) commented Jun 10, 2019

@rxin

I removed the columnar changes to Expression and also reference counting from ColumnVector. Please take another look.

SparkQA commented Jun 10, 2019

Test build #106357 has finished for PR 24795 at commit ca6bb76.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) left a comment:

Changes look good, thanks! I'm still wondering if the conf org.apache.spark.example.columnar.enabled (or similar) is meant to be per plugin, or is there going to be a global conf to turn off all columnar processing?

// Empty
}


Comment (Member):

nit: can you use 1 newline here and below instead of 2?

@revans2 (Contributor, Author) commented Jun 18, 2019

@BryanCutler Sorry about missing that question.

Yes, the assumption is that each plugin could/would add its own configs. We don't want to globally disable columnar support because we will eventually use it in regular operation, like with batch data being read from Parquet or the transformations for pandas or R. A rough sketch of what a per-plugin switch could look like is below.
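
The sketch is hypothetical: the class name and the commented-out operator are made up, and only the config key matches the example name above. The plugin's own pre-transition rule checks its config and leaves the plan untouched when disabled, so other columnar users are unaffected.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

case class ExampleColumnarPreRule(session: SparkSession) extends Rule[SparkPlan] {
  private val confKey = "org.apache.spark.example.columnar.enabled"

  override def apply(plan: SparkPlan): SparkPlan = {
    if (!session.conf.get(confKey, "true").toBoolean) {
      plan // this plugin is switched off; nothing else is affected
    } else {
      plan.transformUp {
        // Swap in this plugin's columnar operators here, for example:
        // case p: ProjectExec => ExampleColumnarProjectExec(p.projectList, p.child)
        case p => p
      }
    }
  }
}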

SparkQA commented Jun 18, 2019

Test build #106636 has finished for PR 24795 at commit dc53e83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


TaskContext.get().addTaskCompletionListener[Unit] { _ =>
  if (cb != null) {
    cb.close
Comment (Member):

nit: cb.close()?

import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal}
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, AttributeReference, AttributeSeq, BoundReference, Expression, ExpressionInfo, ExprId, Literal, NamedExpression}
Comment (Member):

nit: how about import org.apache.spark.sql.catalyst.expressions._. It looks too long

import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
import org.apache.spark.sql.execution.{ColumnarRule, ColumnarToRowExec, ProjectExec, RowToColumnarExec, SparkPlan, SparkStrategy}
Comment (Member):

ditto

@revans2 (Contributor, Author) commented Jun 20, 2019

@BryanCutler @kiszk @rxin

I think I have addressed all of the review comments so far. I am happy to do more if there are more issues, but I really would like to keep this moving and try to get some of the follow on work up as well. Please let me know what else is needed to get this merged in.

val df = data.selectExpr("vals + 1")
// Verify that both pre and post processing of the plan worked.
val found = df.queryExecution.executedPlan.collect {
case rep: ReplacedRowToColumnarExec => 1
@kiszk (Member), Jun 20, 2019:

How about the following? The sum will then be a unique value that corresponds to the combination of operators found.

case rep: ... => 1
case proj: ... => 4
case c2r: ... => 13
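
Filled out, that check might read roughly like this (ReplacedRowToColumnarExec comes from the test above; ColumnarProjectExec is a stand-in name for whatever columnar operator the pre rule injects):

val counts = df.queryExecution.executedPlan.collect {
  case _: ReplacedRowToColumnarExec => 1   // inserted by the post rule
  case _: ColumnarProjectExec => 4         // inserted by the pre rule (hypothetical name)
  case _: ColumnarToRowExec => 13          // transition inserted by Spark
}
// Assuming each operator appears at most once, every combination of the three
// yields a distinct sum, so one assertion pins down exactly which were found.
assert(counts.sum == 18)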

SparkQA commented Jun 20, 2019

Test build #106732 has finished for PR 24795 at commit 8443b29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) left a comment:

LGTM as a first step, what will be the next task in the epic?

import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
* Holds a user defined rule that can be used to inject columnar implementations of various
Comment (Contributor):

I still don't understand - why do we need ColumnarRule, i.e. any rule that's column specific?

@revans2 (Contributor, Author):

@rxin
As I explained before, there needs to be at least one rule that operates on the entire physical SparkPlan after the exchanges have been inserted into the plan.

Having two rules, one that runs prior to inserting the columnar transitions and one that runs after them, means a plugin does not have to duplicate the code for inserting columnar transitions and provides a cleaner API for anyone trying to insert columnar processing, but that is relatively minor.

If you want me to split ColumnarRule up and rename the individual parts to not be columnar specific I am happy to do it. If you want me to make it a single rule that runs prior to code generation I can make that work too. Just let me know which of the two changes you would prefer, and I will make it happen.

@revans2 (Contributor, Author) commented Jun 21, 2019

@BryanCutler the next step is to remove ColumnarBatchScan and replace it with the injected columnar transitions. I have a patch for that ready to go as soon as this patch goes in.

After that, I would look at how to update the code that uses Arrow vectors to communicate with Python/R/.NET, etc., and how best to update it to commonize as much of the code as possible.

SparkQA commented Jun 21, 2019

Test build #106769 has finished for PR 24795 at commit 7f1753d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor) left a comment:

+1, I'll wait a bit longer to commit in case anyone else wants to review.

@@ -604,7 +604,10 @@ public final int appendArray(int length) {
*/
public final int appendStruct(boolean isNull) {
if (isNull) {
appendNull();
// This is the same as appendNull but without the assertion for struct types
Comment (Contributor):

why is this necessary?

@revans2 (Contributor, Author):

Because appendNull itself has an assertion that you are not appending a struct. So any call to appendStruct with isNull true would have failed.

* Returns the result of evaluating this expression on the entire
* [[org.apache.spark.sql.vectorized.ColumnarBatch]]. The result of
* calling this may be a single [[org.apache.spark.sql.vectorized.ColumnVector]] or a scalar
* value. Scalar values typically happen if they are a part of the expression i.e. col("a") + 100.
@cloud-fan (Contributor), Jul 1, 2019:

I understand that this is in a test, not an API, but other people may look at this test to learn how to implement a columnar operator, and I feel the current example is not that good.

IIUC, the goal is:

  1. users can write a rule to replace an arbitrary SQL operator with a custom, optimized columnar version
  2. Spark automatically inserts column-to-row and row-to-column operators around the columnar operator.

For 1, I think a pretty simple approach is to take in an expression tree and compile it to a columnar processor that can execute the expression tree in a columnar fashion. We don't need to create a ColumnarExpression, which seems overcomplicated to me as a column processor.
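
As a rough illustration of that suggestion (a sketch only, not code from this patch, and matching Add's two-argument form in the Spark version under review): a rule can pattern-match a small expression tree and hand back a function over ColumnarBatch, here handling just an integer column plus a literal and writing the result into an OnHeapColumnVector.

import org.apache.spark.sql.catalyst.expressions.{Add, BoundReference, Expression, Literal}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

// Compile a tiny expression tree into a columnar processor. Anything not
// recognized returns None and the operator stays row-based.
def compile(expr: Expression): Option[ColumnarBatch => ColumnVector] = expr match {
  case Add(BoundReference(ordinal, IntegerType, _), Literal(v: Int, IntegerType)) =>
    Some { batch =>
      val in = batch.column(ordinal)
      val out = new OnHeapColumnVector(batch.numRows(), IntegerType)
      var i = 0
      while (i < batch.numRows()) {
        if (in.isNullAt(i)) out.putNull(i) else out.putInt(i, in.getInt(i) + v)
        i += 1
      }
      out
    }
  case _ => None
}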

@revans2 (Contributor, Author):

The main reason it is this way is that originally I had Expression support columnar as well, but that changed as part of the review, so I made minimal changes to the example to match.
I do find it simpler from a development standpoint to have a one-to-one mapping. I can then write unit tests with just inputs and verify that the outputs match exactly. But yes, there probably are simpler ways to do this depending on the columnar library you are using.

@cloud-fan (Contributor)

Generally looks good, but I'm wondering if it's better to use Apache Arrow as the exchange format instead of the internal ColumnarBatch. I understand that this is a developer API so stability is not a concern here. I'm just afraid people may find ColumnarBatch hard to use when doing high-perf columnar operations, but they can't change it as it's in Spark.

@tgravescs (Contributor)

If you look at the discuss and vote threads, there was discussion on this; people were worried since Arrow wasn't 1.0 yet and could potentially change, so we wanted to separate that out and not do it at this point.

Tonix517 pushed a commit to Tonix517/spark that referenced this pull request Jul 3, 2019
## What changes were proposed in this pull request?

This is the first part of [SPARK-27396](https://issues.apache.org/jira/browse/SPARK-27396).  This is the minimum set of changes necessary to support a pluggable back end for columnar processing.  Follow on JIRAs would cover removing some of the duplication between functionality in this patch and functionality currently covered by things like ColumnarBatchScan.

## How was this patch tested?

I added in a new unit test to cover new code not really covered in other places.

I also did manual testing by implementing two plugins/extensions that take advantage of the new APIs to allow for columnar processing for some simple queries.  One version runs on the [CPU](https://gist.github.com/revans2/c3cad77075c4fa5d9d271308ee2f1b1d).  The other version runs on a GPU, but because it has unreleased dependencies I will not include a link to it yet.

The CPU version I would expect to add in as an example with other documentation in a follow on JIRA

This is contributed on behalf of NVIDIA Corporation.

Closes apache#24795 from revans2/columnar-basic.

Authored-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
@@ -251,6 +286,371 @@ object MyExtensions {
(_: Seq[Expression]) => Literal(5, IntegerType))
}

case class CloseableColumnBatchIterator(itr: Iterator[ColumnarBatch],
Comment (Member):

Not a big deal, but for my own understanding, why is it a case class?

@revans2 (Contributor, Author):

It doesn't have to be a case class; I just made it one so I didn't need to add in the new keyword, but I am happy to change it if you want me to in a follow-on PR.

private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean =
// We are walking up the plan, so columnar starts when we transition to rows
// and ends when we transition to columns
plan match {
Comment (Member):

not a biggie but indentation here looks weird.

@HyukjinKwon (Member)

Looks good to me in general too.

vinodkc pushed a commit to vinodkc/spark that referenced this pull request Jul 18, 2019
…umnar

## What changes were proposed in this pull request?

This is a second part of the https://issues.apache.org/jira/browse/SPARK-27396 and a follow on to apache#24795

## How was this patch tested?

I did some manual tests and ran/updated the automated tests

I did some simple performance tests on a single node to try to verify that there is no performance impact, and I was not able to measure anything beyond noise.

Closes apache#25008 from revans2/columnar-remove-batch-scan.

Authored-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>