
[FLINK-21406][RecordFormat] build AvroParquetRecordFormat for the new FileSource #17501

Closed · wants to merge 2 commits

Conversation

@JingGe (Contributor) commented Oct 15, 2021

What is the purpose of the change

The goal of this PR is to provide an AvroParquetRecordFormat implementation for reading Avro GenericRecords from Parquet via the new Flink FileSource.

Another goal of this PR is to get everyone on the same page w.r.t. the implementation design. Once the design is settled, split support and further Avro record types beyond GenericRecord can easily be added as new features in follow-up PRs.

This is a draft PR. There is one failing unit test case, which is commented out for now with a TODO; I am still working on it.

Brief change log

  • Create a RecordFormat interface that focuses on Path, provides default method implementations, and delegates createReader() calls to the overloaded methods of StreamFormat, which focuses on FSDataInputStream.
  • Create the AvroParquetRecordFormat implementation (see the usage sketch below). Only reading Avro GenericRecord from a Parquet file or stream is supported in this version. Support for other Avro record types will be implemented later.
  • Splitting is not supported in this version.
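
For illustration, here is a minimal sketch of how the new format could be plugged into a FileSource pipeline. The AvroParquetRecordFormat constructor and the example schema are assumptions for illustration only; FileSource.forRecordStreamFormat and the surrounding DataStream API are existing Flink APIs.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AvroParquetReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative Avro schema; any GenericRecord schema works here.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Assumed constructor: the PR introduces AvroParquetRecordFormat
        // as a StreamFormat<GenericRecord>; the exact factory may differ.
        AvroParquetRecordFormat format = new AvroParquetRecordFormat(schema);

        FileSource<GenericRecord> source = FileSource
                .forRecordStreamFormat(format, new Path("file:///tmp/users.parquet"))
                .build();

        DataStream<GenericRecord> records = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "avro-parquet-source");

        records.print();
        env.execute("Read Avro GenericRecords from Parquet");
    }
}
```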

Open Questions

To give you some background: the original idea was to let AvroParquetRecordFormat implement FileRecordFormat. After considering that FileRecordFormat and StreamFormat have too much in common, and that StreamFormat has more built-in features such as compression support (via StreamFormatAdapter), the current design is based on StreamFormat. To keep the separation of concerns clear, two levels of interfaces have been defined: StreamFormat focuses on the abstract input stream, while RecordFormat pays attention to the concrete file system, i.e. the Path. RecordFormat provides default implementations for the overloaded createReader(...) methods, so subclasses are not forced to implement them. A sketch of this delegation follows.
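
As a minimal sketch of that delegation, assuming method signatures modeled on Flink's StreamFormat (the actual interface in this PR may differ in details):

```java
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/**
 * Sketch of the two-level idea: RecordFormat adds Path-based convenience
 * methods on top of StreamFormat, so subclasses only need to implement the
 * stream-oriented methods.
 */
public interface RecordFormat<T> extends StreamFormat<T> {

    default Reader<T> createReader(Configuration config, Path filePath)
            throws IOException {
        FileSystem fs = filePath.getFileSystem();
        long fileLen = fs.getFileStatus(filePath).getLen();
        FSDataInputStream in = fs.open(filePath);
        // Delegate to the FSDataInputStream-based method from StreamFormat,
        // treating the whole file as a single split.
        return createReader(config, in, fileLen, fileLen);
    }
}
```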

Following are some questions open for discussion:

  1. Compared to the two-level interface design, another option would be to merge all the default methods implemented in RecordFormat into StreamFormat. That design keeps all createReader(...) methods in one interface (StreamFormat only, no extra RecordFormat), which is in some ways easier to use. The downside is that it blurs the separation of concerns described above. Based on these considerations, I chose the current two-level API design.
  2. After weighing priorities, splitting is currently not supported, since we have not had a strong requirement for it from the business side. It will be implemented later, when the time comes.
  3. If this design works well, as a next step we should consider replacing FileRecordFormat with RecordFormat. In this way, duplicated code and Javadoc could be avoided. This question is slightly out of the scope of this PR, but it does relate to the topic and could therefore be discussed here too.

Verifying this change

This change added tests and can be verified as follows:

New unit tests validate that:

  • The reader can be created and restored correctly via Path.
  • The constraint-violation checks for no splitting, null paths, and no restore offset work as expected (see the test sketch below).
  • GenericRecords can be read correctly from a Parquet file.
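
As an illustration of the constraint checks, a minimal JUnit sketch, reusing the assumed AvroParquetRecordFormat constructor from the usage sketch above; the exact exception type for a null path is also an assumption:

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.junit.jupiter.api.Test;

class AvroParquetRecordFormatTest {

    // Illustrative schema, as in the usage sketch above.
    private final Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                    + "{\"name\":\"name\",\"type\":\"string\"}]}");

    @Test
    void createReaderRejectsNullPath() {
        // Assumed constructor and assumed exception type.
        AvroParquetRecordFormat format = new AvroParquetRecordFormat(schema);
        assertThrows(NullPointerException.class,
                () -> format.createReader(new Configuration(), (Path) null));
    }
}
```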

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented Oct 15, 2021

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@flinkbot (Collaborator) commented Oct 15, 2021

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 9d5ac3c (Tue Dec 14 09:07:22 UTC 2021)

Warnings:

  • 2 pom.xml files were touched: Check for build and licensing issues.
  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@fapaul left a comment

Thanks for providing this pull request, but I have a few preliminary questions about the design.

Every time I read something about Parquet formats, I think the format should be based on the BulkFormat interface. Why did you base your implementation on StreamFormat?

As a second point, I'd like to see an IT case using the new format with the FileSource. Did you already test this?

@fapaul left a comment

Thanks for the nice discussion! I think I now better understand the contribution.

Can you rename the class AvroParquetRecordFormat to ParquetAvro...? The codebase already has a ParquetAvroWriters class and it would be nice to keep it consistent.

@JingGe (Contributor, Author) commented Oct 21, 2021

> Can you rename the class AvroParquetRecordFormat to ParquetAvro...? The codebase already has a ParquetAvroWriters class and it would be nice to keep it consistent.

I named it after the naming convention of the Apache Parquet library, e.g. AvroParquetReader. It sounds natural for "reading Avro from Parquet". I would rather suggest we rename ParquetAvroWriters to AvroParquetWriters.

@echauchot (Contributor) commented Oct 24, 2021

R: @echauchot
@JingGe thanks a lot for your work! If I may, I'd like to review this PR as well, since I was the author of the ParquetAvroInputFormat for the older source.

@JingGe (Contributor, Author) commented Oct 25, 2021

> If I may, I'd like to review this PR as well, since I was the author of the ParquetAvroInputFormat for the older source.

@echauchot Thanks for your interest and effort; it would be great if you reviewed this PR. Please be aware that this PR is currently in draft status. There is a lot of information in the PR description at the top that should give you the background for the review.

@echauchot (Contributor) commented Oct 25, 2021

Sure, I saw the comments about splits, data types, etc. But I feel uncomfortable about draft PRs because they usually cannot be merged as is; in the case of your PR, it could not be merged without split support. So I guess the correct way to proceed is to use this PR as an environment for design discussions and to add extra commits until the PR is ready for production. @JingGe @fapaul WDYT?

@JingGe (Contributor, Author) commented Oct 25, 2021

@echauchot You are right, that was the idea of the draft PR. As for split support specifically, which would make the implementation considerably more complicated: this PR might be merged without it, because we have not had any requirement for it from the business side. If you have a strong requirement w.r.t. splitting, we'd like to know about it and will reconsider.

@echauchot (Contributor) commented

I think splitting is mandatory, because if you read a big Parquet file with no split support, all the content will end up in a single task manager, which will lead to an OOM.

@echauchot (Contributor) left a comment

Hi @JingGe, thanks for your work! I must admit I have mixed feelings about this PR: it feels very Java-stream and single-split oriented. Like Fabian, I think implementing BulkFormat would be better.

@JingGe (Contributor, Author) commented Oct 27, 2021

> I think splitting is mandatory, because if you read a big Parquet file with no split support, all the content will end up in a single task manager, which will lead to an OOM.

I agree with you and actually share the same concern, especially from the SQL perspective. I don't quite follow the OOM concern, though: on the upper layer we can control memory via StreamFormat.FETCH_IO_SIZE, and on the lower layer ParquetFileReader is used, so we will not read the whole Parquet file into memory.

The goal of this PR is to get everyone on the same page w.r.t. the implementation design. Once the design is settled, split support can easily be added as a feature in a follow-up PR. That is why I wrote explicitly at the beginning of the PR description that "Splitting is not supported in this version". I will update it with more background info.
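
For reference, the fetch size mentioned here is the existing StreamFormat.FETCH_IO_SIZE option; a minimal sketch of bounding it (the 4 MiB value is purely illustrative):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.src.reader.StreamFormat;

public class FetchSizeConfig {
    public static Configuration boundedFetchConfig() {
        Configuration config = new Configuration();
        // Caps how many bytes a StreamFormat reader fetches per I/O request,
        // which bounds per-subtask memory regardless of the file size.
        config.set(StreamFormat.FETCH_IO_SIZE, MemorySize.ofMebiBytes(4));
        return config;
    }
}
```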

@AHeise (Contributor) left a comment

Thank you very much for your contribution. The general approach looks good. There are some things that I'd like to reconsider, though:

  • I'm not sure whether the answer to the confusion around the 3 existing format interfaces should be to add a 4th.
  • An ITCase is missing; the current code is not working and will always yield an NPE during execution.

@echauchot (Contributor) commented

> I don't quite follow the OOM concern, though: on the upper layer we can control memory via StreamFormat.FETCH_IO_SIZE ...

Yes, I see that there is a countermeasure regarding possible OOMs (the fetch size), but the split is still important for performance reasons: otherwise the parallelism is sub-optimal, and Flink focuses on performance. I'm not a committer on the Flink project, so it is not my decision whether to merge this PR without split support, but I would tend not to merge it, to avoid users suffering from a lack of performance that does not seem to meet the project's quality standards.

@AHeise WDYT?
JingGe force-pushed the format-parquetavro-genericrecord branch from f9f0630 to 9ae5e4d on November 1, 2021 19:00
@AHeise (Contributor) commented Nov 2, 2021

> I would tend not to merge it, to avoid users suffering from a lack of performance ... @AHeise WDYT?

I'm fine with a follow-up ticket/PR on that one to keep things going; having any support for AvroParquet is better than having none. But it should be done before the 1.15 release anyhow, so that end users only ever see the splittable version.

We plan to support splitting for all formats with sync marks, but in general the role of splitting has shifted since big data processing moved from block-based storage to cloud storage. Earlier, splitting was also needed for data locality, which no longer applies. Now it is only needed to speed up ingestion (you can always rebalance after the source), so it is strictly necessary only for the most basic pipelines.

TL;DR: while splitting is still a should-have feature, the days of must-have are gone.
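
To illustrate the rebalance point, a minimal sketch continuing the usage example from the PR description (source and env are assumed to be built as shown there; the map function is a placeholder):

```java
// Even if the non-split source reads each file in a single subtask,
// rebalance() redistributes records round-robin so that downstream
// operators can run at full parallelism.
DataStream<GenericRecord> records = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "avro-parquet-source");

records.rebalance()
        .map(record -> record.get("name").toString()) // placeholder processing
        .setParallelism(8);
```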

@echauchot (Contributor) commented

> TL;DR: while splitting is still a should-have feature, the days of must-have are gone.

Having the split in place before any user could see the non-split version was all I was interested in, so delivering it before the 1.15 release sounds perfect!
Thanks a lot, Arvid, for the information about the relative importance of splitting, which I did not have; I had only the rebalance in mind!

JingGe force-pushed the format-parquetavro-genericrecord branch from b481049 to 8b1a7f9 on November 8, 2021 14:57
JingGe marked this pull request as ready for review on November 15, 2021 08:58
JingGe changed the title from "[Draft][FLINK-21406][RecordFormat] build AvroParquetRecordFormat for the new FileSource" to "[FLINK-21406][RecordFormat] build AvroParquetRecordFormat for the new FileSource" on Nov 15, 2021
@JingGe (Contributor, Author) commented Nov 15, 2021

@flinkbot run azure

@AHeise (Contributor) left a comment

The current state already looks very good to me.
@StephanEwen asked offline why we are targeting a feature that is specific to DataStream, and whether we are repeating the same mistakes of the past.

So could you please spike what would need to be done to port the solution to the existing ParquetVectorizedInputFormat? I'm a bit skeptical here:

  • We have no concept of vectorized data in DataStream (should we?). Exposing the Table types to DataStream is not a good option.
  • On the other hand, integrating AvroParquet into Table also has no value, as the Table API expects Table types.
    Maybe we have really found that, for columnar formats, we simply can't share the implementation between DataStream and Table.

@AHeise (Contributor) left a comment

The current state looks good. I'm wondering whether the ITCase should also cover specific and reflective records, but covering those with unit tests might already be enough.

I have made a proposal on how to address the unsatisfying recovery and how to merge this format with ParquetVectorizedInputFormat (probably by extracting a common ancestor). PTAL.

@AHeise (Contributor) left a comment

Probably the last round of review. I'll do the final check after your next push.

JingGe force-pushed the format-parquetavro-genericrecord branch 2 times, most recently from b8a96d9 to d3ffea2 on December 16, 2021 13:57
JingGe force-pushed the format-parquetavro-genericrecord branch 3 times, most recently from 0507c16 to 84e5e70 on December 17, 2021 18:31
JingGe force-pushed the format-parquetavro-genericrecord branch from 84e5e70 to b8f5219 on December 17, 2021 18:33
@JingGe (Contributor, Author) commented Dec 17, 2021

@flinkbot run azure

AHeise closed this in cdf3d48 on Dec 19, 2021
AHeise reopened this on Dec 19, 2021
@AHeise (Contributor) left a comment

LGTM. Thank you very much for enhancing Parquet.

AHeise closed this on Dec 19, 2021
@AHeise (Contributor) commented Dec 19, 2021

Merged into master as b4ca350..cdf3d48.

niklassemmler pushed a commit to niklassemmler/flink that referenced this pull request Feb 3, 2022