Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to write bucketed (but not partitioned) tables #9740

Closed
wants to merge 1 commit into from

Conversation

aditi-pandit
Copy link
Collaborator

@aditi-pandit aditi-pandit commented May 7, 2024

The Velox HiveConnector supports writing bucketed files only when they are partitioned as well. This presents a feature gap wrt Presto.

Presto behavior (for bucketed but not partitioned):

  • Supports CTAS into bucketed (but not partitioned tables)
  • Cannot append/overwrite to existing bucketed tables (though can append to TEMPORARY ones).

The CTAS into bucketed tables has become important because such tables are used for CTE (WITH clause).
Note: This PR only handles CTAS situations. There will be a separate PR for TEMPORARY tables. prestodb/presto#19744 prestodb/presto#22630

Background

TableWriter and TableFinish

Presto uses TableWriter PlanNodes to do the writing operations. The TableWriter nodes run on the workers. These nodes write the input rows into data files (on a staging directory before moving them to a target directory). The TableWriter node works in conjunction with a TableCommit node on the co-ordinator. The TableCommit node (TableFinishOperator) does the final renaming of target directory and commit to the meta-store.

It is important to note that plans with Bucketed tables involve a LocalExchange that brings all the data to a single driver for TableWriter so that it can bucket and write the data appropriately.

EXPLAIN CREATE TABLE lineitem_bucketed2(orderkey, partkey, suppkey, linenumber, quantity, ds) WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey']) AS SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem;

Plan with TableWriter and TableCommit mode. Note the LocalExchange moving all data to a single driver.

