
[SPARK-13255][SQL] Update vectorized reader to directly return ColumnarBatch instead of InternalRows. #11435

Closed
wants to merge 10 commits into master from nongli:spark-13255

Conversation

nongli
Contributor

@nongli nongli commented Feb 29, 2016

What changes were proposed in this pull request?


Currently, the parquet reader returns rows one by one, which is bad for performance. This patch
updates the reader to directly return ColumnarBatches. This is only enabled with whole-stage
codegen, which is currently the only operator able to consume ColumnarBatches (instead of rows).
The current implementation is a bit of a hack to get this to work, and we should do more
refactoring of these low-level interfaces to make this work better.
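
As a rough illustration of why batch-at-a-time access helps (a hand-written sketch, not code from this patch; it assumes the `ColumnarBatch`/`ColumnVector` accessors `column()`, `numRows()`, `isNullAt()` and `getInt()`, and the package location has varied across Spark versions):

```
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

class BatchSum {
  // Row-at-a-time access pays a per-row, per-field virtual call through InternalRow.
  // Batch-at-a-time access reads the backing column vector in a tight loop that the
  // JIT can optimize much more aggressively.
  static long sumIntColumn(ColumnarBatch batch, int ordinal) {
    ColumnVector col = batch.column(ordinal);
    long sum = 0;
    for (int i = 0; i < batch.numRows(); i++) {
      if (!col.isNullAt(i)) {
        sum += col.getInt(i);
      }
    }
    return sum;
  }
}
```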

How was this patch tested?

```
Results:
TPCDS:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
---------------------------------------------------------------------------------
q55 (before)                             8897 / 9265         12.9          77.2
q55                                      5486 / 5753         21.0          47.6
```

@SparkQA

SparkQA commented Feb 29, 2016

Test build #52198 has finished for PR 11435 at commit c1b6ab1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<Object>

@SparkQA

SparkQA commented Feb 29, 2016

Test build #52199 has finished for PR 11435 at commit 490189c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 29, 2016

Test build #2592 has finished for PR 11435 at commit 490189c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52344 has finished for PR 11435 at commit c5a041e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli
Contributor Author

nongli commented Mar 3, 2016

This patch abuses the existing abstractions, which are being (or will be) cleaned up in other patches. For a benchmark of the partition reading speed, I tested this on a table with two int columns: one in the data, one as the partition column. The results are:

```
Partitioned Table:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Read data column                          217 /  262         72.3          13.8       1.0X
Read partition column                     104 /  150        150.6           6.6       2.1X
Read both columns                         240 /  266         65.5          15.3       0.9X
```
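
For reference, a hypothetical setup matching that description (not the actual benchmark code; the app name, output path and row count are illustrative):

```
import org.apache.spark.sql.SparkSession;

public class PartitionedTableSetup {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("partition-read-bench").getOrCreate();
    // One int data column plus one int partition column, written as Parquet.
    spark.range(0, 15L * 1000 * 1000)
        .selectExpr("cast(id as int) as data_col", "cast(id % 10 as int) as part_col")
        .write()
        .partitionBy("part_col")
        .parquet("/tmp/partitioned_int_table");
  }
}
```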

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52345 has finished for PR 11435 at commit ce4099c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52350 has finished for PR 11435 at commit 2908d01.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52351 has finished for PR 11435 at commit 670dee7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
} else if (t == DataTypes.StringType) {
  UTF8String v = row.getUTF8String(fieldIdx);
  for (int i = 0; i < capacity; i++) {
    col.putByteArray(i, v.getBytes());
```
Contributor


If `row` is an `UnsafeRow`, `v.getBytes()` will copy the bytes, so we should pull that call out of the loop.
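
A minimal sketch of that suggestion, continuing the snippet above (assuming the surrounding loop in `ColumnVectorUtils.populate()`; this is not the committed code):

```
} else if (t == DataTypes.StringType) {
  // For an UnsafeRow, getBytes() materializes a fresh byte[] each time,
  // so hoist the copy out of the per-row loop.
  byte[] bytes = row.getUTF8String(fieldIdx).getBytes();
  for (int i = 0; i < capacity; i++) {
    col.putByteArray(i, bytes);
  }
}
```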

@SparkQA

SparkQA commented Mar 3, 2016

Test build #2607 has finished for PR 11435 at commit 670dee7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #52410 has finished for PR 11435 at commit 2a16ec0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 3, 2016

Test build #2611 has finished for PR 11435 at commit 2a16ec0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 4, 2016

Test build #2612 has finished for PR 11435 at commit 2a16ec0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 4, 2016

Test build #52446 has finished for PR 11435 at commit f5f1e2b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 4, 2016

Test build #52450 has finished for PR 11435 at commit ed79eee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```
| ${consume(ctx, columns).trim}
| if (shouldStop()) {
|   return;
| if ($batch != null || $input.hasNext()) {
```
Contributor


How about this:

```
if ($batch != null) {
  $scanBatches();
} else if ($input.hasNext()) {
  Object $value = $input.next();
  if ($value instanceof $columnarBatchClz) {
    $batch = ($columnarBatchClz) $value;
    $scanBatches();
  } else {
    $scanRows((InternalRow) $value);
  }
}
```

@SparkQA

SparkQA commented Mar 4, 2016

Test build #52479 has finished for PR 11435 at commit 48102e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Mar 4, 2016

LGTM, merging this into master, thanks!

@asfgit asfgit closed this in a6e2bd3 Mar 4, 2016
@nongli nongli deleted the spark-13255 branch March 4, 2016 23:23
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
…narBatch instead of InternalRows.

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

Currently, the parquet reader returns rows one by one which is bad for performance. This patch
updates the reader to directly return ColumnarBatches. This is only enabled with whole stage
codegen, which is the only operator currently that is able to consume ColumnarBatches (instead
of rows). The current implementation is a bit of a hack to get this to work and we should do
more refactoring of these low level interfaces to make this work better.

## How was this patch tested?

```
Results:
TPCDS:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
---------------------------------------------------------------------------------
q55 (before)                             8897 / 9265         12.9          77.2
q55                                      5486 / 5753         21.0          47.6
```

Author: Nong Li <nong@databricks.com>

Closes apache#11435 from nongli/spark-13255.
cloud-fan pushed a commit that referenced this pull request Jan 25, 2022
…ntervalType correctly

### What changes were proposed in this pull request?

[`ColumnVectorUtils.populate()` does not handle CalendarInterval type correctly](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java#L93-L94). The CalendarInterval type is in the format of [(months: int, days: int, microseconds: long)](https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java#L58). However, the function above misses the `days` field and sets the `microseconds` field in the wrong position.

`ColumnVectorUtils.populate()` is used by the [Parquet](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L258) and [ORC](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java#L171) vectorized readers to read partition columns, so technically Spark can produce wrong results when reading a table with a CalendarInterval partition column. However, I also notice Spark [explicitly disallows writing data with CalendarInterval type](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L586), so it might not be a big deal for users. But it is worth fixing anyway.
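
Based on that description, the corrected handling would presumably write all three interval fields into the struct's child vectors, roughly like this (a sketch, not necessarily the exact committed code; the mapping of `months`/`days`/`microseconds` to child vectors 0/1/2 is assumed):

```
} else if (t instanceof CalendarIntervalType) {
  // A CalendarInterval partition value is stored as a struct of
  // (months: int, days: int, microseconds: long), one child vector per field.
  CalendarInterval c = (CalendarInterval) row.get(fieldIdx, t);
  col.getChild(0).putInts(0, capacity, c.months);
  col.getChild(1).putInts(0, capacity, c.days);
  col.getChild(2).putLongs(0, capacity, c.microseconds);
}
```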

Caveat: I found the bug when reading through the related code path, but I have never encountered the issue in production for a partition column with CalendarInterval type. I think this is an obvious fix unless someone more experienced finds additional historical context. The code was introduced a long time ago (#11435), and I couldn't find any more information on why it was implemented as it is.

### Why are the changes needed?

To fix a potential correctness issue.

### Does this PR introduce _any_ user-facing change?

No, but it fixes the existing correctness issue when reading a partition column with CalendarInterval type.

### How was this patch tested?

Added a unit test in `ColumnVectorSuite.scala`.
Verified the unit test fails with the exception below without this PR:

```
java.lang.NullPointerException was thrown.
java.lang.NullPointerException
	at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLongs(OnHeapColumnVector.java:345)
	at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:94)
	at org.apache.spark.sql.execution.vectorized.ColumnVectorSuite.$anonfun$new$99(ColumnVectorSuite.scala:613)
```

Closes #35314 from c21/vector-fix.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>