
Questions related to watermarking of Iceberg source #9138

Closed
jonathf opened this issue Nov 23, 2023 · 11 comments
jonathf commented Nov 23, 2023

Query engine

Flink 1.15.2

Question

@pvary, congratulations on getting #8553 merged! It has been interesting to follow the PR's progress.

I know that it will still take a little time before the next release, but I have some practical questions for when it is released:

  1. Which versions of Flink will this support? Or is it agnostic to the Flink version?
  2. Is this code required on both the client and server side to work? In other words, is it enough to update the Iceberg dependency in the Flink application to the newest version (when it is ready), or do we also have to update the Iceberg storage code as well?
  3. Is using this feature as simple as assigning the watermark column in the Table API, or do we have to do anything more to get it to work?

I apologize if I am too quick on the trigger and these questions will be answered in e.g. the docs soon.


pvary commented Nov 23, 2023

@jonathf: Always good to know if somebody is interested in a feature, so feel free to ask your questions!

  1. Which versions of Flink will this support? Or is it agnostic to the flink version?

I will create a backport PR soon, which will add the feature to Flink 1.15 and Flink 1.16 as well.
That said, the next task on my list is #8930, which will enable Flink 1.18 support but will also drop 1.15 support. So I expect that in the next Iceberg release the feature will be supported for Flink 1.16, 1.17 and 1.18.

  2. Is this code required on both the client and server side to work? In other words, is it enough to update the Iceberg dependency in the Flink application to the newest version (when it is ready), or do we also have to update the Iceberg storage code as well?

The only server side of Iceberg is the REST Catalog implementation, which is not affected by this change (and you might use a different Catalog anyway). So the Flink dependency upgrade is enough.

  3. Is using this feature as simple as assigning the watermark column in the Table API, or do we have to do anything more to get it to work?

You need column metrics for your tables, so please make sure that the file statistics are there for the given column (you might need to rewrite the files, if they are missing). If the metrics are there, then just enable the watermark generation using watermarkColumn, and you are ready. If your column type is long, you might want to use watermarkTimeUnit to define which time unit is used to store your event time.

If you have missing statistics, you will get the following exception:

Missing statistics for column name = %s in file = %s
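
To make the description above concrete, a rough sketch of wiring up the watermark generation on the Flink side might look like the following. This is an illustrative sketch only: the table location, column name, and surrounding setup are hypothetical, and the exact builder options should be checked against the Iceberg docs for your release.

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkedIcebergSourceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical table location; use your own catalog/table loader.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://warehouse/db/events");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .streaming(true)
            // Column whose per-file min/max statistics drive the watermark.
            .watermarkColumn("event_ts")
            // Only relevant when the column is a long-typed event time.
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            .build();

    // Watermarks are emitted by the source itself, so no extra strategy is applied here.
    DataStream<RowData> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "iceberg-events");

    stream.print();
    env.execute();
  }
}
```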


jonathf commented Nov 23, 2023

@jonathf: Always good to know if somebody is interested in a feature, so feel free to ask your questions!

I am glad to hear that! And thank you for the swift and in-depth answer.

[...] I expect that in the next release of Iceberg the feature will be supported for Flink 1.16, 1.17 and 1.18

We are on an AWS managed Flink application, which currently supports at most 1.15. Do you know if or when AWS will update their support?

The only server side of Iceberg is the REST Catalog implementation, which are not affected by this change (and you might use a different Catalog anyway). So Flink dependency upgrade is enough.

Good to know.

You need column metrics for your tables, so please make sure that the file statistics are there for the given column (you might need to rewrite the files, if they are missing). If the metrics are there, then just enable the watermark generation using watermarkColumn, and you are ready. If your column type is long, you might want to use watermarkTimeUnit to define which time unit is used to store your event time.

If you have missing statistics, you will get the following exception:

Missing statistics for column name = %s in file = %s

Thank you. I am not familiar with column metrics in Flink. Is there documentation you can point me to so I can read up on the topic?


pvary commented Nov 23, 2023

#9139 is the backport PR.

We are on AWS managed Flink application which is currently limited upward to 1.15. Do you know anything about if or when AWS will update their support?

Sadly, I have no information about this. I would ask on the Iceberg user list or Slack channel, as there are several AWS maintainers in the community.

Thank you. I am not familiar with column metrics in flink. Is there documentation on the topic that you can point me to so I may read up on the topic?

These are not Flink column metrics; they are Iceberg column metrics. By default, metrics collection is turned on, but it can be fine-tuned with these configs:

  • write.metadata.metrics.max-inferred-column-defaults
  • write.metadata.metrics.default
  • write.metadata.metrics.column.col1

See: https://iceberg.apache.org/docs/latest/configuration/
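
For example, the properties above can be set on an existing table; in Spark SQL that might look like the following (the catalog, table, and column names here are hypothetical):

```sql
-- Force full (untruncated) min/max metrics for the watermark column.
ALTER TABLE my_catalog.db.events SET TBLPROPERTIES (
  'write.metadata.metrics.column.event_ts' = 'full'
);
```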


jonathf commented Nov 23, 2023

Okay, understood.

I misunderstood the column metrics. It looks like metrics are off on our end, but it seems they can easily be enabled through AWS Glue. We will definitely give it a try (with a full rewrite).
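
As pvary noted, files written before metrics were enabled need to be rewritten so the statistics exist. One way to do that full rewrite is Spark's rewrite_data_files procedure; the catalog and table names below are hypothetical:

```sql
-- Rewrites data files; new files carry metrics per the current
-- write.metadata.metrics.* table properties.
CALL my_catalog.system.rewrite_data_files(table => 'db.events');
```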

Our watermark column is of type timestamp, inherited from Kafka through Flink before being written to Iceberg. Can I assume this is interpreted correctly?

@nastra added the flink label Nov 23, 2023

pvary commented Nov 23, 2023

Our watermark column is of type timestamp, inherited from Kafka through Flink before being written to Iceberg. Can I assume this is interpreted correctly?

Yes, that's the plan.


jonathf commented Nov 23, 2023

Great!

So, to ensure I understand you correctly:
You are working on 1.15 now, but it will likely not reach the next release. So the only way for us to get this feature is to compile it ourselves after it is merged? Or are you saying that if the backporting takes too long, it won't reach us at all?


pvary commented Nov 24, 2023

Once #9139 is merged, you can use it to compile your own version of iceberg-flink-runtime, but it will not be officially supported by the community, because 1.15 support for this feature will likely never be released.


pvary commented Nov 28, 2023

@jonathf: Do you have any more questions? Could we close this issue?
Thanks,
Peter


jonathf commented Nov 28, 2023

One last question:
You mentioned the Iceberg Slack channel. How do I get access to it?


pvary commented Nov 28, 2023

There is a link on the web page https://iceberg.apache.org/, which is hard to miss 😄
[Screenshot: the Slack invite link on the Iceberg website]


jonathf commented Nov 28, 2023

Heh, I tried googling it, and it sent me in the wrong direction.

Thanks for all your help @pvary.

@jonathf closed this as completed Nov 28, 2023