Add support to write bucketed (but not partitioned) tables (facebookincubator#9740)

Summary:
The Velox HiveConnector supports writing bucketed files only when they are partitioned as well, which leaves a feature gap with respect to Presto.

Presto behavior (for bucketed but not partitioned tables):

- Supports CTAS into bucketed (but not partitioned) tables.
- Cannot append to or overwrite existing bucketed tables (though it can append to TEMPORARY ones).

CTAS into bucketed tables has become important because such tables are used for CTEs (WITH clause).
Note: This PR only handles the CTAS case. There will be a separate PR for TEMPORARY tables. See prestodb/presto#19744 and prestodb/presto#22630.

### Background
#### TableWriter and TableFinish

Presto uses TableWriter plan nodes to perform the write operations. The TableWriter nodes run on the workers and write the input rows into data files (in a staging directory, before they are moved to the target directory). The TableWriter node works in conjunction with a TableCommit node on the coordinator. The TableCommit node (TableFinishOperator) performs the final rename of the target directory and the commit to the metastore.

It is important to note that plans for bucketed tables involve a LocalExchange that brings all the data to a single TableWriter driver so that it can bucket and write the data appropriately.

```
EXPLAIN CREATE TABLE lineitem_bucketed2(orderkey, partkey, suppkey, linenumber, quantity, ds) WITH (bucket_count = 10, bucketed_by = ARRAY['orderkey'], sorted_by = ARRAY['orderkey']) AS SELECT orderkey, partkey, suppkey, linenumber, quantity, '2021-12-20' FROM tpch.tiny.lineitem;
```

Plan with TableWriter and TableCommit nodes. Note the LocalExchange moving all data to a single driver.
```
- Output[PlanNodeId 7]
     - TableCommit[PlanNodeId 5][Optional[hive.tpch_bucketed.lineitem_bucketed2]] => [rows_23:bigint]
         - RemoteStreamingExchange[PlanNodeId 299][GATHER] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
             - TableWriter[PlanNodeId 6] => [rows:bigint, fragments:varbinary, commitcontext:varbinary]
                     orderkey := orderkey (1:194)  partkey := partkey (1:204) suppkey := suppkey (1:213) linenumber := linenumber (1:222) quantity := quantity (1:234) ds := expr (1:244)
                 - LocalExchange[PlanNodeId 330][SINGLE] () => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varchar(10)] >
                         - RemoteStreamingExchange[PlanNodeId 298][REPARTITION] => [orderkey:bigint, partkey:bigint, suppkey:bigint, linenumber:integer, quantity:double, expr:varcha>
                              - ScanProject[PlanNodeId 0,187][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, project>
                                 expr := VARCHAR'2021-12-20' suppkey := tpch:suppkey (1:262) partkey := tpch:partkey (1:262) linenumber := tpch:linenumber (1:262) orderkey := tpch:orderkey (1:262) quantity := tpch:quantity (1:262)
```

The above command creates 10 files (one per bucket, matching the bucket count of 10):

```
Aditis-MacBook-Pro:lineitem_bucketed aditipandit$ pwd
${DATA_DIR}/hive_data/tpch/lineitem_bucketed

Aditis-MacBook-Pro:lineitem_bucketed2 aditipandit$ ls
000000_0_20240507_221727_00018_73r2r
000003_0_20240507_221727_00018_73r2r
000006_0_20240507_221727_00018_73r2r
000009_0_20240507_221727_00018_73r2r
000001_0_20240507_221727_00018_73r2r
000004_0_20240507_221727_00018_73r2r
000007_0_20240507_221727_00018_73r2r
000002_0_20240507_221727_00018_73r2r
000005_0_20240507_221727_00018_73r2r
000008_0_20240507_221727_00018_73r2r
```
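
The bucket number is encoded in the leading digits of each file name. Below is a minimal sketch of this naming convention, assuming a six-digit zero-padded bucket prefix and reusing the query/task id from the listing above; the helper name and padding width are assumptions, not the exact Velox code.

```
#include <cstdint>
#include <iostream>
#include <string>

#include <fmt/format.h>

// Sketch of the bucketed file naming convention seen in the listing above:
// a zero-padded bucket number, a "_0_" separator, and the query/task id.
// The helper name and the six-digit padding are assumptions.
std::string bucketedFileName(uint32_t bucketId, const std::string& queryId) {
  return fmt::format("{:06}_0_{}", bucketId, queryId);
}

int main() {
  for (uint32_t bucket = 0; bucket < 3; ++bucket) {
    // Prints 000000_0_..., 000001_0_..., 000002_0_...
    std::cout << bucketedFileName(bucket, "20240507_221727_00018_73r2r")
              << "\n";
  }
  return 0;
}
```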

#### TableWriter output
The TableWriter output contains three columns, with one fragment per individual target file. This format is presented for completeness.
**There are no special changes for bucketed tables here. The only important difference is that the writePath/targetPath would not contain the partition directory.**

| TableWriter output row |
|--------|
| ROW<rows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY> |

| Rows | Fragments | CommitContext |
|--------|--------|--------|
| N (numPartitionUpdates) | NULL | TaskCommitContext |
| NULL | PartitionUpdate0 | |
| NULL | PartitionUpdate1 | |
| NULL | ... | |
| NULL | PartitionUpdateN | |
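
For illustration, here is a minimal sketch of this output schema built with the Velox type factories; the variable name is arbitrary and this is not the actual TableWriter code.

```
#include <iostream>

#include "velox/type/Type.h"

using namespace facebook::velox;

int main() {
  // The TableWriter output schema described in the tables above: a BIGINT
  // row count plus two VARBINARY payload columns.
  auto outputType = ROW(
      {"rows", "fragments", "commitcontext"},
      {BIGINT(), VARBINARY(), VARBINARY()});
  // Prints ROW<rows:BIGINT,fragments:VARBINARY,commitcontext:VARBINARY>.
  std::cout << outputType->toString() << "\n";
  return 0;
}
```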

The fragments column contains JSON strings of PartitionUpdate in the following format:
```
{
  "name": "ds=2022-08-06/partition=events_pcp_product_finder_product_similartiy__groupby__999999998000212604",
  "updateMode": "NEW",
  "writePath": "",
  "targetPath": "",
  "fileWriteInfos": [
    { "writeFileName": "", "targetFileName": "", "fileSize": 3517346970 },
    { "writeFileName": "", "targetFileName": "", "fileSize": 4314798687 }
  ],
  "rowCount": 3950431150,
  "inMemoryDataSizeInBytes": 4992001194927,
  "onDiskDataSizeInBytes": 1374893372141,
  "containsNumberedFileNames": false
}
```
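
A minimal sketch of assembling such a fragment payload with folly::dynamic follows; the helper name and the subset of fields populated are assumptions for illustration, not the actual Presto/Velox serialization code.

```
#include <iostream>
#include <string>

#include <folly/dynamic.h>
#include <folly/json.h>

// Hypothetical helper assembling one fragment payload; field names follow the
// example above, but this is not the actual serialization code.
folly::dynamic makePartitionUpdate(
    const std::string& name,
    const std::string& writePath,
    const std::string& targetPath,
    int64_t rowCount) {
  folly::dynamic update = folly::dynamic::object;
  update["name"] = name;
  update["updateMode"] = "NEW";
  update["writePath"] = writePath;
  update["targetPath"] = targetPath;
  update["fileWriteInfos"] = folly::dynamic::array();
  update["rowCount"] = rowCount;
  update["inMemoryDataSizeInBytes"] = 0;
  update["onDiskDataSizeInBytes"] = 0;
  update["containsNumberedFileNames"] = false;
  return update;
}

int main() {
  const auto update =
      makePartitionUpdate("ds=2022-08-06", "/staging/dir", "/target/dir", 100);
  std::cout << folly::toJson(update) << "\n";
  return 0;
}
```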

The commitcontext column is a constant vector containing the TaskCommitContext as a JSON string:
```
{
"lifespan": "TaskWide",
"taskId": "20220822_190126_00000_78c2f.1.0.0",
"pageSinkCommitStrategy": "TASK_COMMIT",
"lastPage": false
}
```

#### Empty buckets
The TableWriter generates PartitionUpdate messages only for the files it has written, so an empty bucket has no PartitionUpdate message.

If any bucket has no PartitionUpdate output message, the TableFinish operator fixes up the HiveMetaStore by creating an empty file for each such bucket. https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java#L1794
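
A hypothetical sketch of that fix-up step: given the bucket count and the bucket file names actually written, determine which buckets still need an empty file. The parsing assumes the naming convention shown earlier; this is illustrative, not the Presto HiveMetadata implementation.

```
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Hypothetical sketch of the fix-up described above: find the buckets that
// still need an empty file. Parsing relies on the "NNNNNN_0_<queryId>" naming
// convention shown earlier.
std::vector<int> missingBuckets(
    int bucketCount,
    const std::vector<std::string>& writtenFiles) {
  std::set<int> seen;
  for (const auto& file : writtenFiles) {
    // The digits before the first '_' encode the bucket number.
    seen.insert(std::stoi(file.substr(0, file.find('_'))));
  }
  std::vector<int> missing;
  for (int bucket = 0; bucket < bucketCount; ++bucket) {
    if (seen.count(bucket) == 0) {
      missing.push_back(bucket);
    }
  }
  return missing;
}

int main() {
  for (int bucket : missingBuckets(4, {"000000_0_q1", "000002_0_q1"})) {
    std::cout << "create empty file for bucket " << bucket << "\n";
  }
  return 0;
}
```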

### Design

As outlined above, all table writing happens in the TableWriter operator.

The TableWriter forwards the write to the HiveDataSink that the HiveConnector registers for it.

The HiveDataSink already supported bucketed (and partitioned) tables, so all the logic for wiring bucket metadata and computing bucket ids already existed. The only missing pieces were handling file names for bucketed-but-not-partitioned writes in the writer ids and mapping the proper writer id to input rows when appending to the HiveDataSink. This PR fixes that, as illustrated by the sketch below.
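
To illustrate the writer-id labels after this change, here is a standalone restatement of the four cases handled by the updated HiveWriterId::toString() in the diff below; the free function is only for demonstration and is not the Velox class itself.

```
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

#include <fmt/format.h>

// Standalone restatement of the four writer-id label cases handled by the
// updated HiveWriterId::toString() (see the diff below).
std::string writerIdLabel(
    std::optional<uint32_t> partitionId,
    std::optional<uint32_t> bucketId) {
  if (partitionId.has_value() && bucketId.has_value()) {
    return fmt::format("part[{}.{}]", partitionId.value(), bucketId.value());
  }
  if (partitionId.has_value()) {
    return fmt::format("part[{}]", partitionId.value());
  }
  if (bucketId.has_value()) {
    // New case introduced by this PR: bucketed but not partitioned.
    return fmt::format("bucket[{}]", bucketId.value());
  }
  return "unpart";
}

int main() {
  std::cout << writerIdLabel(3, 7) << "\n";                        // part[3.7]
  std::cout << writerIdLabel(3, std::nullopt) << "\n";             // part[3]
  std::cout << writerIdLabel(std::nullopt, 7) << "\n";             // bucket[7]
  std::cout << writerIdLabel(std::nullopt, std::nullopt) << "\n";  // unpart
  return 0;
}
```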

********************************************
Note: The Prestissimo changes are in prestodb/presto#22737

Pull Request resolved: facebookincubator#9740

Reviewed By: kewang1024

Differential Revision: D57748876

Pulled By: xiaoxmeng

fbshipit-source-id: 33bb77c6fce4d2519f3214e2fb93891f1f910716
aditi-pandit authored and Joe-Abraham committed Jun 7, 2024
1 parent c6e2e90 commit e8e98cc
Showing 4 changed files with 288 additions and 136 deletions.
68 changes: 46 additions & 22 deletions velox/connectors/hive/HiveDataSink.cpp
@@ -192,13 +192,22 @@ const HiveWriterId& HiveWriterId::unpartitionedId() {
}

std::string HiveWriterId::toString() const {
if (!partitionId.has_value()) {
return "unpart";
}
if (bucketId.has_value()) {
if (partitionId.has_value() && bucketId.has_value()) {
return fmt::format("part[{}.{}]", partitionId.value(), bucketId.value());
}
return fmt::format("part[{}]", partitionId.value());

if (partitionId.has_value() && !bucketId.has_value()) {
return fmt::format("part[{}]", partitionId.value());
}

// This WriterId is used to add an identifier in the MemoryPools. This could
// indicate unpart, but the bucket number needs to be disambiguated. So
// creating a new label using bucket.
if (!partitionId.has_value() && bucketId.has_value()) {
return fmt::format("bucket[{}]", bucketId.value());
}

return "unpart";
}

const std::string LocationHandle::tableTypeName(
@@ -379,8 +388,6 @@ HiveDataSink::HiveDataSink(
writerFactory_(dwio::common::getWriterFactory(
insertTableHandle_->tableStorageFormat())),
spillConfig_(connectorQueryCtx->spillConfig()) {
VELOX_USER_CHECK(
!isBucketed() || isPartitioned(), "A bucket table must be partitioned");
if (isBucketed()) {
VELOX_USER_CHECK_LT(
bucketCount_, maxBucketCount(), "bucketCount exceeds the limit");
@@ -424,14 +431,14 @@ bool HiveDataSink::canReclaim() const {
void HiveDataSink::appendData(RowVectorPtr input) {
checkRunning();

// Write to unpartitioned table.
if (!isPartitioned()) {
// Write to unpartitioned (and unbucketed) table.
if (!isPartitioned() && !isBucketed()) {
const auto index = ensureWriter(HiveWriterId::unpartitionedId());
write(index, input);
return;
}

// Write to partitioned table.
// Compute partition and bucket numbers.
computePartitionAndBucketIds(input);

// Lazy load all the input columns.
@@ -484,8 +491,11 @@ std::string HiveDataSink::stateString(State state) {
}

void HiveDataSink::computePartitionAndBucketIds(const RowVectorPtr& input) {
VELOX_CHECK(isPartitioned());
partitionIdGenerator_->run(input, partitionIds_);
VELOX_CHECK(isPartitioned() || isBucketed());
if (isPartitioned()) {
partitionIdGenerator_->run(input, partitionIds_);
}

if (isBucketed()) {
bucketFunction_->partition(*input, bucketIds_);
}
@@ -739,20 +749,34 @@ HiveDataSink::maybeCreateBucketSortWriter(
connectorQueryCtx_->sessionProperties()));
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
VELOX_CHECK(isPartitioned());
HiveWriterId HiveDataSink::getWriterId(size_t row) const {
std::optional<int32_t> partitionId;
if (isPartitioned()) {
VELOX_CHECK_LT(partitionIds_[row], std::numeric_limits<uint32_t>::max());
partitionId = static_cast<uint32_t>(partitionIds_[row]);
}

std::optional<int32_t> bucketId;
if (isBucketed()) {
bucketId = bucketIds_[row];
}
return HiveWriterId{partitionId, bucketId};
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
VELOX_CHECK(isPartitioned() || isBucketed());
if (isBucketed() && isPartitioned()) {
VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
}

std::fill(partitionSizes_.begin(), partitionSizes_.end(), 0);

const auto numRows = partitionIds_.size();
const auto numRows =
isPartitioned() ? partitionIds_.size() : bucketIds_.size();
for (auto row = 0; row < numRows; ++row) {
VELOX_CHECK_LT(partitionIds_[row], std::numeric_limits<uint32_t>::max());
const uint32_t partitionId = static_cast<uint32_t>(partitionIds_[row]);
const auto id = isBucketed() ? HiveWriterId{partitionId, bucketIds_[row]}
: HiveWriterId{partitionId};
const uint32_t index = ensureWriter(id);
auto id = getWriterId(row);
uint32_t index = ensureWriter(id);

VELOX_DCHECK_LT(index, partitionSizes_.size());
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
@@ -822,7 +846,7 @@ std::pair<std::string, std::string> HiveDataSink::getWriterFileNames(
}

HiveWriterParameters::UpdateMode HiveDataSink::getUpdateMode() const {
if (insertTableHandle_->isInsertTable()) {
if (insertTableHandle_->isExistingTable()) {
if (insertTableHandle_->isPartitioned()) {
const auto insertBehavior = hiveConfig_->insertExistingPartitionsBehavior(
connectorQueryCtx_->sessionProperties());
@@ -866,7 +890,7 @@ bool HiveInsertTableHandle::isBucketed() const {
return bucketProperty() != nullptr;
}

bool HiveInsertTableHandle::isInsertTable() const {
bool HiveInsertTableHandle::isExistingTable() const {
return locationHandle_->tableType() == LocationHandle::TableType::kExisting;
}

16 changes: 10 additions & 6 deletions velox/connectors/hive/HiveDataSink.h
@@ -245,7 +245,7 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {

const HiveBucketProperty* bucketProperty() const;

bool isInsertTable() const;
bool isExistingTable() const;

folly::dynamic serialize() const override;

@@ -378,13 +378,13 @@ struct HiveWriterId {

HiveWriterId() = default;

explicit HiveWriterId(uint32_t _partitionId)
: HiveWriterId(_partitionId, std::nullopt) {}

HiveWriterId(uint32_t _partitionId, std::optional<uint32_t> _bucketId)
HiveWriterId(
std::optional<uint32_t> _partitionId,
std::optional<uint32_t> _bucketId = std::nullopt)
: partitionId(_partitionId), bucketId(_bucketId) {}

/// Returns the special writer id for the un-partitioned table.
/// Returns the special writer id for the un-partitioned (and non-bucketed)
/// table.
static const HiveWriterId& unpartitionedId();

std::string toString() const;
@@ -511,6 +511,10 @@ class HiveDataSink : public DataSink {
// Compute the partition id and bucket id for each row in 'input'.
void computePartitionAndBucketIds(const RowVectorPtr& input);

// Get the HiveWriter corresponding to the row
// from partitionIds and bucketIds.
FOLLY_ALWAYS_INLINE HiveWriterId getWriterId(size_t row) const;

// Computes the number of input rows as well as the actual input row indices
// to each corresponding (bucketed) partition based on the partition and
// bucket ids calculated by 'computePartitionAndBucketIds'. The function also
58 changes: 55 additions & 3 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
@@ -174,12 +174,16 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
return files;
}

void verifyWrittenData(const std::string& dirPath) {
void verifyWrittenData(const std::string& dirPath, int32_t numFiles = 1) {
const std::vector<std::string> filePaths = listFiles(dirPath);
ASSERT_EQ(filePaths.size(), 1);
ASSERT_EQ(filePaths.size(), numFiles);
std::vector<std::shared_ptr<connector::ConnectorSplit>> splits;
std::for_each(filePaths.begin(), filePaths.end(), [&](auto filePath) {
splits.push_back(makeHiveConnectorSplit(filePath));
});
HiveConnectorTestBase::assertQuery(
PlanBuilder().tableScan(rowType_).planNode(),
{makeHiveConnectorSplit(filePaths[0])},
splits,
fmt::format("SELECT * FROM tmp"));
}

@@ -506,6 +510,54 @@ TEST_F(HiveDataSinkTest, basic) {
verifyWrittenData(outputDirectory->getPath());
}

TEST_F(HiveDataSinkTest, basicBucket) {
const auto outputDirectory = TempDirectoryPath::create();

const int32_t numBuckets = 4;
auto bucketProperty = std::make_shared<HiveBucketProperty>(
HiveBucketProperty::Kind::kHiveCompatible,
numBuckets,
std::vector<std::string>{"c0"},
std::vector<TypePtr>{BIGINT()},
std::vector<std::shared_ptr<const HiveSortingColumn>>{
std::make_shared<HiveSortingColumn>(
"c1", core::SortOrder{false, false})});
auto dataSink = createDataSink(
rowType_,
outputDirectory->getPath(),
dwio::common::FileFormat::DWRF,
{},
bucketProperty);
auto stats = dataSink->stats();
ASSERT_TRUE(stats.empty()) << stats.toString();
ASSERT_EQ(
stats.toString(),
"numWrittenBytes 0B numWrittenFiles 0 spillRuns[0] spilledInputBytes[0B] "
"spilledBytes[0B] spilledRows[0] spilledPartitions[0] spilledFiles[0] "
"spillFillTimeUs[0us] spillSortTime[0us] spillSerializationTime[0us] "
"spillWrites[0] spillFlushTime[0us] spillWriteTime[0us] "
"maxSpillExceededLimitCount[0] spillReadBytes[0B] spillReads[0] "
"spillReadTime[0us] spillReadDeserializationTime[0us]");

const int numBatches = 10;
const auto vectors = createVectors(500, numBatches);
for (const auto& vector : vectors) {
dataSink->appendData(vector);
}
stats = dataSink->stats();
ASSERT_FALSE(stats.empty());
ASSERT_GT(stats.numWrittenBytes, 0);
ASSERT_EQ(stats.numWrittenFiles, 0);

const auto partitions = dataSink->close();
stats = dataSink->stats();
ASSERT_FALSE(stats.empty());
ASSERT_EQ(partitions.size(), numBuckets);

createDuckDbTable(vectors);
verifyWrittenData(outputDirectory->getPath(), numBuckets);
}

TEST_F(HiveDataSinkTest, close) {
for (bool empty : {true, false}) {
SCOPED_TRACE(fmt::format("Data sink is empty: {}", empty));