[SUPPORT] Error obtaining data file/log file grouping when Spark queries a MOR Hudi table #3919

Closed
vortual opened this issue Nov 3, 2021 · 3 comments
Labels
priority:critical production down; pipelines stalled; Need help asap.

Comments

@vortual
Contributor

vortual commented Nov 3, 2021

Describe the problem you faced
Hudi version: 0.9.0

Table type: MERGE_ON_READ

When the log files have not yet been compacted, the corresponding base files have not been generated. Querying the table with Spark then fails with the error below.

Error log:
org.apache.hudi.exception.HoodieException: Error obtaining data file/log file grouping
  at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:156)
  at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getSplits(HoodieParquetRealtimeInputFormat.java:69)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
  at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:314)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
  ... 48 elided
Caused by: java.lang.NullPointerException
  at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$null$8(HoodieRealtimeInputFormatUtils.java:132)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
  at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
  at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
  at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
  at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
  at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$getRealtimeSplits$9(HoodieRealtimeInputFormatUtils.java:129)
  at java.util.HashMap$KeySet.forEach(HashMap.java:933)
  at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:102)
  ... 100 more

To Reproduce

  1. Create the Kafka source table (Flink SQL):

     CREATE TABLE kafka_source (
       user_id STRING,
       order_amount BIGINT,
       log_ts TIMESTAMP(3),
       part STRING
     ) WITH (
       'connector' = 'kafka',
       'topic' = 'flink_on_hudi_zrm',
       'properties.bootstrap.servers' = 'node2:6667',
       'scan.startup.mode' = 'earliest-offset',
       'properties.group.id' = 'testGroup',
       'format' = 'json'
     );

  2. Sync data from Kafka into Hudi (Flink SQL):

     CREATE TABLE kafka_source_hudi (
       user_id VARCHAR(20),
       order_amount BIGINT,
       log_ts TIMESTAMP(3),
       part VARCHAR(20)
     ) PARTITIONED BY (part) WITH (
       'connector' = 'hudi',
       'path' = 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi',
       'table.type' = 'MERGE_ON_READ',
       'write.bucket_assign.tasks' = '2',
       'write.precombine.field' = 'log_ts',
       'write.tasks' = '2',
       'hive_sync.enable' = 'true',
       'hive_sync.mode' = 'hms',
       'hive_sync.metastore.uris' = 'thrift://node7:9083',
       'hoodie.datasource.write.recordkey.field' = 'user_id',
       'compaction.tasks' = '4',
       'compaction.delta_commits' = '3'
     );

     INSERT INTO kafka_source_hudi SELECT * FROM kafka_source;

  3. Create the Hive external table:

     CREATE EXTERNAL TABLE test.kafka_source_hudi_spark (
       _hoodie_commit_time string,
       _hoodie_commit_seqno string,
       _hoodie_record_key string,
       _hoodie_partition_path string,
       _hoodie_file_name string,
       user_id string,
       order_amount BIGINT,
       log_ts bigint
     ) PARTITIONED BY (part string)
     ROW FORMAT SERDE
       'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
     STORED AS INPUTFORMAT
       'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
     OUTPUTFORMAT
       'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
     LOCATION
       'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi';

     ALTER TABLE test.kafka_source_hudi_spark ADD IF NOT EXISTS PARTITION (part = 'par1') LOCATION 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par1';
     ALTER TABLE test.kafka_source_hudi_spark ADD IF NOT EXISTS PARTITION (part = 'par2') LOCATION 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par2';

  4. Query the external table with Spark SQL:

     SELECT * FROM test.kafka_source_hudi_spark LIMIT 10;

Expected behavior

Spark should be able to query the MOR table, including file slices that currently consist only of log files (no base file yet), instead of failing with "Error obtaining data file/log file grouping".

Environment Description

  • Hudi version: 0.9.0

  • Spark version: 2.2.0

  • Hive version: 1.2.1000

  • Hadoop version: 2.7.3

  • Storage (HDFS/S3/GCS..): HDFS

  • Running on Docker? (yes/no): no


@vortual
Contributor Author

vortual commented Nov 3, 2021

I have found the code that causes the problem, in the HoodieRealtimeInputFormatUtils class:

List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());

groupedInputSplits only holds the Parquet (base file) splits for each file group; log files are not included. When a file slice consists of newly written log files only, there is no matching entry in groupedInputSplits, so the dataFileSplits returned by the lookup is null. The subsequent call to dataFileSplits.forEach(split -> {}) then throws a NullPointerException.
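
To make the failure mode concrete, here is a minimal, self-contained Java sketch of the pattern described above. It is not the actual Hudi code; the map contents, element types, and names are simplified stand-ins for illustration. It shows how looking up a log-only file group in a map that only contains base-file splits yields null, and how iterating over that result produces the NullPointerException seen in the stack trace.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogOnlyFileSliceNpeSketch {
    public static void main(String[] args) {
        // Simplified stand-in for groupedInputSplits: it only has entries for
        // file groups that already have a base (Parquet) file.
        Map<String, List<String>> groupedInputSplits = new HashMap<>();
        groupedInputSplits.put("fileGroup-1", Collections.singletonList("parquet-split-1"));

        // A file slice that so far consists only of log files (no base file yet,
        // e.g. before the first compaction) has no entry in that map.
        String logOnlyFileId = "fileGroup-2";

        // The lookup returns null rather than an empty list ...
        List<String> dataFileSplits = groupedInputSplits.get(logOnlyFileId);

        // ... so iterating over it throws a NullPointerException, matching the trace above.
        dataFileSplits.forEach(System.out::println);
    }
}

A defensive lookup such as groupedInputSplits.getOrDefault(fileSlice.getFileId(), Collections.emptyList()) would avoid the NPE for log-only file slices, although the actual handling in the Hudi fix referenced later in this thread may differ.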

@vortual changed the title from "[SUPPORT] Error obtaining data file/log file grouping when the log files don't have a base file" to "[SUPPORT] Error obtaining data file/log file grouping when Spark queries a MOR Hudi table" on Nov 4, 2021
@nsivabalan added the priority:critical (production down; pipelines stalled; need help asap) and awaiting-triage labels on Nov 5, 2021
@vortual
Contributor Author

vortual commented Nov 10, 2021

I noticed that this problem didn't occur when I set write.bucket_assign.tasks = 1.

@vortual
Contributor Author

vortual commented Nov 27, 2021

I think this issue has been resolved by #3203.

@vortual closed this as completed on Nov 28, 2021