- Output[PlanNodeId 7]
     - TableCommit[PlanNodeId 5][Optional[hive.tpch_bucketed.lineitem_bucketed2]] => [rows_23:bigint] 
         - RemoteStreamingExchange[PlanNodeId 299][GATHER] => [rows:bigint, fragments:varbinary, commitcontext:varbinary] 
             - TableWriter[PlanNodeId 6] => [rows:bigint, fragments:varbinary, commitcontext:varbinary] 
                     orderkey := orderkey (1:194)  partkey := partkey (1:204) suppkey := suppkey (1:213) linenumber := linenumber (1:222) quantity := quantity (1:234) ds := expr (1:244)                                             
                 - LocalExchange[PlanNodeId 330][SINGLE] () => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varchar(10)] >
                         - RemoteStreamingExchange[PlanNodeId 298][REPARTITION] => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varcha>
                              - ScanProject[PlanNodeId 0,187][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, project>
                                 expr := VARCHAR'2021-12-20' suppkey := tpch:suppkey (1:262) partkey := tpch:partkey (1:262) linenumber := tpch:linenumber (1:262) orderkey := tpch:orderkey (1:262) quantity := tpch:quantity (1:262) 

The above command creates 10 files as follows. 10 is the bucket count.

Aditis-MacBook-Pro:lineitem_bucketed aditipandit$ pwd
${DATA_DIR}/hive_data/tpch/lineitem_bucketed

Aditis-MacBook-Pro:lineitem_bucketed2 aditipandit$ ls
000000_0_20240507_221727_00018_73r2r 
000003_0_20240507_221727_00018_73r2r 
000006_0_20240507_221727_00018_73r2r 
000009_0_20240507_221727_00018_73r2r
000001_0_20240507_221727_00018_73r2r
000004_0_20240507_221727_00018_73r2r 
000007_0_20240507_221727_00018_73r2r
000002_0_20240507_221727_00018_73r2r 
000005_0_20240507_221727_00018_73r2r 
000008_0_20240507_221727_00018_73r2r

TableWriter output

The TableWriter output contains three columns per fragment (one for each individual target file). This format is being presented for completeness.
There are no special changes for bucketed tables here. The only important difference is that the writePath/targetPath would not contain the partition directory.

TableWriter output row
ROWrows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY
Rows Fragments CommitContext
N (numPartitionUpdates) NULL TaskCommitContext
NULL PartitionUpdate0
NULL PartitionUpdate1
NULL ...
NULL PartitionUpdateN

The fragments column is JSON strings of PartitionUpdate as in the following format

{ 
"Name": "ds=2022-08-06/partition=events_pcp_product_finder_product_similartiy__groupby__999999998000212604", 
"updateMode": "NEW", 
"writePath": "", 
"targetPath": "", 
"fileWriteInfos": [ 
   { "writeFileName": "", "targetFileName": "", "fileSize": 3517346970 },
   { "writeFileName": "", "targetFileName": "", "fileSize": 4314798687 }, ] 
"rowCount": 3950431150, 
"inMemoryDataSizeInBytes": 4992001194927, 
"onDiskDataSizeInBytes": 1374893372141, 
"containsNumberedFileNames": false
}

The commitcontext column is a constant vector of TaskCommitContext in JSON string

{ 
"lifespan": "TaskWide", 
"taskId": "20220822_190126_00000_78c2f.1.0.0", 
"pageSinkCommitStrategy": "TASK_COMMIT", 
"lastPage": false
}

Empty buckets

The TableWriter generates PartitionUpdate messages only for the files it has written. So if there are empty buckets then there isn't a PartitionUpdate message for it.

If there are no PartitionUpdate output messages for any bucket, then the TableFinish operator fixes the HiveMetaStore with empty files for each bucket. https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1794

Design

As outlined above all table writing happens in the TableWriter operator.

The TableWriter forwards the write to the HiveDataSink which is registered by the HiveConnector for it.

The HiveDataSink already supported bucketed (and partitioned) tables. So all the logic for wiring bucket metadata and bucket computation already existed. The only missing piece was to handle fileNames for bucketed but not partitioned files in the writerIds, and map the proper writerId to input rows when appending to the HiveDataSink. This PR fixes that.


Note: The Prestissimo changes are in prestodb/presto#22737

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label May 7, 2024
@aditi-pandit aditi-pandit marked this pull request as draft May 7, 2024 23:19
Copy link

netlify bot commented May 7, 2024

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 7eac4f6
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/664fbd5fbecd320008bccdc3

@Yuhta Yuhta requested a review from xiaoxmeng May 8, 2024 14:44
@aditi-pandit aditi-pandit force-pushed the bucketed_table branch 4 times, most recently from 96500ec to b10d786 Compare May 10, 2024 05:50
@aditi-pandit aditi-pandit force-pushed the bucketed_table branch 2 times, most recently from 3e71a3a to dad3c3f Compare May 14, 2024 05:35
@aditi-pandit aditi-pandit marked this pull request as ready for review May 14, 2024 05:37
@aditi-pandit aditi-pandit changed the title [Do not review]Add support to write bucketed (but not partitioned) tables Add support to write bucketed (but not partitioned) tables May 14, 2024
Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aditi-pandit looks good % minors. Haven't looked at the test yet. Thanks!

velox/connectors/hive/HiveDataSink.h Outdated Show resolved Hide resolved
velox/connectors/hive/HiveDataSink.cpp Outdated Show resolved Hide resolved
Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aditi-pandit LGTM. Thanks!

velox/connectors/hive/HiveDataSink.cpp Outdated Show resolved Hide resolved
velox/connectors/hive/HiveDataSink.cpp Outdated Show resolved Hide resolved
velox/connectors/hive/HiveDataSink.h Outdated Show resolved Hide resolved
@facebook-github-bot
Copy link
Contributor

@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@aditi-pandit
Copy link
Collaborator Author

Thanks @xiaoxmeng. Addressed the review comments.

@kewang1024
Copy link
Contributor

In the table writer output, it says N (numPartitionUpdates)

But I remember when I was debugging, the N is actually the num of rows being written

@kewang1024 kewang1024 self-requested a review May 25, 2024 01:24
@facebook-github-bot
Copy link
Contributor

@xiaoxmeng merged this pull request in fe65ed0.

Copy link

Conbench analyzed the 1 benchmark run on commit fe65ed0e.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

@aditi-pandit aditi-pandit deleted the bucketed_table branch May 27, 2024 23:52
Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this pull request Jun 7, 2024
…ncubator#9740)

Summary:
The Velox HiveConnector supports writing bucketed files only when they are partitioned as well. This presents a feature gap wrt Presto.

Presto behavior (for bucketed but not partitioned):

- Supports CTAS into bucketed (but not partitioned tables)
- Cannot append/overwrite to existing bucketed tables (though can append to TEMPORARY ones).

The CTAS into bucketed tables has become important because such tables are used for CTE (WITH clause).
Note: This PR only handles CTAS situations. There will be a separate PR for TEMPORARY tables. prestodb/presto#19744 prestodb/presto#22630

### Background
#### TableWriter and TableFinish

Presto uses TableWriter PlanNodes to do the writing operations. The TableWriter nodes run on the workers. These nodes write the input rows into data files (on a staging directory before moving them to a target directory). The TableWriter node works in conjunction with a TableCommit node on the co-ordinator. The TableCommit node (TableFinishOperator) does the final renaming of target directory and commit to the meta-store.

It is important to note that plans with Bucketed tables involve a LocalExchange that brings all the data to a single driver for TableWriter so that it can bucket and write the data appropriately.

```
EXPLAIN CREATE TABLE lineitem_bucketed2(orderkey, partkey, suppkey, linenumber, quantity, ds) WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey']) AS SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem;
```

Plan with TableWriter and TableCommit mode. Note the LocalExchange moving all data to a single driver.
```
- Output[PlanNodeId 7]
     - TableCommit[PlanNodeId 5][Optional[hive.tpch_bucketed.lineitem_bucketed2]] => [rows_23:bigint]
         - RemoteStreamingExchange[PlanNodeId 299][GATHER] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
             - TableWriter[PlanNodeId 6] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
                     orderkey := orderkey (1:194)  partkey := partkey (1:204) suppkey := suppkey (1:213) linenumber := linenumber (1:222) quantity := quantity (1:234) ds := expr (1:244)
                 - LocalExchange[PlanNodeId 330][SINGLE] () => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varchar(10)] >
                         - RemoteStreamingExchange[PlanNodeId 298][REPARTITION] => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varcha>
                              - ScanProject[PlanNodeId 0,187][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, project>
                                 expr := VARCHAR'2021-12-20' suppkey := tpch:suppkey (1:262) partkey := tpch:partkey (1:262) linenumber := tpch:linenumber (1:262) orderkey := tpch:orderkey (1:262) quantity := tpch:quantity (1:262)
```

The above command creates 10 files as follows. 10 is the bucket count.

```
Aditis-MacBook-Pro:lineitem_bucketed aditipandit$ pwd
${DATA_DIR}/hive_data/tpch/lineitem_bucketed

Aditis-MacBook-Pro:lineitem_bucketed2 aditipandit$ ls
000000_0_20240507_221727_00018_73r2r
000003_0_20240507_221727_00018_73r2r
000006_0_20240507_221727_00018_73r2r
000009_0_20240507_221727_00018_73r2r
000001_0_20240507_221727_00018_73r2r
000004_0_20240507_221727_00018_73r2r
000007_0_20240507_221727_00018_73r2r
000002_0_20240507_221727_00018_73r2r
000005_0_20240507_221727_00018_73r2r
000008_0_20240507_221727_00018_73r2r
```

#### TableWriter output
The TableWriter output contains three columns per fragment (one for each individual target file).  This format is being presented for completeness.
**There are no special changes for bucketed tables here. The only important difference is that the writePath/targetPath would not contain the partition directory.**

| TableWriter output row |
|--------|
| ROW<rows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY> |

| Rows |  | Fragments |  | CommitContext |
|--------|--------|--------|--------|--------|
| N (numPartitionUpdates) |  | NULL |  | TaskCommitContext |
| NULL | | PartitionUpdate0 |  |  |
| NULL |  | PartitionUpdate1 |  |  |
| NULL |  | ... |  |  |
| NULL |  | PartitionUpdateN |  |  |

The fragments column is JSON strings of PartitionUpdate as in the following format
```
{
"Name": "ds=2022-08-06/partition=events_pcp_product_finder_product_similartiy__groupby__999999998000212604",
"updateMode": "NEW",
"writePath": "",
"targetPath": "",
"fileWriteInfos": [
   { "writeFileName": "", "targetFileName": "", "fileSize": 3517346970 },
   { "writeFileName": "", "targetFileName": "", "fileSize": 4314798687 }, ]
"rowCount": 3950431150,
"inMemoryDataSizeInBytes": 4992001194927,
"onDiskDataSizeInBytes": 1374893372141,
"containsNumberedFileNames": false
}
```

The commitcontext column is a constant vector of TaskCommitContext in JSON string
```
{
"lifespan": "TaskWide",
"taskId": "20220822_190126_00000_78c2f.1.0.0",
"pageSinkCommitStrategy": "TASK_COMMIT",
"lastPage": false
}
```

#### Empty buckets
The TableWriter generates PartitionUpdate messages only for the files it has written. So if there are empty buckets then there isn't a PartitionUpdate message for it.

If there are no PartitionUpdate output messages for any bucket, then the TableFinish operator fixes the HiveMetaStore with empty files for each bucket. https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1794

### Design

As outlined above all table writing happens in the TableWriter operator.

The TableWriter forwards the write to the HiveDataSink which is registered by the HiveConnector for it.

The HiveDataSink already supported bucketed (and partitioned) tables. So all the logic for wiring bucket metadata and bucket computation already existed. The only missing piece was to handle fileNames for bucketed but not partitioned files in the writerIds, and map the proper writerId to input rows when appending to the HiveDataSink. This PR fixes that.

********************************************
Note: The Prestissimo changes are in prestodb/presto#22737

Pull Request resolved: facebookincubator#9740

Reviewed By: kewang1024

Differential Revision: D57748876

Pulled By: xiaoxmeng

fbshipit-source-id: 33bb77c6fce4d2519f3214e2fb93891f1f910716
Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this pull request Jun 7, 2024
…ncubator#9740)

Summary:
The Velox HiveConnector supports writing bucketed files only when they are partitioned as well. This presents a feature gap wrt Presto.

Presto behavior (for bucketed but not partitioned):

- Supports CTAS into bucketed (but not partitioned tables)
- Cannot append/overwrite to existing bucketed tables (though can append to TEMPORARY ones).

The CTAS into bucketed tables has become important because such tables are used for CTE (WITH clause).
Note: This PR only handles CTAS situations. There will be a separate PR for TEMPORARY tables. prestodb/presto#19744 prestodb/presto#22630

### Background
#### TableWriter and TableFinish

Presto uses TableWriter PlanNodes to do the writing operations. The TableWriter nodes run on the workers. These nodes write the input rows into data files (on a staging directory before moving them to a target directory). The TableWriter node works in conjunction with a TableCommit node on the co-ordinator. The TableCommit node (TableFinishOperator) does the final renaming of target directory and commit to the meta-store.

It is important to note that plans with Bucketed tables involve a LocalExchange that brings all the data to a single driver for TableWriter so that it can bucket and write the data appropriately.

```
EXPLAIN CREATE TABLE lineitem_bucketed2(orderkey, partkey, suppkey, linenumber, quantity, ds) WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey']) AS SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem;
```

Plan with TableWriter and TableCommit mode. Note the LocalExchange moving all data to a single driver.
```
- Output[PlanNodeId 7]
     - TableCommit[PlanNodeId 5][Optional[hive.tpch_bucketed.lineitem_bucketed2]] => [rows_23:bigint]
         - RemoteStreamingExchange[PlanNodeId 299][GATHER] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
             - TableWriter[PlanNodeId 6] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
                     orderkey := orderkey (1:194)  partkey := partkey (1:204) suppkey := suppkey (1:213) linenumber := linenumber (1:222) quantity := quantity (1:234) ds := expr (1:244)
                 - LocalExchange[PlanNodeId 330][SINGLE] () => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varchar(10)] >
                         - RemoteStreamingExchange[PlanNodeId 298][REPARTITION] => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varcha>
                              - ScanProject[PlanNodeId 0,187][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, project>
                                 expr := VARCHAR'2021-12-20' suppkey := tpch:suppkey (1:262) partkey := tpch:partkey (1:262) linenumber := tpch:linenumber (1:262) orderkey := tpch:orderkey (1:262) quantity := tpch:quantity (1:262)
```

The above command creates 10 files as follows. 10 is the bucket count.

```
Aditis-MacBook-Pro:lineitem_bucketed aditipandit$ pwd
${DATA_DIR}/hive_data/tpch/lineitem_bucketed

Aditis-MacBook-Pro:lineitem_bucketed2 aditipandit$ ls
000000_0_20240507_221727_00018_73r2r
000003_0_20240507_221727_00018_73r2r
000006_0_20240507_221727_00018_73r2r
000009_0_20240507_221727_00018_73r2r
000001_0_20240507_221727_00018_73r2r
000004_0_20240507_221727_00018_73r2r
000007_0_20240507_221727_00018_73r2r
000002_0_20240507_221727_00018_73r2r
000005_0_20240507_221727_00018_73r2r
000008_0_20240507_221727_00018_73r2r
```

#### TableWriter output
The TableWriter output contains three columns per fragment (one for each individual target file).  This format is being presented for completeness.
**There are no special changes for bucketed tables here. The only important difference is that the writePath/targetPath would not contain the partition directory.**

| TableWriter output row |
|--------|
| ROW<rows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY> |

| Rows |  | Fragments |  | CommitContext |
|--------|--------|--------|--------|--------|
| N (numPartitionUpdates) |  | NULL |  | TaskCommitContext |
| NULL | | PartitionUpdate0 |  |  |
| NULL |  | PartitionUpdate1 |  |  |
| NULL |  | ... |  |  |
| NULL |  | PartitionUpdateN |  |  |

The fragments column is JSON strings of PartitionUpdate as in the following format
```
{
"Name": "ds=2022-08-06/partition=events_pcp_product_finder_product_similartiy__groupby__999999998000212604",
"updateMode": "NEW",
"writePath": "",
"targetPath": "",
"fileWriteInfos": [
   { "writeFileName": "", "targetFileName": "", "fileSize": 3517346970 },
   { "writeFileName": "", "targetFileName": "", "fileSize": 4314798687 }, ]
"rowCount": 3950431150,
"inMemoryDataSizeInBytes": 4992001194927,
"onDiskDataSizeInBytes": 1374893372141,
"containsNumberedFileNames": false
}
```

The commitcontext column is a constant vector of TaskCommitContext in JSON string
```
{
"lifespan": "TaskWide",
"taskId": "20220822_190126_00000_78c2f.1.0.0",
"pageSinkCommitStrategy": "TASK_COMMIT",
"lastPage": false
}
```

#### Empty buckets
The TableWriter generates PartitionUpdate messages only for the files it has written. So if there are empty buckets then there isn't a PartitionUpdate message for it.

If there are no PartitionUpdate output messages for any bucket, then the TableFinish operator fixes the HiveMetaStore with empty files for each bucket. https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1794

### Design

As outlined above all table writing happens in the TableWriter operator.

The TableWriter forwards the write to the HiveDataSink which is registered by the HiveConnector for it.

The HiveDataSink already supported bucketed (and partitioned) tables. So all the logic for wiring bucket metadata and bucket computation already existed. The only missing piece was to handle fileNames for bucketed but not partitioned files in the writerIds, and map the proper writerId to input rows when appending to the HiveDataSink. This PR fixes that.

********************************************
Note: The Prestissimo changes are in prestodb/presto#22737

Pull Request resolved: facebookincubator#9740

Reviewed By: kewang1024

Differential Revision: D57748876

Pulled By: xiaoxmeng

fbshipit-source-id: 33bb77c6fce4d2519f3214e2fb93891f1f910716
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. Merged
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants