[SPARK-36123][SQL] Parquet vectorized reader doesn't skip null values correctly

### What changes were proposed in this pull request?

Fix the value-skipping logic in the Parquet vectorized reader when column index filtering is effective, by taking nulls into account and only calling `ParquetVectorUpdater.skipValues` for non-null values.

### Why are the changes needed?

Currently, the Parquet vectorized reader may not work correctly when column index filtering is in effect and a data page contains null values. For instance, suppose we have two columns `c1: BIGINT` and `c2: STRING` with the following pages:
```
   * c1        500       500       500       500
   *  |---------|---------|---------|---------|
   *  |-------|-----|-----|---|---|---|---|---|
   * c2     400   300   300 200 200 200 200 200
```

and suppose we have a query like the following:
```sql
SELECT * FROM t WHERE c1 = 500
```

This creates a Parquet row range `[500, 1000)` which, when applied to `c2`, requires us to skip all rows in `[400, 500)`. However, the current logic skips rows via `updater.skipValues(n, valueReader)`, which is incorrect since that call skips the next `n` non-null values rather than the next `n` rows. When some of those rows are null, the reader over-skips and ends up misaligned.
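
To make the arithmetic concrete, here is a minimal, self-contained sketch of why skipping by value count over-skips; the names (`definitionLevels`, `maxDefinitionLevel`, the counters) are illustrative only and not the reader's actual API:

```java
// Sketch: rows [400, 500) of c2 must be skipped, but rows 400-449 are null.
// Nulls are encoded only in the definition levels, not as stored values, so
// "skip 100 values" consumes 50 values that belong to rows >= 500.
public class SkipSketch {
  public static void main(String[] args) {
    int maxDefinitionLevel = 1;  // non-null rows carry the max definition level
    int rowsToSkip = 100;        // rows [400, 500)

    // Definition level per row to skip: rows 400-449 are null (level 0).
    int[] definitionLevels = new int[rowsToSkip];
    for (int row = 0; row < rowsToSkip; row++) {
      definitionLevels[row] = (row < 50) ? 0 : maxDefinitionLevel;
    }

    // Old behavior: skip one stored value per skipped row -> 100 values, 50 too many.
    int valuesSkippedOld = rowsToSkip;

    // Fixed behavior: consult the definition levels and skip a stored value only
    // for non-null rows.
    int valuesSkippedNew = 0;
    for (int level : definitionLevels) {
      if (level == maxDefinitionLevel) valuesSkippedNew++;
    }

    System.out.println("values skipped (old logic): " + valuesSkippedOld); // 100
    System.out.println("values skipped (new logic): " + valuesSkippedNew); // 50
  }
}
```

The patch below applies the same idea inside the definition level reader itself, handling both RLE and PACKED runs of levels.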

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test in `ParquetColumnIndexSuite`.

Closes #33330 from sunchao/SPARK-36123-skip-nulls.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
sunchao authored and cloud-fan committed Jul 14, 2021
1 parent b866457 commit e980c7a
Showing 2 changed files with 54 additions and 20 deletions.
@@ -203,8 +203,7 @@ private void readBatchInternal(
long rangeEnd = state.currentRangeEnd();

if (rowId + n < rangeStart) {
- updater.skipValues(n, valueReader);
- advance(n);
+ skipValues(n, state, valueReader, updater);
rowId += n;
leftInPage -= n;
} else if (rowId > rangeEnd) {
@@ -217,8 +216,7 @@ private void readBatchInternal(
// skip the part [rowId, start)
int toSkip = (int) (start - rowId);
if (toSkip > 0) {
- updater.skipValues(toSkip, valueReader);
- advance(toSkip);
+ skipValues(toSkip, state, valueReader, updater);
rowId += toSkip;
leftInPage -= toSkip;
}
@@ -255,6 +253,39 @@ private void readBatchInternal(
state.advanceOffsetAndRowId(offset, rowId);
}

+ /**
+ * Skip the next `n` values (either null or non-null) from this definition level reader and
+ * `valueReader`.
+ */
+ private void skipValues(
+ int n,
+ ParquetReadState state,
+ VectorizedValuesReader valuesReader,
+ ParquetVectorUpdater updater) {
+ while (n > 0) {
+ if (this.currentCount == 0) this.readNextGroup();
+ int num = Math.min(n, this.currentCount);
+ switch (mode) {
+ case RLE:
+ // we only need to skip non-null values from `valuesReader` since nulls are represented
+ // via definition levels which are skipped here via decrementing `currentCount`.
+ if (currentValue == state.maxDefinitionLevel) {
+ updater.skipValues(num, valuesReader);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < num; ++i) {
+ // same as above, only skip non-null values from `valuesReader`
+ if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
+ updater.skipValues(1, valuesReader);
+ }
+ }
+ break;
+ }
+ currentCount -= num;
+ n -= num;
+ }
+ }

// The RLE reader implements the vectorized decoding interface when used to decode dictionary
// IDs. This is different than the above APIs that decodes definitions levels along with values.
@@ -358,7 +389,14 @@ public void skipIntegers(int total) {
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
- advance(n);
+ switch (mode) {
+ case RLE:
+ break;
+ case PACKED:
+ currentBufferIdx += n;
+ break;
+ }
+ currentCount -= n;
left -= n;
}
}
@@ -403,20 +441,6 @@ public void skipFixedLenByteArray(int total, int len) {
throw new UnsupportedOperationException("only skipIntegers is valid");
}

- /**
- * Advance and skip the next `n` values in the current block. `n` MUST be <= `currentCount`.
- */
- private void advance(int n) {
- switch (mode) {
- case RLE:
- break;
- case PACKED:
- currentBufferIdx += n;
- break;
- }
- currentCount -= n;
- }

/**
* Reads the next varint encoded int.
*/
@@ -38,7 +38,7 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
* col_1 500 500 500 500
* |---------|---------|---------|---------|
* |-------|-----|-----|---|---|---|---|---|
- * col_2     400   300   200   200 200 200 200 200
+ * col_2     400   300   300 200 200 200 200 200
*/
def checkUnalignedPages(df: DataFrame)(actions: (DataFrame => DataFrame)*): Unit = {
Seq(true, false).foreach { enableDictionary =>
@@ -92,4 +92,14 @@ class ParquetColumnIndexSuite extends QueryTest with ParquetTest with SharedSpar
)
checkUnalignedPages(df)(actions: _*)
}

test("SPARK-36123: reading from unaligned pages - test filters with nulls") {
// insert 50 null values in [400, 450) to verify that they are skipped during processing row
// range [500, 1000) against the second page of col_2 [400, 800)
var df = spark.range(0, 2000).map { i =>
val strVal = if (i >= 400 && i < 450) null else i + ":" + "o" * (i / 100).toInt
(i, strVal)
}.toDF()
checkUnalignedPages(df)(actions: _*)
}
}
