Flink SQL with Iceberg snapshots doesn't react if table has upsert #9948

Open
VidakM opened this issue Mar 13, 2024 · 3 comments
Labels
question Further information is requested

Comments

VidakM commented Mar 13, 2024

Query engine

Flink 1.17.2 with Iceberg 1.4.2 libraries

Question

I have a few Iceberg v2 tables defined and a Flink job that reads them in a streaming fashion before transforming to another Iceberg table.

If the source tables are basic, then subscribing to them works great and the SQL query can continuously run.

But if the tables are defined with 'write.upsert.enabled'='true', then the subscribing Flink SQL query reads only once and does not react to new snapshots, even when the query hints set a monitor interval and the streaming strategy is one of the incremental ones.

Flink streaming query that normally works:

INSERT INTO iceberg.target_packaging
SELECT
  usr.`user_id` AS `user_id`,
  usr.`adress` AS `address`,
  ord.`item_id` AS `item_id`,
  ....
FROM
  iceberg.source_users /*+  OPTIONS('streaming'='true', 'monitor-interval'='15s')  */ usr
JOIN
  iceberg.source_orders /*+  OPTIONS('streaming'='true', 'monitor-interval'='15s')  */ ord ON usr.`user_id` = ord.`user_id`;

The streaming join works great if the source Iceberg tables are defined like this:

CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2');

Resulting table properties example: 
[current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]

But with the following definition the streaming join runs only once and then stops triggering on new snapshots. The job does not finish, though; it just stops reacting to the source and produces no new records.

CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');

Resulting table properties example: 
[current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]

In my Flink job I simply define the connector and run the SQL join/insert. Both the source and target tables are already defined.

I also noticed that a SQL join stops streaming if at least one of the joined tables has upsert enabled.

Looking at the documentation for both Iceberg and Flink, I can't find any indication that enabling upsert should alter the behaviour, but I do remember reading somewhere that FLIP-27 only supports append and not update / delete. Is this the reason I'm seeing this behaviour?

VidakM added the question (Further information is requested) label Mar 13, 2024
VidakM changed the title from "Flink SQl with Iceberg snapshots doesn't react if table has upsert" to "Flink SQL with Iceberg snapshots doesn't react if table has upsert" Mar 13, 2024
pvary (Contributor) commented Mar 13, 2024

Flink streaming reads only support append-only tables at the moment.

When you insert into a table it generates appends, but when you switch to upsert mode, you start adding update changes to the table. Appends are read in streaming mode, but updates are ignored, as we do not have the capability to correctly emit the delete records.
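
One way to check whether this is what happens on a given table is to look at the operation recorded for each snapshot. The query below is only a sketch: it assumes the `$snapshots` metadata table is exposed by the Iceberg and Flink versions in use, and it reuses the `iceberg.source_users` table name from the question. Depending on the setup, the table name may need backtick quoting.

```sql
-- Sketch only: list the snapshots of the upsert-enabled source table
-- together with the operation recorded for each commit.
-- Assumption: the `$snapshots` metadata table is available in this setup.
SELECT
  committed_at,
  snapshot_id,
  operation
FROM iceberg.source_users$snapshots;
```

If the explanation above applies, snapshots written in upsert mode would be expected to show an operation other than `append`, and those are the ones the streaming read does not pick up.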

VidakM (Author) commented Mar 14, 2024

Thanks for getting back to me.

Looking at the docs I also see that Spark Structured Streaming has a similar warning of only supporting appends.
A bit too bad, as it would unlock a lot of interesting solutions.

Is streaming of append + upsert (delete) changes in Flink and Spark something you don't want to support because there is a different preferred approach, or is it just later on the roadmap? If so, any idea of the timeline?

For us this would unlock plenty of interesting use-cases for Iceberg.
At the moment we dump raw events into Iceberg for replay storage; with Flink we also unpack them and tidy them up a little. Call it a silver layer if you like.

But if we could subscribe to the more complex upsert silver writes (late arrival joins, aggregations, pivots, partial GDPR deletes), we could actually automate pipelines that maintain more “golden” tables. Data science teams would gain clean, automatically generated and maintained tables, Superset users would get better views, and it would be easier to stream transformed data from Iceberg to other products. As it stands, we would perhaps need to use Kafka or something similar as a buffer and do dual writes to get CDC for upserts.

It would also let us work around some Flink limitations: we would not need to buffer as much in memory for late arrivals, and Flink lacks some of the good transformation SQL that Spark has.

We would happily help contribute if possible, given a few pointers.
I suppose this PR is the one implementing the upsert-compatible deletes?
#3095

I also saw that Flink implemented FLIP-282 in the 1.17 release to enable row-level delete / update APIs.

pvary (Contributor) commented Mar 15, 2024

@VidakM: I suggest bringing this up on the mailing list. I would also love to see this feature in Flink.

I checked myself a few months ago, and the main blocker is that the planning part for streaming V2 tables is not implemented yet. The interfaces are created, but there is no working implementation for planning. Also the proposed interfaces have a base class different than the one returned by the V1 planning, which will make it harder to do the implementation part on the execution engine side.

Seeing these tasks, I started to work on the Flink in-job compaction, as it has more immediate gains for us, but in the medium term (in a year), I would like to revisit this feature. (No commitment though.)

If you need reviewers, then I could help there, so any contribution would be welcome here!

Thanks,
Peter
