
[SUPPORT] Inconsistent query result using GetLatestBaseFiles compared to Snapshot Query #5231

Closed
codejoyan opened this issue Apr 5, 2022 · 9 comments

Comments

@codejoyan
Contributor

codejoyan commented Apr 5, 2022

I am trying to compare the output of a snapshot query with the output of a query restricted to the files returned by GetLatestBaseFiles (as below).

What might be the reason for the following two observations?

  1. The difference in counts between Section A and Section B.
  2. I am using a COW table, yet the latest file slice for a file group doesn't contain all the records (Section C).

Files listed by GetLatestBaseFiles

d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet
24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet

Section A: SnapShot Query Output (Expected)

scala> spark.sql("select date, count(1) from stock_ticks_cow group by date").show(false)
+----------+--------+                                                           
|date      |count(1)|
+----------+--------+
|2019/08/31|197     |
|2018/08/31|197     |
+----------+--------+

Section B: Query Output Using list of files returned by GetLatestBaseFiles

scala> spark.sql("select date, count(1) from stock_ticks_cow where _hoodie_file_name in ('d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet', '24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet') group by date").show(false)
+----------+--------+
|date      |count(1)|
+----------+--------+
|2019/08/31|197     |
|2018/08/31|99      |
+----------+--------+

Section C: The latest file slice contains only a subset of the records, even though this is a COW table (expected: 197, actual: 99)

-rw-r--r--   1 root supergroup         96 2022-04-05 11:46 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
-rw-r--r--   1 root supergroup     443927 2022-04-05 11:51 /user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115122268.parquet
-rw-r--r--   1 root supergroup     443653 2022-04-05 11:52 /user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet
-rw-r--r--   1 root supergroup     443919 2022-04-05 11:46 /user/hive/warehouse/stock_ticks_cow/2018/08/31/24cf1c28-e142-41ce-9131-65db1d0a83c3-0_1-35-37_20220405114604187.parquet

scala> spark.sql("select _hoodie_file_name, count(distinct key) from stock_ticks_cow where _hoodie_file_name in ('24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet') group by _hoodie_file_name").show(false)
+------------------------------------------------------------------------+-------------------+
|_hoodie_file_name                                                       |count(DISTINCT key)|
+------------------------------------------------------------------------+-------------------+
|24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet|99                 |
+------------------------------------------------------------------------+-------------------+

To Reproduce

Steps to reproduce the behavior:

List Latest Base Files

scala> val basePath = "/user/hive/warehouse/stock_ticks_cow"
basePath: String = /user/hive/warehouse/stock_ticks_cow

scala> val conf: SerializableConfiguration = new SerializableConfiguration(new Configuration())
conf: org.apache.hudi.common.config.SerializableConfiguration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml

scala> val engineContext: HoodieLocalEngineContext = new HoodieLocalEngineContext(conf.get());
engineContext: org.apache.hudi.common.engine.HoodieLocalEngineContext = org.apache.hudi.common.engine.HoodieLocalEngineContext@b8471c9

scala> val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(conf.get()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
metaClient: org.apache.hudi.common.table.HoodieTableMetaClient = HoodieTableMetaClient{basePath='/user/hive/warehouse/stock_ticks_cow', metaPath='/user/hive/warehouse/stock_ticks_cow/.hoodie', tableType=COPY_ON_WRITE}

scala> val timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
timeline: org.apache.hudi.common.table.timeline.HoodieTimeline = org.apache.hudi.common.table.timeline.HoodieDefaultTimeline: [20220405114604187__commit__COMPLETED],[20220405115122268__commit__COMPLETED],[20220405115234824__commit__COMPLETED]

scala> val metadataConfig = HoodieInputFormatUtils.buildMetadataConfig(conf.get())
metadataConfig: org.apache.hudi.common.config.HoodieMetadataConfig = org.apache.hudi.common.config.HoodieMetadataConfig@732b3ec8

scala> val fsView = new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig)
fsView: org.apache.hudi.metadata.HoodieMetadataFileSystemView = org.apache.hudi.metadata.HoodieMetadataFileSystemView@4a20f6ea

scala> val partitions = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).iterator().asScala.toList;
partitions: List[String] = List(2018/08/31, 2019/08/31)                  

scala> partitions.flatMap(x => {
     | val engContext = new HoodieLocalEngineContext(conf.get());
     | val fsView = new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metadataConfig);
     | fsView.getLatestBaseFiles(x).iterator().asScala.toList.map(_.getFileName)
     | })
res15: List[String] = List(24cf1c28-e142-41ce-9131-65db1d0a83c3-0_0-21-22_20220405115234824.parquet, d086d323-b8c0-4d34-a664-40f8d9d301fd-0_0-35-36_20220405114604187.parquet)

Steps to Reproduce
I am following the steps in the Docker demo. There are two JSON files (batch_1.json, batch_2.json) in docker/demo/data. I created an additional file, batch_3.json, by copying batch_1.json and changing the year from 2018 to 2019.
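For reference, batch_3.json can be generated with a simple substitution (a sketch; it assumes "2018" appears only in the date/timestamp fields of batch_1.json, and the sample row below is illustrative, not from the demo data):

```shell
# Create batch_3.json by rewriting every 2018 date in batch_1.json to 2019.
# If batch_1.json is missing, write one illustrative row so the step is self-contained.
[ -f batch_1.json ] || printf '{"symbol":"GOOG","date":"2018/08/31","ts":"2018-08-31 09:30:00"}\n' > batch_1.json
sed 's/2018/2019/g' batch_1.json > batch_3.json
```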
Commit 1:
terminal 1:

j0s0j7j@m-c02d25lnmd6n data % cat batch_3.json | kcat -b kafkabroker -t stock_tick -P
j0s0j7j@m-c02d25lnmd6n data % cat batch_2.json | kcat -b kafkabroker -t stock_tick -P
j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t stock_tick -P

terminal 2:

docker exec -it adhoc-2 /bin/bash

# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow table in HDFS
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field ts  \
  --target-base-path /user/hive/warehouse/stock_ticks_cow \
  --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider

Commit 2:
terminal 1:

j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t stock_tick -P
j0s0j7j@m-c02d25lnmd6n data % cat batch_1.json | kcat -b kafkabroker -t stock_tick -P

terminal 2:
Run the same deltastreamer job as in Commit 1.

Commit 3:
terminal 1:

j0s0j7j@m-c02d25lnmd6n data % cat batch_2.json | kcat -b kafkabroker -t stock_tick -P

terminal 2:
Run the same deltastreamer job as in Commit 1.

Environment Description

  • Hudi version : Built using master branch (0.11)

  • Spark version : 2.4.4

  • Running on Docker? yes

@alexeykudinkin
Contributor

@codejoyan can you please also paste the contents of the .hoodie folder?

@codejoyan
Contributor Author

@alexeykudinkin Unfortunately I shut down the Docker instance, but this can be replicated.
Let me run it one more time and update the details.

@codejoyan
Contributor Author

@alexeykudinkin here are the contents of the .hoodie folder, the data files, and the data file counts.
Let me know if you need any further info.

.hoodie folder contents

root@adhoc-2:/opt# hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/.hoodie/
Found 14 items
drwxr-xr-x   - root supergroup          0 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/.hoodie/.aux
drwxr-xr-x   - root supergroup          0 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/.hoodie/.temp
-rw-r--r--   1 root supergroup       4442 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182630323.commit
-rw-r--r--   1 root supergroup          0 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182630323.commit.requested
-rw-r--r--   1 root supergroup       3017 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182630323.inflight
-rw-r--r--   1 root supergroup       2825 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182714571.commit
-rw-r--r--   1 root supergroup          0 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182714571.commit.requested
-rw-r--r--   1 root supergroup       3131 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182714571.inflight
-rw-r--r--   1 root supergroup       2823 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182741563.commit
-rw-r--r--   1 root supergroup          0 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182741563.commit.requested
-rw-r--r--   1 root supergroup       3129 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/.hoodie/20220406182741563.inflight
drwxr-xr-x   - root supergroup          0 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/.hoodie/archived
-rw-r--r--   1 root supergroup        512 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/.hoodie/hoodie.properties
drwxr-xr-x   - root supergroup          0 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/.hoodie/metadata

Data Files Listing:

root@adhoc-2:/opt# hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/2018/08/31/
Found 4 items
-rw-r--r--   1 root supergroup         96 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/2018/08/31/.hoodie_partition_metadata
-rw-r--r--   1 root supergroup     443929 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/2018/08/31/c872d135-bf8f-4c5e-9eee-6347635c32d3-0_0-21-22_20220406182714571.parquet
-rw-r--r--   1 root supergroup     443651 2022-04-06 18:27 /user/hive/warehouse/stock_ticks_cow/2018/08/31/c872d135-bf8f-4c5e-9eee-6347635c32d3-0_0-21-22_20220406182741563.parquet
-rw-r--r--   1 root supergroup     443927 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/2018/08/31/c872d135-bf8f-4c5e-9eee-6347635c32d3-0_1-35-37_20220406182630323.parquet
root@adhoc-2:/opt# hdfs dfs -ls /user/hive/warehouse/stock_ticks_cow/2019/08/31/
Found 2 items
-rw-r--r--   1 root supergroup         96 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/2019/08/31/.hoodie_partition_metadata
-rw-r--r--   1 root supergroup     443971 2022-04-06 18:26 /user/hive/warehouse/stock_ticks_cow/2019/08/31/258177c0-b9eb-43be-9fad-7c1d57dd4279-0_0-35-36_20220406182630323.parquet

Data File Count

scala> spark.sql("select _hoodie_file_name, date, count(1) from stock_ticks_cow group by _hoodie_file_name, date").show(false);
+------------------------------------------------------------------------+----------+--------+
|_hoodie_file_name                                                       |date      |count(1)|
+------------------------------------------------------------------------+----------+--------+
|c872d135-bf8f-4c5e-9eee-6347635c32d3-0_0-21-22_20220406182741563.parquet|2018/08/31|99      |
|c872d135-bf8f-4c5e-9eee-6347635c32d3-0_0-21-22_20220406182714571.parquet|2018/08/31|98      |
|258177c0-b9eb-43be-9fad-7c1d57dd4279-0_0-35-36_20220406182630323.parquet|2019/08/31|197     |
+------------------------------------------------------------------------+----------+--------+

@alexeykudinkin
Contributor

@codejoyan this is a funny one

So I was able to reproduce the behavior you're seeing, and it turns out that _hoodie_file_name is simply not updated during Commit 3. During C3, all records are copied from the latest base file of the file group into the new latest base file (in your most recent experiment that's c872d135-bf8f-4c5e-9eee-6347635c32d3-0_0-21-22_20220406182741563.parquet), but the _hoodie_file_name field is not rewritten and keeps pointing at the old file.
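For anyone hitting this before the fix lands, one way to confirm the staleness (a sketch, not from the original thread; it relies on Spark's built-in input_file_name() function, which returns the path of the file each row was actually read from) is to compare that path against the recorded metadata column:

```
scala> spark.sql("""
     |   select regexp_extract(input_file_name(), '[^/]+$', 0) as actual_file,
     |          _hoodie_file_name as recorded_file,
     |          count(1)
     |   from stock_ticks_cow
     |   group by 1, 2""").show(false)
```

Rows where recorded_file differs from actual_file are the ones whose _hoodie_file_name went stale.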

@alexeykudinkin
Contributor

Created HUDI-3855 to track

@alexeykudinkin
Contributor

Validated that #5296 addresses the issue:

scala> val partitions = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).iterator().asScala.toList;
partitions: List[String] = List(2018/08/31, 2019/08/31)

scala> partitions.flatMap(x => {
     |   val engContext = new HoodieLocalEngineContext(conf.get());
     |   val fsView = new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metadataConfig);
     |   fsView.getLatestBaseFiles(x).iterator().asScala.toList.map(_.getFileName)
     | })
res0: List[String] = List(c4ea1cd9-0fec-4f7f-8272-e093fe6f9344-0_0-21-22_20220412225124731.parquet, be940ea6-2ece-405b-8de0-626e803050d8-0_0-36-37_20220412224915898.parquet)

scala> spark.read.format("hudi").load("hdfs:///user/hive/warehouse/stock_ticks_cow").createOrReplaceTempView("stock_ticks_cow")

scala> spark.sql("select date, count(1) from stock_ticks_cow group by date").show(false)
+----------+--------+
|date      |count(1)|
+----------+--------+
|2019/08/31|197     |
|2018/08/31|197     |
+----------+--------+


scala> spark.sql("select _hoodie_file_name, date, count(1) from stock_ticks_cow group by _hoodie_file_name, date").show(false);
+------------------------------------------------------------------------+----------+--------+
|_hoodie_file_name                                                       |date      |count(1)|
+------------------------------------------------------------------------+----------+--------+
|be940ea6-2ece-405b-8de0-626e803050d8-0_0-36-37_20220412224915898.parquet|2019/08/31|197     |
|c4ea1cd9-0fec-4f7f-8272-e093fe6f9344-0_0-21-22_20220412225124731.parquet|2018/08/31|197     |
+------------------------------------------------------------------------+----------+--------+

@codejoyan
Contributor Author

Thanks @alexeykudinkin for the solution. I will do some testing and go through the PR.
Will let you know if I have further questions. Thank you very much!

@vingov
Contributor

vingov commented Apr 13, 2022

Thanks, @codejoyan & @alexeykudinkin for fixing this critical issue for BQ integration!

@nsivabalan
Contributor

Thanks @alexeykudinkin for finding the root cause and fixing it.
