[Bug] spark hive connector failed to read tpcds parquet table #6503

Open · 3 of 4 tasks
FANNG1 opened this issue Jun 25, 2024 · 3 comments
Labels: kind:bug (This is clearly a bug), priority:major

Comments

FANNG1 commented Jun 25, 2024

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

I followed the steps at https://github.com/yaooqinn/tpcds-for-spark/tree/master to generate TPC-DS data for Spark. When querying the data, I get the exceptions below:
select * from catalog_sales limit 1;

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/catalog_sales/cs_sold_date_sk=2450815/part-00005-8942c9cb-bf45-4521-b53d-ab272c62ce58.c000.gz.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
	at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
	at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
	at org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$BinaryConverter.setDictionary(ETypeConverter.java:283)
	at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
	at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
	at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:141)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)

select * from store_returns limit 1;

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/store_returns/sr_returned_date_sk=2450820/part-00006-99abfdab-303f-4af4-8be1-46e581a8b189.c000.gz.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
	at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$8$1
	at org.apache.parquet.io.api.PrimitiveConverter.addInt(PrimitiveConverter.java:98)
	at org.apache.parquet.column.impl.ColumnReaderBase$2$3.writeValue(ColumnReaderBase.java:297)
	at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:234)
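
Both traces bottom out in Hive's ETypeConverter, which appears to expect the legacy Parquet encodings (e.g. decimals as fixed-length binary), while Spark's default writer stores small-precision decimals as INT32/INT64. A minimal reproduction sketch under that assumption; the table/column names and the hive_catalog catalog name are hypothetical, not taken from the issue:

// Hypothetical sketch: write a small-precision DECIMAL with Spark's default Parquet
// writer (spark.sql.parquet.writeLegacyFormat defaults to false, so the value is
// physically stored as INT32).
spark.sql("CREATE TABLE demo_prices (price DECIMAL(7,2)) STORED AS PARQUET")
spark.sql("INSERT INTO demo_prices VALUES (12.34)")
// Reading the same files back through a Hive SerDe based reader, e.g. a Kyuubi Spark
// Hive Connector catalog (here assumed to be named hive_catalog), is expected to fail
// with an UnsupportedOperationException similar to the traces above:
spark.sql("SELECT * FROM hive_catalog.default.demo_prices LIMIT 1").show()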

Affects Version(s)

1.8.2

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

  • Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • No. I cannot submit a PR at this time.
FANNG1 added the kind:bug and priority:major labels on Jun 25, 2024
pan3793 (Member) commented Jun 25, 2024

There are known limitations with the Hive Parquet reader implementation; you may want to try enabling spark.sql.parquet.writeLegacyFormat when generating the TPC-DS data using Spark.
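
A minimal sketch of applying that setting before the generation job writes the tables (how the job is launched is an assumption, not taken from the tpcds-for-spark repo):

// Make Spark write Parquet decimals/dates in the legacy format that the Hive SerDe
// based reader understands. Set this before any TPC-DS table is written.
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
// Equivalently, pass it when launching the generation job:
//   --conf spark.sql.parquet.writeLegacyFormat=true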

FANNG1 (Author) commented Jun 25, 2024

> There are known limitations with the Hive Parquet reader implementation; you may want to try enabling spark.sql.parquet.writeLegacyFormat when generating the TPC-DS data using Spark.

Yes, it works after setting spark.sql.parquet.writeLegacyFormat to true. Are there any other limitations of the Kyuubi Spark Hive connector? And is there any plan to support the new Parquet format?

pan3793 (Member) commented Jun 25, 2024

The limitations were listed at our first meeting. KSCH only supports using the Hive SerDe to read/write Hive tables; consequently, it has the same limitations as Spark's built-in Hive implementation (specifically, Hive 2.3.9), e.g. poor performance because reads are not vectorized, and no support for the new Parquet logical types.

Supporting that would require a mechanism to respect spark.sql.hive.convertMetastoreParquet (or a new dedicated configuration) to convert Hive Parquet table reads into Spark DataSource table reads, as sketched below.
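
For reference, a sketch of the existing behavior in Spark's built-in session catalog that this refers to; the configuration itself is Spark's, while the comment about how KSCH might adopt it is an assumption about direction, not an implemented feature:

// In Spark's built-in session catalog, this flag (default: true) makes Hive Parquet
// tables be read through Spark's native, vectorized Parquet DataSource instead of the
// Hive SerDe, avoiding the converter limitations shown in the stack traces above.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
// KSCH currently always goes through the Hive SerDe path; an analogous (possibly new,
// dedicated) configuration would be needed for it to take the native-reader path.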
