
ORC-1700: Write parquet decimal type data in Benchmark using FIXED_LEN_BYTE_ARRAY type #1910

Closed

Conversation


@cxzl25 cxzl25 commented Apr 24, 2024

What changes were proposed in this pull request?

This PR aims to write Parquet decimal type data in the benchmark using the FIXED_LEN_BYTE_ARRAY type.

Why are the changes needed?

Currently, the decimal columns in the generated Parquet files use the Parquet BINARY physical type, which Spark 3.5.1 cannot read. Spark 3.5.1 can read them if the FIXED_LEN_BYTE_ARRAY type is used instead.

main

  optional binary fare_amount (DECIMAL(8,2));

PR

  optional fixed_len_byte_array(5) fare_amount (DECIMAL(10,2));
java -jar spark/target/orc-benchmarks-spark-2.1.0-SNAPSHOT.jar spark data -format=parquet  -compress zstd -data taxi
org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [fare_amount], physicalType: BINARY, logicalType: decimal(8,2)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:199)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:342)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:233)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.orc.bench.spark.SparkBenchmark.processReader(SparkBenchmark.java:170)
	at org.apache.orc.bench.spark.SparkBenchmark.fullRead(SparkBenchmark.java:216)
	at org.apache.orc.bench.spark.jmh_generated.SparkBenchmark_fullRead_jmhTest.fullRead_avgt_jmhStub(SparkBenchmark_fullRead_jmhTest.java:219)
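
For reference, a minimal sketch (not code from this PR) of how such a fixed-length decimal column can be declared with the parquet-mr schema builder. The field name, the DECIMAL(10,2) annotation, and the 5-byte length come from the schema above; the rest is illustrative:

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class DecimalSchemaSketch {
  public static void main(String[] args) {
    // Declare DECIMAL(10,2) on a 5-byte FIXED_LEN_BYTE_ARRAY instead of BINARY.
    MessageType schema = Types.buildMessage()
        .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
        .length(5)                                    // minimum bytes that can hold 10 decimal digits
        .as(LogicalTypeAnnotation.decimalType(2, 10)) // scale = 2, precision = 10
        .named("fare_amount")
        .named("taxi");
    // Prints a schema containing:
    //   optional fixed_len_byte_array(5) fare_amount (DECIMAL(10,2));
    System.out.println(schema);
  }
}
```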

How was this patch tested?

local test

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the JAVA label Apr 24, 2024
tolls_amount: decimal(8,2),
improvement_surcharge: decimal(8,2),
total_amount: decimal(8,2),
fare_amount: decimal(10,2),
Contributor Author

Testing locally, I found values such as 82599861 that cause the decimal write to fail.

Member

Good to know. Do we happen to know which SPARK JIRA is related to that change?

Contributor Author

The write logic here may not actually be related to Spark, because Spark converts out-of-range values to NULL.

SELECT CAST(82599861 AS decimal(8, 2))
NULL

In this PR, if taxi.schema were left unmodified, the benchmark would write the value directly with no cast involved, so the write fails:

java -jar core/target/orc-benchmarks-core-*-uber.jar generate data -format parquet -data taxi -compress snappy
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: arraycopy: destination index -1 out of bounds for byte[4]
	at java.base/java.lang.System.arraycopy(Native Method)
	at org.apache.orc.bench.core.convert.avro.AvroWriter.decimalToBinary(AvroWriter.java:381)
	at org.apache.orc.bench.core.convert.avro.AvroWriter$DecimalConverter.convert(AvroWriter.java:182)
	at org.apache.orc.bench.core.convert.parquet.ParquetWriter.writeBatch(ParquetWriter.java:78)
	at org.apache.orc.bench.core.convert.GenerateVariants.run(GenerateVariants.java:157)
	at org.apache.orc.bench.core.Driver.main(Driver.java:64)
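
As a rough illustration of that failure (a sketch, not the benchmark's actual code): a decimal(8,2) column only gets the bytes needed for 8 digits, while the unscaled value of 82599861.00 needs one byte more, which matches the negative arraycopy index above:

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalOverflowSketch {
  // Minimum bytes needed to hold 'precision' decimal digits in two's complement,
  // following the sizing rule the Parquet format uses for FIXED_LEN_BYTE_ARRAY decimals.
  static int minBytesForPrecision(int precision) {
    BigInteger max = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    int bytes = 1;
    while (max.bitLength() + 1 > 8 * bytes) {
      bytes++;
    }
    return bytes;
  }

  public static void main(String[] args) {
    System.out.println(minBytesForPrecision(8));  // 4 bytes for decimal(8,2)
    System.out.println(minBytesForPrecision(10)); // 5 bytes for decimal(10,2)

    // 82599861 at scale 2 has unscaled value 8259986100 (10 digits), whose
    // two's-complement encoding needs 5 bytes -- one more than decimal(8,2) allows.
    BigDecimal value = new BigDecimal("82599861").setScale(2);
    byte[] unscaled = value.unscaledValue().toByteArray();
    System.out.println(unscaled.length);          // 5 > 4, consistent with "byte[4]" above
  }
}
```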


cxzl25 commented Apr 24, 2024

BINARY type promotion is supported in Spark 4.0.0; the test using a 4.0.0-SNAPSHOT build works in #1909.

[SPARK-40876][SQL] Widening type promotions in Parquet readers


wgtmac commented Apr 27, 2024

Should we use INT32 and INT64 for decimals where applicable?


cxzl25 commented Apr 28, 2024

Should we use INT32 and INT64 for decimals where applicable?

Yes, Spark does this by default. It also provides the option spark.sql.parquet.writeLegacyFormat=true to align with Hive's way of writing decimals.

https://github.com/apache/spark/blob/8b8ea60bd4f22ea5763a77bac2d51f25d2479be9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala#L328-L339

    writeLegacyParquetFormat match {
      // Standard mode, 1 <= precision <= 9, writes as INT32
      case false if precision <= Decimal.MAX_INT_DIGITS => int32Writer

      // Standard mode, 10 <= precision <= 18, writes as INT64
      case false if precision <= Decimal.MAX_LONG_DIGITS => int64Writer

      // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
      case true if precision <= Decimal.MAX_LONG_DIGITS => binaryWriterUsingUnscaledLong

      // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
      case _ => binaryWriterUsingUnscaledBytes

https://github.com/apache/hive/blob/4614ce72a7f366674d89a3a78f687e419400cb89/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java#L568-L578
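
To spell the rule out, a small standalone sketch (illustrative only, not taken from Spark or ORC) that picks the Parquet physical type for a decimal from its precision, mirroring the standard-mode branches quoted above (in Spark, Decimal.MAX_INT_DIGITS is 9 and Decimal.MAX_LONG_DIGITS is 18):

```java
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

public class DecimalPhysicalType {
  // Standard (non-legacy) mapping:
  //   precision <= 9  -> INT32
  //   precision <= 18 -> INT64
  //   otherwise       -> FIXED_LEN_BYTE_ARRAY
  static PrimitiveTypeName physicalTypeFor(int precision) {
    if (precision <= 9) {
      return PrimitiveTypeName.INT32;
    } else if (precision <= 18) {
      return PrimitiveTypeName.INT64;
    } else {
      return PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
    }
  }

  public static void main(String[] args) {
    System.out.println(physicalTypeFor(8));   // INT32, e.g. decimal(8,2)
    System.out.println(physicalTypeFor(10));  // INT64, e.g. decimal(10,2)
    System.out.println(physicalTypeFor(20));  // FIXED_LEN_BYTE_ARRAY
  }
}
```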


wgtmac commented May 14, 2024

Could you please check if an email sent from private@orc.apache.org was accidentally moved to the spam folder? @cxzl25


cxzl25 commented May 14, 2024

Could you please check if an email sent from private@orc.apache.org was accidentally moved to the spam folder

Wow, I did miss this email. Thank you @wgtmac so much for inviting me, thank you @dongjoon-hyun so much for commenting and merging multiple times, and thanks to the entire ORC community!

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

dongjoon-hyun pushed a commit that referenced this pull request Jul 9, 2024
ORC-1700: Write parquet decimal type data in Benchmark using `FIXED_LEN_BYTE_ARRAY` type


Closes #1910 from cxzl25/ORC-1700.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit b8481ea)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun
Member

Sorry for the long delay. Thank you, @cxzl25 and @wgtmac . :)

@dongjoon-hyun dongjoon-hyun added this to the 2.0.2 milestone Jul 9, 2024
@dongjoon-hyun
Member

Merged to main/2.0.
