
[SPARK-32709][SQL] Support write Hive ORC/Parquet bucketed table (for Hive 1,2) #30003

Closed
c21 wants to merge 8 commits

Conversation

@c21 (Contributor) commented Oct 10, 2020

What changes were proposed in this pull request?

The Hive ORC/Parquet write code path is the same as the data source v1 code path (FileFormatWriter). This PR adds support for writing Hive ORC/Parquet bucketed tables with hivehash. The change is to customize bucketIdExpression to use HiveHash when the target table is a Hive bucketed table and the Hive version is 1.x.y or 2.x.y. Support for Hive 3 will be added in a later PR, after Hive's murmur3hash is added to Spark.

The changes are mostly on:

  • HiveMetastoreCatalog.scala: When converting a Hive table relation to a data source relation, pass the bucket info (BucketSpec) and other Hive-related info as options into HadoopFsRelation and LogicalRelation, which can later be accessed by InsertIntoHadoopFsRelationCommand and FileFormatWriter.

  • FileFormatWriter.scala: Use HiveHash for bucketIdExpression when writing to a Hive bucketed table (a sketch of the bucket id expression follows this list). In addition, the Spark output file names should follow the Hive (and Presto) bucketed file naming convention. This introduces another parameter, bucketFileNamePrefix, and subsequent changes in FileCommitProtocol and HadoopMapReduceCommitProtocol.

  • DataSourceScanExec.scala: Add an extra check in bucketedScan to make sure bucketing is not enabled when reading a Hive bucketed table, since we now propagate the bucket spec from every Hive relation (read and write) in HiveMetastoreCatalog.scala.
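A minimal sketch of the bucket id selection described above (not the exact PR diff; the helper and its parameters are illustrative, while HiveHash, HashPartitioning, Pmod, BitwiseAnd and Literal are existing Catalyst classes):

import org.apache.spark.sql.catalyst.expressions.{Attribute, BitwiseAnd, Expression, HiveHash, Literal, Pmod}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

// Sketch only: pick the bucket id expression depending on whether the target
// is a Hive bucketed table (Hive 1/2, hivehash) or a Spark-native bucketed table.
def bucketIdExpression(
    bucketColumns: Seq[Attribute],
    numBuckets: Int,
    isHiveBucketedTable: Boolean): Expression = {
  if (isHiveBucketedTable) {
    // Hive semantics: (hivehash(bucket columns) & Int.MaxValue) % numBuckets
    Pmod(BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue)), Literal(numBuckets))
  } else {
    // Spark-native bucketing keeps the murmur3-based partition id expression.
    HashPartitioning(bucketColumns, numBuckets).partitionIdExpression
  }
}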

Why are the changes needed?

To make Spark write bucketed tables that are compatible with other SQL engines. Currently a Spark bucketed table cannot be leveraged by other SQL engines like Hive and Presto, because Spark uses a different hash function (Spark murmur3hash). With this PR, a Spark-written Hive bucketed table can be efficiently read by Presto and Hive for bucket filter pruning, joins, group-by, etc. This was and is blocking several companies (confirmed with Facebook, Uber, etc.) from migrating bucketing workloads from Hive to Spark.

Does this PR introduce any user-facing change?

Yes, any Hive bucketed table written by Spark with Hive 1/2 is properly bucketed and can be efficiently processed by Presto and Hive.

How was this patch tested?

  1. Added unit tests in BucketedWriteWithHiveSupportSuite.scala to verify that bucket file names and each row in each bucket are written properly.

  2. Cross-engine test (taking prestosql as the example here): set up a Presto server and Hive metastore locally on a laptop, and run Presto and Spark queries against them locally.

Created a Hive bucketed table using Presto:

CREATE TABLE hive.di.chengsu_table (
  key int,
  value varchar,
  part varchar
)
WITH (
  format = 'PARQUET',
  partitioned_by = ARRAY['part'],
  bucketed_by = ARRAY['key'],
  bucket_count = 8
)

Wrote the Hive bucketed table (part='part0') using Spark, then read the table using Presto to verify that Presto bucket pruning works (a sketch of the Spark write follows):
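For reference, the Spark-side write was an INSERT along the following lines. This is illustrative only: the exact statement and rows used in the test are not shown here, and the table is assumed to be addressable from Spark as di.chengsu_table (without Presto's hive. catalog prefix).

// Illustrative spark-shell snippet; values chosen to match the output shown below.
// With this PR, the rows are bucketed with HiveHash into the table's 8 buckets and
// the output files get the Hive-compatible "0000N_0_" prefix.
spark.sql("""
  INSERT INTO di.chengsu_table PARTITION (part = 'part0')
  VALUES (2, '2'), (5, '5'), (10, '10'), (13, '13'), (18, '18')
""")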

presto:default> SELECT * FROM hive.di.chengsu_table WHERE part='part0' AND "$bucket" in (2);
 key | value | part  
-----+-------+-------
   2 | 2     | part0 
  10 | 10    | part0 
  18 | 18    | part0 
(3 rows)

Underlying files in the partition directory:

chengsu@chengsu-mbp part=part0 % ls
00002_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00002.c000.snappy.parquet
00005_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00005.c000.snappy.parquet

Wrote to the Hive bucketed table using Presto (note: after this, one partition contains both Presto-written and Spark-written files):

presto:default> INSERT INTO hive.di.chengsu_table
             -> VALUES
             ->   (0, '0', 'part0'),
             ->   (1, '1', 'part0'),
             ->   (2, '2', 'part0'),
             ->   (1, '1', 'part1'),
             ->   (2, '2', 'part1'),
             ->   (3, '3', 'part1');

Used Presto to read the partition again, verifying that bucket pruning works on the mixed data written by Spark and Presto:

presto:default> SELECT * FROM hive.di.chengsu_table WHERE part='part0' AND "$bucket" in (2);
 key | value | part  
-----+-------+-------
   2 | 2     | part0 
  10 | 10    | part0 
  18 | 18    | part0 
   2 | 2     | part0 

Underlying files in the partition directory:

chengsu@chengsu-mbp part=part0 % ls
000000_0_20201010_200332_00028_tct3t
000001_0_20201010_200332_00028_tct3t
000002_0_20201010_200332_00028_tct3t
00002_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00002.c000.snappy.parquet
00005_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00005.c000.snappy.parquet

In addition, verified a join between the Spark-written bucketed table and the Presto-written bucketed table; the result looks correct:

presto:default> SELECT t1.key, t1.value, t2.key, t2.value
             -> FROM hive.di.chengsu_table t1
             -> JOIN hive.di.chengsu_table t2
             -> ON t1.value = t2.value
             -> AND t1.part = 'part0'
             -> AND t2.part = 'part1';
 key | value | key | value 
-----+-------+-----+-------
   2 | 2     |   2 | 2     
   2 | 2     |   2 | 2     
   1 | 1     |   1 | 1     
   2 | 2     |   2 | 2     
   2 | 2     |   2 | 2    

Aggregated on the Spark-written bucketed table; the result looks correct:

presto:default> SELECT key, COUNT(*)
             -> FROM hive.di.chengsu_table
             -> WHERE part = 'part0'
             -> GROUP BY key;
 key | _col1 
-----+-------
   0 |     1 
   5 |     1 
  13 |     1 
   1 |     1 
   2 |     2 
  10 |     1 
  18 |     1 

@c21 (Contributor, Author) commented Oct 10, 2020

cc @cloud-fan, @maropu, @viirya, @sameeragarwal and @CodingCat if you guys have time to take a look, thanks.

@SparkQA commented Oct 10, 2020

Test build #129630 has finished for PR 30003 at commit 87481ec.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 10, 2020

Test build #129631 has finished for PR 30003 at commit 4919b00.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 10, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34234/

@maropu (Member) commented Oct 11, 2020

Thanks for the work, @c21! Btw, do we need to care about Hive v1? We recently removed the hive-1.2 related code.

@c21 (Contributor, Author) commented Oct 11, 2020

@maropu - this should work with Hive 1.x.y and 2.x.y versions, and we still claim to support Hive 0.12.0 through 2.3.7. Btw, Presto (both prestodb and prestosql) supports hivehash bucketed tables, so I think it is worth supporting here in Spark. In practice, hivehash bucketed tables are still used by most companies that deploy Presto.

@c21 (Contributor, Author) commented Oct 11, 2020

Btw, #29961 seems to remove some legacy workaround code related to hive-1.2 in the Spark repo, but that does not mean we no longer support hive-1.2, right? I haven't looked closely into that PR, just sharing my first impression here.

@SparkQA commented Oct 11, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34234/

@SparkQA commented Oct 11, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34235/

@maropu (Member) commented Oct 11, 2020

Yea, if we could support both without workaround code, I think it's okay. cc: @dongjoon-hyun @HyukjinKwon @wangyum

@SparkQA commented Oct 11, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34236/

@SparkQA commented Oct 11, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34235/

@SparkQA commented Oct 11, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34236/

@SparkQA commented Oct 11, 2020

Test build #129632 has finished for PR 30003 at commit d5738fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) commented Oct 12, 2020

Yeah, I think we still support it.

  • Execution side: Hive 1.2 (that was used during execution) was removed. Now Hive 2.3 is used here. <- this is what we did.
  • Client side: for metastore, we still support other Hive versions by providing the metastore jars of different Hive version.

@@ -230,9 +236,9 @@ class DynamicPartitionDataWriter(
       description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir))
     }
     val currentPath = if (customPath.isDefined) {
-      committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
+      committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, prefix, ext)
@cloud-fan (Contributor) commented on the diff:

Do we need the prefix parameter because we want to put the bucket id at the beginning of the file name?

@c21 (Contributor, Author) replied:

@cloud-fan - yes, this is the cleanest way I can think of to achieve that (compatible file naming), let me know if you have any other idea, thanks.
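For context, Hive and Presto expect the bucket id as a zero-padded prefix of the file name (see the 00002_0_.../00005_0_... listing in the PR description above). A minimal sketch of that prefix, assuming the bucket id is already computed:

// Sketch: Hive/Presto-compatible bucket file name prefix for a given bucket id.
def hiveBucketFilePrefix(bucketId: Int): String = f"$bucketId%05d_0_"

// e.g. hiveBucketFilePrefix(2) == "00002_0_", which is prepended to the usual
// Spark part file name, giving "00002_0_part-00000-<uuid>_00002.c000.snappy.parquet".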

@c21 (Contributor, Author) commented Oct 15, 2020

@cloud-fan - wondering what you think of the current approach? Do you have any other comments? Thanks.

@c21 (Contributor, Author) commented Nov 19, 2020

@cloud-fan - here I changed the approach to create a single FileCommitProtocolV2, as we discussed. I want to check with you whether this makes sense. I need to add some more tests for dynamically partitioned bucketed table queries, but I'd like some feedback on whether we are good to go down this road before crafting more unit tests, thanks.

@SparkQA commented Nov 19, 2020

Test build #131334 has finished for PR 30003 at commit 9300055.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 19, 2020

Test build #131338 has finished for PR 30003 at commit 89afa0c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 19, 2020

Test build #131340 has finished for PR 30003 at commit fbb0798.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35938/

@SparkQA commented Nov 19, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35938/

@SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35943/

@SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35944/

@SparkQA commented Nov 19, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35943/

@SparkQA commented Nov 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35946/

@SparkQA commented Nov 19, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35944/

@SparkQA commented Nov 19, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35946/

@SparkQA commented Nov 19, 2020

Test build #131342 has finished for PR 30003 at commit 9fc6a8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions (bot) commented
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 28, 2021
@github-actions github-actions bot closed this Mar 1, 2021
dongjoon-hyun pushed a commit that referenced this pull request Jun 25, 2021
…xible file naming

### What changes were proposed in this pull request?

This PR is to introduce a new set of APIs, `newTaskTempFile` and `newTaskTempFileAbsPath`, inside `FileCommitProtocol`, to allow more flexible file naming of Spark output. The major change is to pass a `FileNameSpec` into `FileCommitProtocol`, instead of the original `ext` (the spec currently carries a `prefix` and the `ext`), to allow individual `FileCommitProtocol` implementations to come up with more flexible file names (e.g. with a custom `prefix`) for Hive/Presto bucketing - #30003. Default implementations of the added APIs are provided, so no existing implementation of `FileCommitProtocol` is broken.
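A simplified sketch of the added API (the precise definitions live in FileCommitProtocol.scala from #33012; only the new methods are shown here):

import org.apache.hadoop.mapreduce.TaskAttemptContext

// FileNameSpec carries the pieces around the base file name: <prefix><basename><suffix>.
// The suffix corresponds to the original `ext`; for Hive bucketed writes the prefix
// is the zero-padded bucket id, e.g. "00002_0_".
final case class FileNameSpec(prefix: String, suffix: String)

abstract class FileCommitProtocol {
  // New APIs: implementations build the task output file name from the spec,
  // instead of only appending an extension.
  def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], spec: FileNameSpec): String
  def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, spec: FileNameSpec): String
}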

### Why are the changes needed?

To make commit protocol more flexible in terms of Spark output file name.
Pre-requisite of #30003.

### Does this PR introduce _any_ user-facing change?

Yes, for developers who implement or run a custom implementation of `FileCommitProtocol`: they can choose to implement the newly added APIs.

### How was this patch tested?

Existing unit tests as this is just adding an API.

Closes #33012 from c21/commit-protocol-api.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
cloud-fan pushed a commit that referenced this pull request Sep 17, 2021
…ormat with Hive hash)

### What changes were proposed in this pull request?

This is a re-work of #30003; here we add support for writing Hive bucketed tables with the Parquet/ORC file formats (data source v1 write path and Hive hash as the hash function). Support for Hive's other file formats will be added in a follow-up PR.

The changes are mostly on:

* `HiveMetastoreCatalog.scala`: When converting a Hive table relation to a data source relation, pass the bucket info (BucketSpec) and other Hive-related info as options into `HadoopFsRelation` and `LogicalRelation`, which can later be accessed by `FileFormatWriter` to customize the bucket id and file name.

* `FileFormatWriter.scala`: Use `HiveHash` for `bucketIdExpression` when writing to a Hive bucketed table. In addition, the Spark output file names should follow the Hive/Presto/Trino bucketed file naming convention. Introduce another parameter, `bucketFileNamePrefix`, which introduces subsequent changes in `FileFormatDataWriter`.

* `HadoopMapReduceCommitProtocol`: Implement the new file name APIs introduced in #33012, and change its sub-class `PathOutputCommitProtocol`, to make Hive bucketed table writing work with all commit protocols (including the S3A commit protocol).
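A simplified sketch of how a commit protocol can honor the prefix when building the output file name (parameter names here are illustrative; the real HadoopMapReduceCommitProtocol keeps jobId as a field and takes a FileNameSpec):

import org.apache.hadoop.mapreduce.TaskAttemptContext

// Sketch only: put the Hive bucket prefix (e.g. "00002_0_") in front of the
// usual Spark part-file name; the split id keeps names unique across tasks.
def getFilename(
    taskContext: TaskAttemptContext,
    jobId: String,
    prefix: String,
    suffix: String): String = {
  val split = taskContext.getTaskAttemptID.getTaskID.getId
  f"${prefix}part-$split%05d-$jobId$suffix"
}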

### Why are the changes needed?

To make Spark write bucketed tables that are compatible with other SQL engines. Currently a Spark bucketed table cannot be leveraged by other SQL engines like Hive and Presto, because Spark uses a different hash function (Spark murmur3hash) and a different file name scheme. With this PR, a Spark-written Hive bucketed table can be efficiently read by Presto and Hive to do bucket filter pruning, join, group-by, etc. This was and is blocking several companies (confirmed with Facebook, Lyft, etc.) from migrating bucketing workloads from Hive to Spark.

### Does this PR introduce _any_ user-facing change?

Yes, any Hive bucketed table (with Parquet/ORC format) written by Spark is properly bucketed and can be efficiently processed by Hive and Presto/Trino.

### How was this patch tested?

* Added unit tests in BucketedWriteWithHiveSupportSuite.scala to verify that bucket file names and each row in each bucket are written properly.
* Tested by the Lyft Spark team (Shashank Pedamallu) by reading the Spark-written bucketed table from Trino, Spark and Hive.

Closes #33432 from c21/hive-bucket-v1.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
catalinii pushed a commit to lyft/spark that referenced this pull request Oct 8, 2021
…ormat with Hive hash)

@juechen507 commented Jan 4, 2023

Can the Spark-written Hive bucketed table be read by spark-sql to do bucket filter pruning, join, and group-by?

In my test, the bucket information is not used for group-by or join.
DDL is:

set spark.sql.hive.convertMetastoreParquet=true;
set spark.sql.hive.convertMetastoreOrc=true;
CREATE TABLE IF NOT EXISTS tmp.hive_bucketed_table3 (i int, j string)
PARTITIONED BY (k string)
CLUSTERED BY (i) SORTED BY (i) INTO 8 BUCKETS
STORED AS orc;

hive> desc formatted tmp.hive_bucketed_table3
col_name data_type comment
i int
j string
Partition Information
col_name data_type comment
k string
Detailed Table Information
Database: tmp
Owner: hadoop-query
CreateTime: Wed Jan 04 19:32:59 CST 2023
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: (...)
Table Type: MANAGED_TABLE
Table Parameters:
spark.sql.create.version 3.3.2-SNAPSHOT
spark.sql.sources.schema {"type":"struct","fields":[{"name":"i","type":"integer","nullable":true,"metadata":{}},{"name":"j","type":"string","nullable":true,"metadata":{}},{"name":"k","type":"string","nullable":true,"metadata":{}}]}
spark.sql.sources.schema.bucketCol.0 i
spark.sql.sources.schema.numBucketCols 1
spark.sql.sources.schema.numBuckets 8
spark.sql.sources.schema.numPartCols 1
spark.sql.sources.schema.numSortCols 1
spark.sql.sources.schema.partCol.0 k
spark.sql.sources.schema.sortCol.0 i
transient_lastDdlTime 1672831979
Storage Information
SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed: No
Num Buckets: 8
Bucket Columns: [i]
Sort Columns: [Order(col:i, order:1)]
Storage Desc Params:
serialization.format 1

spark-sql> explain select i,count(*) from tmp.hive_bucketed_table3 group by 1;
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[k#28, i#26], functions=[count(1)])
   +- Exchange hashpartitioning(k#28, i#26, 200), ENSURE_REQUIREMENTS, [plan_id=32]
      +- HashAggregate(keys=[k#28, i#26], functions=[partial_count(1)])
         +- FileScan orc tmp.hive_bucketed_table3[i#26,k#28] Batched: true, DataFilters: [], Format: ORC...

spark-sql> explain select * from tmp.hive_bucketed_table3 a join tmp.hive_bucketed_table3 b on a.i=b.i limit 1000;
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CollectLimit 1000
   +- SortMergeJoin [i#50, k#52], [i#53, k#55], Inner
      :- Sort [i#50 ASC NULLS FIRST, k#52 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(i#50, k#52, 200), ENSURE_REQUIREMENTS, [plan_id=72]
      :     +- Filter isnotnull(i#50)
      :        +- FileScan orc tmp.hive_bucketed_table3[i#50,j#51,k#52] Batched: true, DataFilters: [isnotnull(i#50)], Format: ORC...
      +- Sort [i#53 ASC NULLS FIRST, k#55 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(i#53, k#55, 200), ENSURE_REQUIREMENTS, [plan_id=73]
            +- Filter isnotnull(i#53)
               +- FileScan orc tmp.hive_bucketed_table3[i#53,j#54,k#55] Batched: true, DataFilters: [isnotnull(i#53)], Format: ORC...
