
[VL] Optimize ensureFlattened #4415

Merged 1 commit into apache:main on Jan 18, 2024

Conversation

marin-ma
Contributor

Use upstream facebook::velox::BaseVector::flattenVector to avoid unnecessary copy.
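For reference, a minimal sketch of the idea; the helper name flattenInPlace is illustrative and the flattenVector signature is an assumption about the upstream API, not the patch verbatim (the real ensureFlattened is a member of VeloxColumnarBatch and also guards against repeated work):

#include "velox/vector/ComplexVector.h"

// Sketch only: let upstream Velox flatten each child in place instead of
// copying the data into a new RowVector.
void flattenInPlace(const facebook::velox::RowVectorPtr& rowVector) {
  for (facebook::velox::column_index_t i = 0; i < rowVector->childrenSize(); ++i) {
    facebook::velox::BaseVector::flattenVector(rowVector->childAt(i));
  }
}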


Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/oap-project/gluten/issues

Then could you also rename the commit message and pull request title to follow this format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:

auto startTime = std::chrono::steady_clock::now();
// Make sure to load lazy vector if not loaded already.

ScopedTimer timer(&exportNanos_);
Contributor

Curious: could we use Velox's MicrosecondTimer to avoid reimplementing the same functionality?

Contributor Author

We added Timer and ScopedTimer in gluten/cpp/core, not gluten/cpp/velox, so these classes are meant for common use across backends. Also, Timer counts nanoseconds for accuracy, not microseconds.
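For illustration, a minimal sketch of such a nanosecond-granularity ScopedTimer; the real class in gluten/cpp/core may differ in details:

#include <chrono>
#include <cstdint>

// Accumulates elapsed nanoseconds into *target when the scope ends.
class ScopedTimer {
 public:
  explicit ScopedTimer(int64_t* target)
      : target_(target), start_(std::chrono::steady_clock::now()) {}

  ~ScopedTimer() {
    *target_ += std::chrono::duration_cast<std::chrono::nanoseconds>(
                    std::chrono::steady_clock::now() - start_)
                    .count();
  }

 private:
  int64_t* target_;
  std::chrono::steady_clock::time_point start_;
};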

marin-ma marked this pull request as ready for review on January 16, 2024 at 09:12
@marin-ma
Contributor Author

/Benchmark Velox

@GlutenPerfBot
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_4415_time.csv log/native_master_01_15_2024_8fb2c35d5_time.csv difference percentage
q1 32.59 34.48 1.884 105.78%
q2 24.51 24.98 0.462 101.88%
q3 36.88 38.86 1.987 105.39%
q4 36.94 37.85 0.907 102.45%
q5 70.26 70.70 0.436 100.62%
q6 7.63 5.78 -1.858 75.67%
q7 82.70 85.94 3.245 103.92%
q8 83.61 84.86 1.254 101.50%
q9 121.72 123.88 2.156 101.77%
q10 44.77 46.04 1.269 102.83%
q11 19.94 20.63 0.689 103.46%
q12 23.64 25.93 2.288 109.68%
q13 47.92 46.63 -1.292 97.30%
q14 20.36 23.32 2.959 114.53%
q15 27.63 31.44 3.811 113.79%
q16 14.14 14.88 0.736 105.21%
q17 96.95 100.72 3.771 103.89%
q18 147.46 147.03 -0.428 99.71%
q19 13.46 13.73 0.271 102.01%
q20 31.07 27.44 -3.633 88.31%
q21 224.40 227.79 3.397 101.51%
q22 14.51 13.83 -0.676 95.34%
total 1223.09 1246.73 23.634 101.93%

@@ -83,6 +77,7 @@ int64_t VeloxColumnarBatch::numBytes() {
}

velox::RowVectorPtr VeloxColumnarBatch::getRowVector() const {
VELOX_CHECK_NOT_NULL(rowVector_);
Contributor

VELOX_DCHECK_NOT_NULL

@@ -91,12 +91,24 @@ const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) {
VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column.");

auto& firstChild = rv.childAt(0);
VELOX_CHECK(firstChild->type()->isInteger(), "RecordBatch field 0 should be integer");
VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not flat encoding.");
Contributor

VELOX_DCHECK: this invariant is guaranteed by the code, so a DCHECK is enough.
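The suggestion, sketched against the checks above; as far as I know, VELOX_DCHECK is typically compiled out of release builds, so the invariant is still verified in debug runs without a release-path cost:

auto& firstChild = rv.childAt(0);
// Debug-only checks: release builds rely on the caller guaranteeing these invariants.
VELOX_DCHECK(firstChild->type()->isInteger(), "RecordBatch field 0 should be integer");
VELOX_DCHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not flat encoding.");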


// first column is partition key hash value or pid
return firstChild->asFlatVector<int32_t>()->rawValues();
}

facebook::velox::VectorPtr flatChildAt(const facebook::velox::RowVector& rv, facebook::velox::column_index_t idx) {
auto column = rv.childAt(idx);
if (column->isLazy()) {
Contributor

What if the column is a complex data type? Could this situation happen: the struct column is not lazy, but its child column is lazy?

Contributor Author

The complex types will be handled by PrestoSerializer properly.

assert(stringColumn);
RETURN_NOT_OK(splitBinaryType(binaryIdx, *stringColumn, dstAddrs));
auto column = flatChildAt(rv, colIdx)->asFlatVector<facebook::velox::StringView>();
VELOX_CHECK_NOT_NULL(column);
Contributor

DCHECK

auto column = rv.childAt(colIdx);
assert(column->isFlatEncoding());

auto column = flatChildAt(rv, colIdx);
Member

Why does flatChildAt become required in this patch?

Contributor Author

Because facebook::velox::BaseVector::flattenVector loads lazy vectors but preserves the lazy encoding.
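A hedged sketch of a helper matching that explanation; the loadedVectorShared call is an assumed way to materialize the lazy child, not necessarily the exact code in this patch:

#include "velox/vector/ComplexVector.h"

facebook::velox::VectorPtr flatChildAt(
    const facebook::velox::RowVector& rv,
    facebook::velox::column_index_t idx) {
  auto column = rv.childAt(idx);
  if (column->isLazy()) {
    // Materialize the lazily loaded values so downstream code sees the
    // loaded vector rather than the lazy wrapper.
    column = facebook::velox::BaseVector::loadedVectorShared(column);
  }
  return column;
}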

Member

I see. Then is it possible to handle this case inside VeloxColumnarBatch::ensureFlattened?

@marin-ma
Contributor Author

One Spark AQE UT failed because, without the copy, facebook::velox::RowVector::estimateFlatSize can report a larger data size than the actual uncompressed data size, since it calculates the actual underlying buffer sizes. In the failed test case, the RowVector has 6 rows, but the underlying buffers have length 4096. #4428 will address this by using the "uncompressed size" as the "dataSize" metric.

@marin-ma
Contributor Author

/Benchmark Velox

@Yohahaha
Contributor

One Spark AQE UT failed because, without the copy, facebook::velox::RowVector::estimateFlatSize can report a larger data size than the actual uncompressed data size, since it calculates the actual underlying buffer sizes. In the failed test case, the RowVector has 6 rows, but the underlying buffers have length 4096. #4428 will address this by using the "uncompressed size" as the "dataSize" metric.

One question here: the original dataSize is the value returned by the split call, which is estimateFlatSize, while splitResult.getRawPartitionLengths is the Arrow buffer size (correct me if I'm wrong).
So, estimateFlatSize is not accurate, but the Arrow buffer size is accurate?

We have observed that some queries show an uncompressed size much smaller than the data size, for example 58G (uncompressed size) vs 3.1T (data size).

@GlutenPerfBot
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_4415_time.csv log/native_master_01_17_2024_6e070aee2_time.csv difference percentage
q1 34.95 32.53 -2.415 93.09%
q2 23.92 25.15 1.225 105.12%
q3 36.80 35.63 -1.172 96.82%
q4 37.47 39.47 1.998 105.33%
q5 68.41 69.91 1.500 102.19%
q6 8.53 7.16 -1.364 84.01%
q7 81.31 83.43 2.112 102.60%
q8 83.39 86.98 3.589 104.30%
q9 122.14 125.57 3.431 102.81%
q10 44.22 42.09 -2.129 95.19%
q11 20.47 20.23 -0.239 98.83%
q12 25.58 27.47 1.889 107.39%
q13 45.55 44.85 -0.706 98.45%
q14 16.61 17.86 1.245 107.50%
q15 26.56 29.77 3.205 112.06%
q16 13.86 13.95 0.094 100.68%
q17 101.13 100.92 -0.210 99.79%
q18 147.06 146.37 -0.694 99.53%
q19 12.62 13.91 1.293 110.24%
q20 26.90 26.50 -0.392 98.54%
q21 223.63 226.25 2.625 101.17%
q22 14.73 13.71 -1.022 93.06%
total 1215.85 1229.72 13.863 101.14%

@marin-ma
Contributor Author

One question here: the original dataSize is the value returned by the split call, which is estimateFlatSize, while splitResult.getRawPartitionLengths is the Arrow buffer size (correct me if I'm wrong). So, estimateFlatSize is not accurate, but the Arrow buffer size is accurate?

@Yohahaha The "dataSize" metric in Vanilla Spark refers to the uncompressed size of the InternalRow. As it's used in the AQE "LocalShuffleReader" optimization, my understanding is that they consider this value as the raw uncompressed output data size. On the other hand, the size of the RowVector can be considered as the input size for the shuffle. The uncompressed Arrow buffers align more closely with the concept of the raw uncompressed size in this context.

We have observed that some queries show an uncompressed size much smaller than the data size, for example 58G (uncompressed size) vs 3.1T (data size).

This seems to exceed normal expectations. Could you provide some more detailed workload data? For example, data size, data schema, shuffle partitions, etc.

@Yohahaha
Contributor

Vanilla Spark updates dataSize via UnsafeRow::getSizeInBytes (https://github.com/apache/spark/blob/38fc127a232ed4005e6436cdef4c5a1f1bba95f7/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala#L63-L71) and uses shuffle bytes written as the compressed size.

On the other hand, the size of the RowVector can be considered as the input size for the shuffle.

So you think estimateFlatSize is not the real uncompressed size? From my understanding, the shuffle input size is the same as the uncompressed size, and Vanilla Spark uses dataSize as QueryStageExec's runtimeStatistics instead of shuffle bytes written.

What I want to confirm is whether the Arrow buffers really reflect the real uncompressed size. Is there any reuse logic for the Arrow buffers?

@Yohahaha
Contributor

Yohahaha commented Jan 18, 2024

We have observed that some queries show an uncompressed size much smaller than the data size, for example 58G (uncompressed size) vs 3.1T (data size).

This seems to exceed normal expectations. Could you provide some more detailed workload data? For example, data size, data schema, shuffle partitions, etc.

The above result is from TPCDS 10T with 8000 partitions.

Any query run before #4428 can reproduce this. [screenshot omitted]

@marin-ma
Contributor Author

marin-ma commented Jan 18, 2024

So you think estimateFlatSize is not the real uncompressed size?

Yes, and it's true. This PR had one failed UT without #4428: the RowVector length is 6, but the underlying buffer lengths are 4096, and estimateFlatSize counts all buffer sizes. Before this patch, the RowVector was flattened by copying, and the copied buffers had length 6. After this patch the copy is removed, so estimateFlatSize is much larger than before. This value is more likely to represent memory usage; it cannot represent the "uncompressed size".
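A small illustrative calculation of that point; the 6-row and 4096-entry numbers come from the failing UT described above, while the single BIGINT column and the per-row byte count are assumptions made for illustration:

#include <cstdint>
#include <iostream>

int main() {
  constexpr int64_t kRows = 6;          // live rows in the RowVector
  constexpr int64_t kBufferRows = 4096; // capacity of the retained buffers
  constexpr int64_t kBytesPerRow = 8;   // assume one BIGINT column
  // estimateFlatSize walks the retained buffers, so it scales with capacity.
  const int64_t estimated = kBufferRows * kBytesPerRow;
  // The pre-patch flattening copy only materialized the 6 live rows.
  const int64_t copied = kRows * kBytesPerRow;
  std::cout << "estimateFlatSize ~ " << estimated << " bytes, flattened copy ~ "
            << copied << " bytes\n";
  return 0;
}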

What I want to confirm is whether the Arrow buffers really reflect the real uncompressed size. Is there any reuse logic for the Arrow buffers?

The reuse logic has nothing to do with how the buffer size is computed. We accumulate the total raw Arrow buffer size each time before the buffers are destroyed or reused.

The above result is from TPCDS 10T with 8000 partitions.

SF10T or SF10? The shuffled data size looks very small.

I would think the case you provided is reasonable, because Velox buffer sizes can be larger than Arrow buffers. One known case is that in Velox, any string of 12 characters or fewer uses 12 bytes, while Arrow stores the real length. But 58G (uncompressed size) vs 3.1T (data size) is very abnormal and needs further investigation. Could you reproduce this case and compare the data size with Vanilla Spark? What's the compressed data size (shuffle bytes written)?
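To make the string example concrete, a rough comparison assuming Velox's fixed-size StringView slots versus Arrow's offsets-plus-data layout for short strings; the row count and string length are made up:

#include <cstdint>
#include <iostream>
#include "velox/type/StringView.h"

int main() {
  constexpr int64_t kRows = 1'000'000;
  constexpr int64_t kAvgLen = 3; // short strings, stored inline in Velox
  // Velox: every element occupies one fixed-size StringView slot.
  const int64_t veloxBytes = kRows * sizeof(facebook::velox::StringView);
  // Arrow utf8: 4-byte offsets (n + 1 of them) plus the raw characters.
  const int64_t arrowBytes = (kRows + 1) * sizeof(int32_t) + kRows * kAvgLen;
  std::cout << "Velox values buffer ~ " << veloxBytes << " bytes, Arrow ~ "
            << arrowBytes << " bytes\n";
  return 0;
}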

marin-ma merged commit dacaa01 into apache:main on Jan 18, 2024
17 checks passed
@GlutenPerfBot
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_4415_time.csv log/native_master_01_17_2024_6e070aee2_time.csv difference percentage
q1 34.21 32.53 -1.678 95.10%
q2 23.66 25.15 1.490 106.30%
q3 37.80 35.63 -2.166 94.27%
q4 37.79 39.47 1.681 104.45%
q5 66.38 69.91 3.535 105.33%
q6 7.08 7.16 0.080 101.13%
q7 80.27 83.43 3.155 103.93%
q8 83.96 86.98 3.015 103.59%
q9 121.36 125.57 4.205 103.47%
q10 42.61 42.09 -0.522 98.77%
q11 20.33 20.23 -0.102 99.50%
q12 29.73 27.47 -2.260 92.40%
q13 46.02 44.85 -1.172 97.45%
q14 14.94 17.86 2.919 119.54%
q15 27.73 29.77 2.035 107.34%
q16 14.50 13.95 -0.543 96.25%
q17 99.12 100.92 1.807 101.82%
q18 143.32 146.37 3.044 102.12%
q19 13.68 13.91 0.232 101.69%
q20 25.98 26.50 0.528 102.03%
q21 222.20 226.25 4.058 101.83%
q22 14.70 13.71 -0.988 93.28%
total 1207.36 1229.72 22.353 101.85%

@Yohahaha
Contributor

I reran a new case with TPCDS SF100 q99.

Vanilla vs. Gluten: [screenshots omitted]

You can see that Gluten's dataSize is smaller than the shuffle bytes written. Is this expected?

@marin-ma
Contributor Author

marin-ma commented Jan 30, 2024

You can see that Gluten's dataSize is smaller than the shuffle bytes written. Is this expected?

@Yohahaha Yes. The "dataSize" is the original "uncompressed size". If the data after compression is larger than its uncompressed size, we write the original uncompressed data directly, with a little metadata added. Therefore the "shuffle bytes written" can be greater than the "dataSize" in some cases.
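A hypothetical sketch of that fallback (names are illustrative, not the actual Gluten shuffle writer API): if the compressed frame is not smaller, the raw bytes are written plus a small header, which is why bytes written can slightly exceed the uncompressed dataSize.

#include <cstdint>
#include <functional>
#include <vector>

struct Frame {
  bool compressed;
  std::vector<uint8_t> payload;
};

// Hypothetical helper: keep the raw bytes whenever compression does not help.
Frame makeFrame(
    const std::vector<uint8_t>& raw,
    const std::function<std::vector<uint8_t>(const std::vector<uint8_t>&)>& compress) {
  auto candidate = compress(raw);
  if (candidate.size() >= raw.size()) {
    return Frame{false, raw}; // written as-is, plus a small metadata header
  }
  return Frame{true, std::move(candidate)};
}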

@Yohahaha
Contributor

You can see that Gluten's dataSize is smaller than the shuffle bytes written. Is this expected?

@Yohahaha Yes. The "dataSize" is the original "uncompressed size". If the data after compression is larger than its uncompressed size, we write the original uncompressed data directly, with a little metadata added. Therefore the "shuffle bytes written" can be greater than the "dataSize" in some cases.

Got it, thank you! But it still doesn't quite make sense to me; I will keep tracking this issue.

@marin-ma
Contributor Author

@Yohahaha Thanks. Let's discuss in #4576 if you have more input.
