Arrow: Pad decimal bytes before passing to decimal vector #5168

Merged
merged 5 commits into apache:master on Jul 1, 2022

Conversation


@bryanck (Contributor) commented on Jun 30, 2022:

The vectorized reader benchmarks showed that the Iceberg Parquet vectorized reader falls behind Spark's when reading decimal types. Profiling the code revealed a bottleneck in an Arrow method that pads the byte buffer when setting a value in the DecimalVector, specifically this operation.

Runs of this benchmark showed that calling Unsafe.setMemory() can be slower than Java array operations. Results of a run are here.

This PR adds a workaround that pads the byte buffer before calling setBigEndian(), so that Unsafe.setMemory() is never called.
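A minimal sketch of the idea, modeled on the snippets quoted in the review threads below (the PR's actual helper is DecimalVectorUtil.padBigEndianBytes; details here may differ):

import java.util.Arrays;

import org.apache.arrow.vector.DecimalVector;

public class DecimalPadSketch {

  // Pad big-endian two's-complement bytes to newLength, sign-extending
  // negative values with 0xFF and non-negative values with 0x00.
  static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) {
    if (bigEndianBytes.length == newLength) {
      return bigEndianBytes;
    } else if (bigEndianBytes.length < newLength) {
      byte[] result = new byte[newLength];
      if (bigEndianBytes.length == 0) {
        return result;
      }
      if (bigEndianBytes[0] < 0) {
        // negative value: fill the leading pad bytes with the sign byte
        Arrays.fill(result, 0, newLength - bigEndianBytes.length, (byte) 0xFF);
      }
      System.arraycopy(
          bigEndianBytes, 0, result, newLength - bigEndianBytes.length, bigEndianBytes.length);
      return result;
    }
    throw new IllegalArgumentException(
        "Buffer of size " + bigEndianBytes.length + " is larger than requested size " + newLength);
  }

  // With a full-width input, DecimalVector.setBigEndian() only reverses and
  // copies the bytes; its Unsafe.setMemory() padding path is never taken.
  static void set(DecimalVector vector, int idx, byte[] parquetBigEndianBytes) {
    vector.setBigEndian(idx, padBigEndianBytes(parquetBigEndianBytes, DecimalVector.TYPE_WIDTH));
  }
}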

Here are the results of a run of the VectorizedReadDictionaryEncodedFlatParquetDataBenchmark benchmark without this change:

Benchmark                                                                                  Mode  Cnt   Score   Error  Units
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDatesIcebergVectorized5k         ss    5   2.016 ± 0.069   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDatesSparkVectorized5k           ss    5   2.083 ± 0.076   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDecimalsIcebergVectorized5k      ss    5  14.451 ± 0.273   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDecimalsSparkVectorized5k        ss    5   6.886 ± 0.163   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDoublesIcebergVectorized5k       ss    5   2.058 ± 0.108   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDoublesSparkVectorized5k         ss    5   1.731 ± 0.117   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readFloatsIcebergVectorized5k        ss    5   1.905 ± 0.016   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readFloatsSparkVectorized5k          ss    5   2.436 ± 0.178   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readIntegersIcebergVectorized5k      ss    5   2.975 ± 0.053   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readIntegersSparkVectorized5k        ss    5   2.461 ± 0.951   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readLongsIcebergVectorized5k         ss    5   2.713 ± 0.075   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readLongsSparkVectorized5k           ss    5   2.321 ± 0.953   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readStringsIcebergVectorized5k       ss    5   3.154 ± 0.062   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readStringsSparkVectorized5k         ss    5   4.567 ± 1.864   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readTimestampsIcebergVectorized5k    ss    5   2.674 ± 0.085   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readTimestampsSparkVectorized5k      ss    5   2.634 ± 0.089   s/op

Here are the results of a run with this change:

Benchmark                                                                                  Mode  Cnt  Score   Error  Units
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDatesIcebergVectorized5k         ss    5  2.339 ± 1.092   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDatesSparkVectorized5k           ss    5  2.204 ± 0.085   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDecimalsIcebergVectorized5k      ss    5  8.501 ± 0.129   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDecimalsSparkVectorized5k        ss    5  7.130 ± 0.111   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDoublesIcebergVectorized5k       ss    5  2.677 ± 0.083   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readDoublesSparkVectorized5k         ss    5  2.251 ± 0.142   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readFloatsIcebergVectorized5k        ss    5  2.616 ± 0.090   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readFloatsSparkVectorized5k          ss    5  2.438 ± 0.074   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readIntegersIcebergVectorized5k      ss    5  2.620 ± 0.171   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readIntegersSparkVectorized5k        ss    5  2.242 ± 0.140   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readLongsIcebergVectorized5k         ss    5  2.679 ± 0.084   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readLongsSparkVectorized5k           ss    5  2.504 ± 0.173   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readStringsIcebergVectorized5k       ss    5  3.804 ± 0.215   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readStringsSparkVectorized5k         ss    5  4.864 ± 0.163   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readTimestampsIcebergVectorized5k    ss    5  2.544 ± 0.086   s/op
VectorizedReadDictionaryEncodedFlatParquetDataBenchmark.readTimestampsSparkVectorized5k      ss    5  2.524 ± 0.193   s/op

The github-actions bot added the arrow label on Jun 30, 2022.
@bryanck (Contributor, Author) commented on Jun 30, 2022:

I'd be interested to see what results others are seeing with the benchmarks.

@danielcweeks merged commit 7e1ade8 into apache:master on Jul 1, 2022.
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
ByteBuffer buffer = dict.decodeToBinary(currentVal).toByteBuffer();
vector.getDataBuffer().setBytes(idx, buffer);
A contributor commented:
@bryanck, was this really setting the value twice? It looks like it was calling setBigEndian on the vector and then setBytes on the backing buffer. That could explain a lot of the slowness as well?

@bryanck (Contributor, Author) replied:
It looks like that's what it was doing.

} else if (bigEndianBytes.length < newLength) {
  byte[] result = new byte[newLength];
  if (bigEndianBytes.length == 0) {
    return result;
@rdblue (Contributor) commented on Jul 3, 2022:
@bryanck, is this hit? It looks like an invalid case because the decimal precision would need to be 0, but we're choosing to return 0 for it.

@bryanck (Contributor, Author) replied:
Probably not; I was mimicking the behavior in DecimalVector.setBigEndian() to be on the safe side.

byte[] vectorBytes =
    DecimalVectorUtil.padBigEndianBytes(
        dict.decodeToBinary(currentVal).getBytesUnsafe(),
        DecimalVector.TYPE_WIDTH);
A contributor commented:
Is typeWidth going to be the same as DecimalVector.TYPE_WIDTH?

@bryanck (Contributor, Author) replied on Jul 4, 2022:
typeWidth is the Parquet width, I believe, which varies depending on the precision of the decimal, but the Arrow width is always 16 bytes.
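For illustration, a sketch of the difference, assuming Parquet's rule of sizing decimals to the minimum bytes that can hold 10^precision - 1 as a signed two's-complement value (minBytesForPrecision is a hypothetical helper, not Iceberg API):

import java.math.BigInteger;

import org.apache.arrow.vector.DecimalVector;

public class DecimalWidthSketch {

  // Minimum bytes needed to hold any unscaled value of the given precision.
  static int minBytesForPrecision(int precision) {
    BigInteger maxUnscaled = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    return (maxUnscaled.bitLength() + 1 + 7) / 8; // +1 for the sign bit
  }

  public static void main(String[] args) {
    System.out.println(minBytesForPrecision(20)); // 9: Parquet typeWidth for decimal(20, 5)
    System.out.println(DecimalVector.TYPE_WIDTH); // 16: Arrow's fixed slot width
  }
}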

@@ -358,7 +358,8 @@ class FixedLengthDecimalReader extends BaseReader {
   protected void nextVal(
       FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int typeWidth, byte[] byteArray) {
     valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-    ((DecimalVector) vector).setBigEndian(idx, byteArray);
+    byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray, DecimalVector.TYPE_WIDTH);
A contributor commented:
This looks like a place where we could reuse a buffer rather than allocating in padBigEndianBytes every time.

@bryanck (Contributor, Author) replied:
I did some testing, and reusing the buffer was a little slower, partly because we always need to fill the buffer to zero out the previous value.

@bryanck (Contributor, Author) added:
One thing that was a little faster was to bypass DecimalVector.setBigEndian(), convert to little endian (if needed), and copy the bytes directly into the value buffer.
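A sketch of that alternative on a little-endian platform (illustrative only, not the code that was merged):

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.DecimalVector;

public class DirectCopySketch {

  // Reverse an already padded 16-byte big-endian value to little endian and
  // copy it straight into the vector's value buffer, bypassing setBigEndian().
  static void setFromBigEndian(DecimalVector vector, int idx, byte[] paddedBigEndian) {
    byte[] littleEndian = new byte[DecimalVector.TYPE_WIDTH];
    for (int i = 0; i < DecimalVector.TYPE_WIDTH; i++) {
      littleEndian[i] = paddedBigEndian[DecimalVector.TYPE_WIDTH - 1 - i];
    }
    ArrowBuf dataBuffer = vector.getDataBuffer();
    dataBuffer.setBytes((long) idx * DecimalVector.TYPE_WIDTH, littleEndian);
    vector.setIndexDefined(idx); // still need to mark the slot non-null
  }
}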

@bryanck (Contributor, Author) added:
Also, one thing to note: the benchmark isn't quite right. Decimal(20,5) ends up taking 9 bytes and thus uses a fixed-length byte array instead of long or int encoding, and fixed-length byte arrays aren't dictionary encoded in Parquet v1. That explains why the decimal benchmark is much slower than the other data types (which are dictionary encoded).

@bryanck (Contributor, Author) added:
(It looks like dictionary encoding for fixed length byte arrays wouldn't work correctly anyway, I may follow up with a fix for that)

@bryanck (Contributor, Author) added on Jul 4, 2022:
On second thought about reusing the buffer: we could create a buffer per value reader, so the width of the value is always the same, and then skip the array fill (using two buffers, one for negative values and one for positive values).
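A sketch of that idea (hypothetical, not part of this PR), assuming every call within a reader sees the same byte length, and relying on setBigEndian() copying the array so the shared buffers can be reused safely:

import java.util.Arrays;

import org.apache.arrow.vector.DecimalVector;

public class ReusablePadBuffersSketch {

  // Since typeWidth is constant for a column, every value overwrites the same
  // tail region, so the leading pad bytes never need re-filling; the sign of
  // each value just selects which pre-filled buffer to use.
  private final byte[] positivePad = new byte[DecimalVector.TYPE_WIDTH]; // zeros by default
  private final byte[] negativePad = new byte[DecimalVector.TYPE_WIDTH];

  ReusablePadBuffersSketch() {
    Arrays.fill(negativePad, (byte) 0xFF); // sign extension for negative values
  }

  // Assumes bigEndianBytes.length is the same on every call (one reader per column).
  byte[] pad(byte[] bigEndianBytes) {
    byte[] result =
        bigEndianBytes.length > 0 && bigEndianBytes[0] < 0 ? negativePad : positivePad;
    System.arraycopy(
        bigEndianBytes, 0, result, result.length - bigEndianBytes.length, bigEndianBytes.length);
    return result;
  }
}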

@bryanck (Contributor, Author) added:
Here's the PR that has a fix for the dictionary encoding

@@ -369,9 +370,10 @@ protected void nextDictEncodedVal(
       reader.fixedLengthDecimalDictEncodedReader()
           .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
     } else if (Mode.PACKED.equals(mode)) {
-      ByteBuffer decimalBytes = dict.decodeToBinary(reader.readInteger()).toByteBuffer();
-      byte[] vectorBytes = new byte[typeWidth];
-      System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
A contributor commented:
Was this correct before? It looks like it was trying to use System.arraycopy with a ByteBuffer!

@bryanck (Contributor, Author) replied:
I believe this would have thrown an ArrayStoreException.
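A standalone demonstration of the failure mode (not code from the PR): System.arraycopy declares its src and dest parameters as Object, so passing a ByteBuffer compiles but fails at runtime.

import java.nio.ByteBuffer;

public class ArrayCopyBugDemo {
  public static void main(String[] args) {
    ByteBuffer decimalBytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    byte[] vectorBytes = new byte[4];
    // Compiles, but throws java.lang.ArrayStoreException at runtime,
    // because the source is a ByteBuffer, not an array.
    System.arraycopy(decimalBytes, 0, vectorBytes, 0, 4);
  }
}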

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request on Jul 10, 2022:
* Arrow: Pad decimal bytes before passing to vector

* comment clarification

* optimize fill for neg numbers

* Add overflow check