
[HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries #689

Merged
merged 1 commit into apache:master on Jan 8, 2020

Conversation

bhasudha (Contributor)

Summary:

  • listStatus() now classifies input paths into incremental, non-incremental, and non-Hoodie paths.
  • Each category of input paths is processed separately.
  • Incremental queries leverage HoodieCommitMetadata to find the partitions affected by the relevant commits and list only those partitions, as opposed to listing all partitions (see the sketch below).
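
To make the incremental optimization concrete, here is a minimal sketch of the idea, not the PR's exact code: given the commit metadata for the commits an incremental query needs to consume, the affected partitions can be collected directly, so only those partitions are listed. HoodieCommitMetadata and getPartitionToWriteStats() are real Hudi APIs; the surrounding class and the way the metadata list is obtained are illustrative assumptions.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hudi.common.model.HoodieCommitMetadata;

// Illustrative sketch: derive the partitions an incremental query must list from the
// commit metadata of the commits after the consumer's last commit time. How that list
// of HoodieCommitMetadata is fetched from the timeline is left out here.
public class IncrementalPartitionPruningSketch {

  static Set<String> affectedPartitions(List<HoodieCommitMetadata> commitsToConsume) {
    Set<String> partitions = new HashSet<>();
    for (HoodieCommitMetadata metadata : commitsToConsume) {
      // Each commit records, per partition, the files it wrote; the key set is exactly
      // the set of partitions touched by that commit.
      partitions.addAll(metadata.getPartitionToWriteStats().keySet());
    }
    return partitions;
  }
}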

@vinothchandar (Member) left a comment

Just made a first pass.

InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
  this.conf = conf;
  tableMetaClientMap = new HashMap<>();
  nonIncrementalPaths = new ArrayList<>();
Contributor

Are these new data structures we are introducing compared to the existing code? What is the implication of these when the number of paths is really large?

Contributor Author

They can't be compared to the existing code, because the existing code doesn't look into the input paths inside HoodieInputFormat; input paths are handled only inside FileInputFormat.

Implication of the new data structures:
The InputPathHandler is created once per listStatus() call. Within the InputPathHandler object, the three data structures (nonIncrementalPaths, incrementalPaths and groupedIncrementalPaths) split the total set of input paths among them; each input path appears as at most one entry in exactly one of these structures. So the memory overhead is on the order of the total number of input paths (see the sketch below).
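
As a rough illustration of that split (a sketch using the field names mentioned in this thread, with hypothetical helpers; not the PR's exact code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;

// Sketch of the three-way classification done once per listStatus() call. Each input
// path lands in at most one bucket, so extra memory stays on the order of #input paths.
// isHoodiePath() and getTableName() are hypothetical stand-ins for the metadata lookups
// the real InputPathHandler does via HoodieTableMetaClient.
class InputPathClassificationSketch {

  final List<Path> nonIncrementalPaths = new ArrayList<>();   // non-Hoodie paths and snapshot-mode Hudi paths
  final List<Path> incrementalPaths = new ArrayList<>();      // all paths belonging to incremental tables
  final Map<String, List<Path>> groupedIncrementalPaths = new HashMap<>(); // the same paths, grouped per table

  void classify(Path[] inputPaths, List<String> incrementalTables) {
    for (Path path : inputPaths) {
      if (!isHoodiePath(path) || !incrementalTables.contains(getTableName(path))) {
        nonIncrementalPaths.add(path);
      } else {
        incrementalPaths.add(path);
        // Grouping lets the commit-metadata lookup happen once per table instead of once per path.
        groupedIncrementalPaths.computeIfAbsent(getTableName(path), t -> new ArrayList<>()).add(path);
      }
    }
  }

  // Hypothetical helpers for illustration only.
  private boolean isHoodiePath(Path path) { return true; }
  private String getTableName(Path path) { return path.getParent().getName(); }
}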

Contributor

Okay, so for a table with 400K files performing a snapshot query, we can expect this to be large?

Contributor Author

The job input paths refer to the partition paths, right? In that case, 400K files will map to a much smaller number of partition paths?

@n3nash (Contributor) commented Jun 3, 2019

Made 1 pass and left some comments.

@bvaradar (Contributor) left a comment

Made a high-level pass. Looks good overall; I can approve once the pending comments are addressed.

@n3nash n3nash self-requested a review June 14, 2019 21:55
@n3nash (Contributor) left a comment

@bhasudha I left 1 comment, but the rest looks good to me. This is a pretty significant change; could you come up with a test/rollout/rollback plan?

@bhasudha (Contributor Author)

> @bhasudha I left 1 comment, but the rest looks good to me. This is a pretty significant change; could you come up with a test/rollout/rollback plan?

Will do!

@bhasudha bhasudha force-pushed the speedup-incremental branch 2 times, most recently from 27acc81 to 038a4e8 Compare June 18, 2019 06:17
@vinothchandar (Member) left a comment

I am good with this per se. But this effectively rewrites hoodie-hadoop-mr :)
Can you test this in a production setting and share more results before merging?

NOTICE.txt (review comment: outdated, resolved)
@bhasudha bhasudha force-pushed the speedup-incremental branch 2 times, most recently from bdf3ad9 to 19dc57f Compare July 17, 2019 18:57
@bhasudha (Contributor Author)

I was able to successfully cross-verify the query results between the current HoodieInputFormat and this new HoodieInputFormat for a few Uber production tables using Spark. I ran different snapshot queries on MOR tables with count(*), group bys, joins, etc. The query latencies were also comparable.

I can't test incremental queries yet without changing the jar in the Hive Metastore; I will be doing that next. My plan is to have that tested in staging and then gradually roll it out to production.

@n3nash @vinothchandar ^^

@vinothchandar (Member)

@bhasudha this looks good overall. We are currently stabilizing master. Will merge once we are in calmer waters.

@vinothchandar vinothchandar self-assigned this Aug 1, 2019
@vinothchandar vinothchandar added the pr:wip Work in Progress/PRs label Sep 11, 2019
@leesf (Contributor) commented Dec 17, 2019

@bhasudha Could you please rebase to master and merge it as it is ready?

@vinothchandar vinothchandar changed the title [HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries [WIP] [HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries Dec 22, 2019
@bhasudha bhasudha force-pushed the speedup-incremental branch 2 times, most recently from 2bdd118 to ade9272 Compare December 31, 2019 20:32
@bhasudha (Contributor Author) commented Jan 6, 2020

I rebased onto the latest master and verified the Hive queries in the Docker Demo using the new patch. All queries in the Demo work as expected, and incremental queries leverage the optimizations in this patch when hive.fetch.task.conversion is disabled (as desired).

I was able to run tests using spark.sql() against some of the production tables (both MOR and COW types). I used --conf spark.sql.hive.convertMetastoreParquet=false so the Hive SerDe is used instead. Below is a flavor of the queries that I tested. The results match between the pre-fix and post-fix hudi-spark-bundle jars.

Snapshot queries

simple count:
spark.sql("select count(*) from tableA where datestr = '2019-12-10'").show()

non-Hudi and Hudi dataset join:
spark.sql("select m.col1 as colA, t.col2 as colB from table1 m left join table2 as t on t._row_key = m.col1 and t.datestr >= '2016-01-01' join table3 c on m.col4 = c.col5 where c.col6 = 'XYZ'").show()

non-Hudi and non-Hudi dataset join:
spark.sql("select o.col1, count(distinct e.col2) from tableA o join tableB e on o.id = e.id where to_date(e.col1) >= date_sub(current_date, 10) or to_date(e.col3) >= date_sub(current_date, 10) group by 1 order by 2 desc").show()

Hudi and Hudi dataset join:
spark.sql("select t.id, count(t.load) as total_count FROM tableT t LEFT JOIN tableO o on t.id = o.id AND o.datestr > '2019-12-28' AND NOT o.isactive WHERE t.datestr > '2019-12-28' AND NOT t.isactive group by 1 order by 1,2 desc").show()

group by, order and rank:
spark.sql("select * from ( select , rank() over ( partition by rg order by total_items desc ) as row_number from ( select rg, usr, count() as total_items from tableA where date(datestr) >= date('2019-10-11') and date(datestr) < date('2019-10-16') and event = 'complete' and SUBSTRING_INDEX(rg, '.',1) = 'adhoc' group by 1,2 order by 1, count(*) desc ) ) where row_number <= 1").show()

Incremental queries

spark.sql("select name, count(*) from tableA where event_status = 'complete' and _hoodie_commit_time > '20200101235440' group by 1").show()

@vinothchandar vinothchandar changed the title [WIP] [HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries [HUDI-25] Optimize HoodieInputFormat.listStatus for faster Hive Incremental queries Jan 7, 2020
@vinothchandar vinothchandar removed the pr:wip Work in Progress/PRs label Jan 7, 2020
@vinothchandar (Member)

Thanks for the update @bhasudha, and welcome back :) Will make a final pass and then merge.

@vinothchandar (Member) left a comment

@bhasudha just the one question on the pom dependency. Let's resolve that and you can self-merge when ready.

hudi-hadoop-mr/pom.xml (review comment: outdated, resolved)
…remental queries on Hoodie

    Summary:
    - InputPathHandler class classifies input paths into incremental, non-incremental and non-Hoodie paths.
    - Incremental queries leverage HoodieCommitMetadata to get the partitions that are affected and list only those partitions, as opposed to listing all partitions.
    - listStatus() processes each category separately.
@bhasudha (Contributor Author) commented Jan 8, 2020

Tests are passing and review comments are addressed. Merging this code in.

@bhasudha bhasudha merged commit d09eacd into apache:master Jan 8, 2020