
[SPARK-23442][SQL] Increase reading tasks when reading from partitioned and bucketed table. #21460

Closed

@wangyum (Member) commented May 30, 2018

What changes were proposed in this pull request?

For a table that is both partitioned and bucketed, the amount of data grows as partitions accumulate, yet reading the table always uses exactly as many tasks as there are buckets.
This PR changes the logic so that reading a partitioned and bucketed table uses bucket number * partition number tasks; a sketch of the difference follows.
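To make the intended change concrete, here is a minimal sketch of the two task layouts. The case class and function names are illustrative only; the real logic lives in Spark's bucketed-read code path (createBucketedReadRDD, discussed below).

case class BucketedFile(partitionDir: String, bucketId: Int, path: String)

// Current layout: all files with the same bucket id, across every table
// partition, land in one read task, so the task count equals the bucket count.
def currentLayout(files: Seq[BucketedFile]): Map[Int, Seq[BucketedFile]] =
  files.groupBy(_.bucketId)

// Proposed layout: group by (table partition, bucket id), so the task count
// becomes roughly bucket count * partition count.
def proposedLayout(files: Seq[BucketedFile]): Map[(String, Int), Seq[BucketedFile]] =
  files.groupBy(f => (f.partitionDir, f.bucketId))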

How was this patch tested?

Manual tests:

import org.apache.spark.sql.functions.col

// Create a table with 10 partitions (p) and 5 buckets (key) per partition.
spark.range(10000)
  .selectExpr("id as key", "id % 5 as t1", "id % 10 as p")
  .repartition(5, col("p"))
  .write
  .partitionBy("p")
  .bucketBy(5, "key")
  .sortBy("t1")
  .saveAsTable("spark_23442")

// All partitions scanned: read tasks = 5 buckets * 10 partitions = 50
spark.sql("select count(distinct t1) from spark_23442").show
// Half the partitions scanned: read tasks = 5 buckets * (10 / 2) partitions = 25
spark.sql("select count(distinct t1) from spark_23442 where p >= 5").show
@SparkQA commented May 30, 2018

Test build #91291 has finished for PR 21460 at commit 58e4e09.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)
(Btw, I'd avoid a PR title that just says something is an improvement.)

@HyukjinKwon (Member)
retest this please

@wangyum changed the title from "[SPARK-23442][SQL] Improvement reading from partitioned and bucketed table." to "[SPARK-23442][SQL] Increase reading tasks when reading from partitioned and bucketed table." on May 30, 2018
@pnpranavrao
The reason createBucketedReadRDD works the way it currently does (accumulating all files for a given bucket, across every partition, into a single RDD partition whose id equals the bucketId) is to skip the shuffle when joining on the bucketed columns; see the sketch below. Your suggested change would break this.
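For illustration, here is the kind of query the current layout is designed to optimize (a hedged sketch; the table names are made up):

// Two tables bucketed the same way on the join key.
spark.range(100).write.bucketBy(5, "id").saveAsTable("bucketed_a")
spark.range(100).write.bucketBy(5, "id").saveAsTable("bucketed_b")

// Because each scan's output partitioning already matches the join key's
// hash partitioning (one RDD partition per bucket), the physical plan
// contains no Exchange (shuffle) node for the join.
spark.table("bucketed_a").join(spark.table("bucketed_b"), "id").explain()

If each bucket were instead split into several read tasks, the scan could no longer report a bucket-aligned output partitioning, and the shuffle would come back.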

I opened this JIRA for the same need. We need to generate different physical plans for join and non-join use cases, rather than assuming, as we do now, that a bucketed datasource will be used in a join on the bucketed columns.

The workaround right now is to turn off bucketing with a SparkConf flag (sketched below), but I'm working on using both the partitioning and bucketing information to plan queries.
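For reference, a minimal sketch of that workaround, assuming the standard bucketing switch in Spark SQL:

// Disable bucket-aware planning for the session; bucketed tables are then
// read like ordinary file sources and split by file size instead.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")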

@SparkQA commented May 30, 2018

Test build #91293 has finished for PR 21460 at commit 58e4e09.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum closed this Jul 18, 2018