
[SPARK-37933][SQL] Limit push down for parquet vectorized reader #35256

Closed

Conversation

jackylee-ch (Contributor)

Why are the changes needed?

Based on 34291, we can support limit push down to the Parquet DataSource V2 reader, which lets us stop scanning Parquet early and reduce network and disk IO.
Currently, only the vectorized reader is supported in this PR. The row-based reader needs limit pushdown support in parquet-hadoop first, so it will be handled in a follow-up PR.
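For context, a minimal sketch of the kind of query the plans below come from; the path and variable names are illustrative, not the actual test code:

  // Hypothetical reproduction; assumes a Parquet dataset at an illustrative path.
  val df = spark.read.parquet("/path/to/test_push_down").limit(10)
  // With this change, the BatchScan node in the physical plan reports PushedLimit: Some(10).
  df.explain()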

Limit pushdown status in the Parquet physical plan
Before

== Physical Plan ==
CollectLimit 10
+- *(1) ColumnarToRow
   +- BatchScan[a#0, b#1] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/datasources.db/test_push_down/par..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<a:int,b:int>, PushedFilters: [], PushedAggregation: [], PushedGroupBy: [] RuntimeFilters: [] 

After

== Physical Plan ==
CollectLimit 10
+- *(1) ColumnarToRow
   +- BatchScan[a#0, b#1] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/datasources.db/test_push_down/par..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<a:int,b:int>, PushedFilters: [], PushedAggregation: [], PushedGroupBy: [], PushedLimit: Some(10) RuntimeFilters: [] 

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests and new tests.

@github-actions github-actions bot added the SQL label Jan 20, 2022
@jackylee-ch jackylee-ch changed the title [SPARK-37831][SQL] Limit push down for parquet vectorized reader [SPARK-37933][SQL] Limit push down for parquet vectorized reader Jan 20, 2022
jackylee-ch (Contributor, Author)

cc @huaxingao @cloud-fan

jackylee-ch (Contributor, Author)

also cc @sunchao

}

public VectorizedParquetRecordReader(
ZoneId convertTz,
Contributor:

4-space indentation

Contributor Author:

fixed

checkEndOfRowGroup();

int num = (int) Math.min((long) capacity, totalCountLoadedSoFar - rowsReturned);
int num = (int) Math.min((long) capacity,
Math.min((long) limit - rowsReturned, totalCountLoadedSoFar - rowsReturned));
Contributor:

indentation is off

Contributor Author:

fixed

val df = spark.read.parquet(path.getPath).limit(pushedLimit)
val sparkPlan = df.queryExecution.sparkPlan
sparkPlan foreachUp {
case r@ BatchScanExec(_, f: ParquetScan, _) =>
Contributor:

space between r and @

Contributor Author:

fixed

huaxingao (Contributor)

@stczwd Thanks for working on this! The changes look reasonable to me. I left a couple of nit comments for coding style.

sunchao (Member) left a comment:

Thanks @stczwd.

I'm curious whether this will be very useful. To my knowledge, Spark already pushes local limit to each task, see LimitExec.doExecute:

  protected override def doExecute(): RDD[InternalRow] = {
    val childRDD = child.execute()
    if (childRDD.getNumPartitions == 0) {
      new ParallelCollectionRDD(sparkContext, Seq.empty[InternalRow], 1, Map.empty)
    } else {
      val singlePartitionRDD = if (childRDD.getNumPartitions == 1) {
        childRDD
      } else {
        val locallyLimited = childRDD.mapPartitionsInternal(_.take(limit))
        new ShuffledRowRDD(
          ShuffleExchangeExec.prepareShuffleDependency(
            locallyLimited,
            child.output,
            SinglePartition,
            serializer,
            writeMetrics),
          readMetrics)
      }
      singlePartitionRDD.mapPartitionsInternal(_.take(limit))
    }
  }

Since the vectorized Parquet reader behaves like an iterator of ColumnarBatches, it will stop reading more batches when the local limit is reached.
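For illustration only (this is a toy model, not Spark's actual classes): because batches are produced lazily, a downstream take(limit) never pulls batches beyond the one that satisfies the limit.

  // Each next() "reads" one batch; nothing is read until pulled.
  val batchesRead = scala.collection.mutable.ArrayBuffer.empty[Int]
  val batchSize = 4096
  val batches = Iterator.tabulate(100) { i =>
    batchesRead += i              // stands in for the I/O of reading a batch
    Seq.fill(batchSize)(i)        // one "ColumnarBatch" worth of rows
  }
  val limited = batches.flatMap(_.iterator).take(10)  // local limit of 10 rows
  limited.foreach(_ => ())        // consume the limited iterator
  assert(batchesRead.size == 1)   // only the first batch was ever read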

Also have you done any benchmark with this feature?

this.convertTz = convertTz;
this.datetimeRebaseMode = datetimeRebaseMode;
this.datetimeRebaseTz = datetimeRebaseTz;
this.int96RebaseMode = int96RebaseMode;
this.int96RebaseTz = int96RebaseTz;
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.capacity = capacity;
this.limit = Integer.MAX_VALUE;
Member:

we can use:

    this(convertTz, datetimeRebaseMode, datetimeRebaseTz, int96RebaseMode, int96RebaseTz,
      useOffHeap, capacity, Integer.MAX_VALUE);

Contributor Author:

fixed

jackylee-ch (Contributor, Author)

Thanks for your reply, @sunchao.
Limit pushdown can terminate the reading of a capacity-sized batch early. It helps especially when a batch would otherwise span row groups, since it can avoid reading one extra row group.
I tested limitBenchMark in ParquetNestedSchemaPruningBenchmark. Unfortunately, it didn't show a clear advantage.
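To make that concrete, a rough sketch of the arithmetic; the numbers are illustrative and mirror the Math.min change in the diff above, not the actual reader internals:

  // Illustrative numbers only.
  val capacity = 10240L                 // rows per vectorized batch
  val limit = 12500L                    // pushed-down limit
  val rowsReturned = 10240L             // rows already produced by earlier batches
  val totalCountLoadedSoFar = 20480L    // rows available in the row groups loaded so far

  // Without the limit, the reader fills a whole batch if rows are available.
  val numWithoutLimit = math.min(capacity, totalCountLoadedSoFar - rowsReturned)   // 10240

  // With the pushed limit, the final batch is capped at the rows still needed,
  // which can also avoid loading the next row group at all.
  val numWithLimit = math.min(capacity,
    math.min(limit - rowsReturned, totalCountLoadedSoFar - rowsReturned))          // 2260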

c21 (Contributor) commented Jan 21, 2022

I have the same concern as @sunchao. IMO the improvement might not be significant, since we already apply the limit to avoid reading extra batches during execution. On the other hand, the DSv2 ORC vectorized reader does not have the same control over skipping row groups as the Parquet reader, so limit push down cannot be implemented for the ORC reader in the future. And here we have to introduce code complexity in the common interface FileScanBuilder. I feel this is overkill unless we can showcase an obvious improvement on the current benchmark.

jackylee-ch (Contributor, Author)

Thanks for your reply, @c21.
Sorry, my last test didn't adjust the limit amount, so the limit didn't actually take effect.
This time I again tested limitBenchMark in ParquetNestedSchemaPruningBenchmark, but set the batch capacity to 10240 and the limit to 12500. Now the performance improved by 1.3x.

OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.11.0-1025-azure
Intel(R) Xeon(R) CPU E5-2673 v4 @ 2.30GHz
Limiting:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column with out limit                      83            101          21         12.1          82.7       1.0X
Nested column with out limit                         77             95          11         12.9          77.4       1.1X
Nested column in array with out limit               105            121          19          9.5         105.1       0.8X
Top-level column with limit                          61             69           5         16.3          61.3       1.3X
Nested column with limit                             66             73           7         15.2          65.8       1.3X
Nested column in array with limit                   101            113          20          9.9         101.2       0.8X

jackylee-ch (Contributor, Author) commented Jan 24, 2022

In essence, the improvement of this PR is to reduce the amount of data read for the final batch, especially in the following scenarios.

  1. The capacity is set relatively large, but with the limit we only need to read a small final batch.
  2. The last batch would otherwise span 2 row groups, but with the limit we only need to read one.

Based on the above performance improvement, I personally think it is still worthwhile, but it depends on your opinions, thank you.
@c21 @sunchao @cloud-fan @huaxingao

LuciferYang (Contributor)

Can you resolve the conflict first, @stczwd?

github-actions (bot)

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 13, 2022
@github-actions github-actions bot closed this Jun 14, 2022