Flink: support watermark and computed columns #4625

Closed
wants to merge 4 commits

Conversation

wuwenchi
Contributor

We hope Iceberg can support watermarks and computed columns.
The specific implementation details are as follows:

Background and Motivation

There is a temporal table in Iceberg that needs to be joined in real time with other tables, so a watermark needs to be defined. However, a watermark has requirements on the attributes of the field it is based on, and the time fields in the source table may not meet them, so the source table's time fields need to be converted by recomputing them or calling functions.

Why not directly use the Iceberg connector table supported by Flink?

  1. All Flink tables are stored in memory; when the job ends, the table disappears, and it has to be created again the next time it is needed.
  2. In our scenarios, a temporal table needs to be joined with multiple tables, and the watermark attributes are related only to the temporal table, so they are the same across these jobs. We therefore want to bind these attributes directly to the temporal table, so that when the table is used again later there is no need to look up its watermark settings from other jobs.

Goal

  1. Iceberg supports setting watermarks and computed columns through Flink SQL.
  2. The watermark and computed-column attributes can be modified by changing the table properties.

Proposal

Table property save format

Example

CREATE TABLE tl (
  id INT, 
  id2 AS id * 2, 
  f1 AS TO_TIMESTAMP(FROM_UNIXTIME(id*3)),
  t1 TIMESTAMP(6), 
  t2 AS cast(t1 AS TIMESTAMP(3)), 
  watermark FOR t2 AS t2 - INTERVAL '5' SECOND
);

Then the table properties are saved as:

flink.computed-columns.id2 = `id` * 2
flink.computed-columns.f1 = TO_TIMESTAMP(FROM_UNIXTIME(`id` * 3))
flink.computed-columns.t2 = CAST(`t1` AS TIMESTAMP(3))
flink.watermark.t2 = `t2` - INTERVAL '5' SECOND

key format

fixed prefix + field name:

  • fixed prefix for watermark: flink.watermark.
  • fixed prefix for computed columns: flink.computed-columns.

value format

the expression as defined by the user.

How to add, delete, and modify

1. Flink SQL DDL (only supports adding)

CREATE TABLE `hive_catalog`.`default`.`sample` (
  id INT, 
  id2 AS id * 2, 
  f1 AS TO_TIMESTAMP(FROM_UNIXTIME(id*3)),
  t1 TIMESTAMP(6), 
  t2 AS cast(t1 AS TIMESTAMP(3)), 
  watermark FOR t2 AS t2 - INTERVAL '5' SECOND
);

2. Syntax of table properties

  • add or update
ALTER TABLE `hive_catalog`.`default`.`sample` SET (
	'flink.computed-columns.id2'='id*3'
)
  • delete
ALTER TABLE `hive_catalog`.`default`.`sample` RESET (
	'flink.computed-columns.id2'
)

Solution

add table process

  1. If there is a defined computed column in the table, save its expression to the table property.
  2. If there is a defined watermark in the table, save its expression to the table property.
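
For illustration, a minimal sketch of the two steps above, using Flink's ResolvedSchema API (Column.ComputedColumn, WatermarkSpec, ResolvedExpression.asSerializableString()); the helper class and its placement are hypothetical and not the code of this PR:

import java.util.Map;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.WatermarkSpec;

class ExpressionProperties {
  private static final String COMPUTED_COLUMN_PREFIX = "flink.computed-columns.";
  private static final String WATERMARK_PREFIX = "flink.watermark.";

  // Copies computed-column and watermark expressions into the Iceberg table properties,
  // using the key format described above (fixed prefix + field name).
  static void save(ResolvedSchema schema, Map<String, String> tableProperties) {
    for (Column column : schema.getColumns()) {
      if (column instanceof Column.ComputedColumn) {
        Column.ComputedColumn computed = (Column.ComputedColumn) column;
        tableProperties.put(
            COMPUTED_COLUMN_PREFIX + computed.getName(),
            computed.getExpression().asSerializableString());
      }
    }
    for (WatermarkSpec watermark : schema.getWatermarkSpecs()) {
      tableProperties.put(
          WATERMARK_PREFIX + watermark.getRowtimeAttribute(),
          watermark.getWatermarkExpression().asSerializableString());
    }
  }
}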

alter table properties process

  1. Merge the modified properties with the original properties to get the updated properties of the table.

  2. Generate the Flink table from the merged table properties and the original table's schema, and verify the schema. (This prevents errors in the expressions, such as referencing a non-existent function or column name in a computed-column expression.)

  3. If the verification in the previous step succeeds, commit the property modification;
    otherwise, throw an exception and do not commit.
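
A rough sketch of this validate-then-commit flow, using Iceberg's UpdateProperties API; the validation step is only a placeholder here, since the actual PR resolves the expressions through Flink's schema resolver:

import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;

class AlterTableProperties {

  static void alter(Table table, Map<String, String> changes) {
    // 1. Merge the requested changes with the current table properties.
    Map<String, String> merged = new HashMap<>(table.properties());
    merged.putAll(changes);

    // 2. Rebuild the Flink table from the merged properties and the current schema;
    //    this throws if a computed-column or watermark expression is invalid.
    validateFlinkSchema(table, merged);

    // 3. Commit only after validation has passed.
    UpdateProperties update = table.updateProperties();
    changes.forEach(update::set);
    update.commit();
  }

  // Placeholder for the validation described in step 2; the actual PR resolves the
  // expressions against the Flink schema resolver and fails on an invalid expression.
  private static void validateFlinkSchema(Table table, Map<String, String> mergedProperties) {
  }
}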

get table process

  1. Generate the Flink table by combining the current table properties with the table schema, and verify the schema. (This guards against the table schema having been modified by other engines in a way that breaks a computed-column expression. For example, the computed-column expression id * 3 breaks if the id column was later dropped via Spark and the corresponding computed-column property was not removed.)

  2. If the verification succeeds, the table is returned.
    If it fails, the computed columns and watermarks in the table properties are ignored, and the original physical schema of the table is returned directly.
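
And a minimal sketch of the fallback described above; applyStoredExpressions stands in for the real resolution logic and is hypothetical:

import java.util.Map;
import org.apache.flink.table.api.Schema;

class GetTableSchema {

  // Returns the Flink schema exposed for the Iceberg table. If the stored computed-column
  // or watermark expressions no longer resolve against the current schema (for example,
  // a referenced column was dropped by another engine), they are ignored and the plain
  // physical schema is returned.
  static Schema schemaFor(Schema physicalSchema, Map<String, String> tableProperties) {
    try {
      return applyStoredExpressions(physicalSchema, tableProperties);
    } catch (RuntimeException e) {
      // Validation failed: the expressions are stale, so fall back to the physical schema.
      return physicalSchema;
    }
  }

  // Placeholder: the actual PR re-attaches the expressions and validates them with
  // Flink's schema resolver, throwing if an expression cannot be resolved.
  private static Schema applyStoredExpressions(Schema physical, Map<String, String> props) {
    return physical;
  }
}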

@github-actions github-actions bot added the flink label Apr 25, 2022
@wuwenchi
Contributor Author

wuwenchi commented Apr 25, 2022

Relevant PRs: #2265, #3681
Could you help review it? Thanks! @openinx @rdblue @hameizi @zhangjun0x01 @stevenzwu

Contributor

@kbendick kbendick left a comment

Thank you for this big undertaking @wuwenchi!

I believe Flink 1.15 was released, so I am going to rebase my Flink 1.15 support PR to the release branch and we should have 1.15 support in the next few days.

Just a heads up that anything done here will eventually be required to work with that. But I’d wait to get more feedback, given the large scope of these features.

Happy to see the interest in this!

BTW - Have you given any thought to possibly trying to support tables with hidden partitioning (that use partition transforms other than identity transform) via generated columns? Likely out of scope, but that’s one way I’ve been considering adding support.

@wuwenchi
Contributor Author

wuwenchi commented Apr 26, 2022

@kbendick Thanks for your reply!

I believe Flink 1.15 was released, so I am going to rebase my Flink 1.15 support PR to the release branch and we should have 1.15 support in the next few days.

Is this feature already supported on the Flink 1.15 branch you are working on now? Or should we wait until Flink 1.15 is branched out and then support this feature through this PR?

Have you given any thought to possibly trying to support tables with hidden partitioning (that use partition transforms other than identity transform) via generated columns? Likely out of scope, but that’s one way I’ve been considering adding support.

Yes, this feature would be very useful to me, and my next plan is to support it. If you don't mind, can I join in working on it?

@Zhangg7723
Contributor

nice job

@wuwenchi
Contributor Author

wuwenchi commented May 1, 2022

@kbendick Can we consider this approach to implementing partition transforms: #4251? @yittg

@stevenzwu
Contributor

I have an uber question regarding storing those settings in Iceberg table properties.

Watermark config is very job-specific. Different Flink jobs may have different watermark assignments.

flink.watermark.t2 = `t2` - INTERVAL '5' SECOND

There are some discussions in PR #3681

@wuwenchi
Contributor Author

wuwenchi commented May 2, 2022

@stevenzwu Thanks for your reply!

We have also considered this issue.
But in practice, the more common scenario for our users is that a table needs to be joined with many other tables, and each join may be a separate job, so the watermark attributes of these jobs are actually the same.
Without this feature, users have to look up the watermark property from the other jobs in order to create a new one.

So my thoughts are, for the same table:

  1. If you need different watermark attributes, you can use the Flink connector: create a connector table for each job and specify a different watermark on each.
  2. If you need the same watermark attributes, you can use the Iceberg table directly.

Contributor

@hililiwei hililiwei left a comment

Great job. This feature is useful for a lot of work, and I think we should put some thought into 'Schema' and 'ResolvedSchema'. I look forward to more feature support after we finish the discussion on 'ResolvedSchema' and related topics.

@@ -52,35 +63,69 @@
*/
public class FlinkSchemaUtil {

public static final String FLINK_PREFIX = "flink.";

public static final String COMPUTED_COLUMNS = "computed-columns.";
Contributor

use computed-column ?

Contributor Author

Yes, the 's' is redundant; I will fix it.

Comment on lines 274 to 278
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
CatalogManager catalogManager = ((TableEnvironmentImpl) streamTableEnvironment).getCatalogManager();
SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
return table.getUnresolvedSchema().resolve(schemaResolver);
Contributor

Suggested change
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
CatalogManager catalogManager = ((TableEnvironmentImpl) streamTableEnvironment).getCatalogManager();
SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
return table.getUnresolvedSchema().resolve(schemaResolver);
Configuration configuration = ExecutionEnvironment.getExecutionEnvironment().getConfiguration();
TableEnvironment tableEnvironment = TableEnvironment.create(configuration);
Preconditions.checkArgument(table.getDescription().isPresent(), "Illegal table.");
return tableEnvironment.from(table.getDescription().get()).getResolvedSchema();

Based on your comment (#4246 (comment)), I took a look at our previous proposal and made some suggestions, but it may not be optimal.
Here, I think using 'TableEnvironment' directly is a better choice. It is more general, working for streaming, batch, or otherwise, and it hides the underlying differences.
Let's see if somebody else has a better solution.

Contributor Author

This method is indeed better!
But the description is taken from the table's comment:

    @Override
    public Optional<String> getDescription() {
        return Optional.of(getComment());
    }

The comment does not necessarily exist, and even if it does, it is not necessarily the path of the table, so tableEnvironment.from may not be able to get the table...

Contributor

@hililiwei hililiwei May 5, 2022

Yes, so if we go this way, we might need to add a comment here.

// NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer may use
// CatalogTableImpl to copy a new catalog table.
// Let's re-loading table from Iceberg catalog when creating source/sink operators.
// Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
return CatalogTable.of(schema, null, partitionKeys, table.properties());

Alternatively, we can get the table via Path. It should be in ObjectPath.
TableEnvironment#Table from(String path);

Contributor Author

The tableEnvironment here is a newly created environment with only default_catalog and default_database, but this table actually exists in a catalog in the Flink environment, so I think tableEnvironment.from cannot find the required table. I actually tested it and the table couldn't be found.

Contributor

It's a bit tricky. I'll debug it locally to see if there's a better way.

env.getConfig().getConfiguration().set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
env.getConfig().getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false)
.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
Contributor

good

@@ -374,7 +378,7 @@ void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
throws CatalogException, TableAlreadyExistException {
validateFlinkTable(table);

Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
Schema icebergSchema = FlinkSchemaUtil.convert(((ResolvedCatalogTable) table).getResolvedSchema());
Contributor

Is it always safe to type-cast to ResolvedCatalogTable?

Contributor Author

Yes, in this interface we always get a ResolvedCatalogTable.

@Zhangg7723
Contributor

nice pr

2. create environment according to executionEnvironment
Contributor

@kbendick kbendick left a comment

Thanks @wuwenchi for working on this!

I too would like to see watermark support and computed column support, particularly so that we can have partition transforms.

I have some concern, like @stevenzwu mentioned, about using table properties for things that might be very job-specific.

For example, I know that @chenjunjiedada has recommended that we don’t use the write.upsert-enabled config and instead set that per job. While we can’t immediately deprecate that configuration (and may never), I agree with his advice and I’d be hesitant to add more table properties that should be job-level properties.

My other concern is using flink. in table property names. While it might be the case today that these features are only supported by Flink (such as write.upsert-enabled configuration has been), we should ideally name things without regard for any specific engine as hopefully those engines will support such features in the future (or we can help add support for them).

Overall, I do really want to commend you on a job well done for this. I think it likely needs some changes, and might benefit from some kind of design discussion first, but truly thank you for making this PR as I’m sure much of this code will likely be useful for however this winds up getting designed (be it this way or another). And if this code is used it should definitely be credited.

I’m interested in exploring some of @yittg’s ideas from #4251 and combining some of them with your ideas, such as this updated idea of yours here #5000

@hililiwei
Contributor

As for computed columns, we are trying to support them in Iceberg itself, so that not only Flink but other engines can use them as well; however, that change is quite large. Just for reference: #4994.


github-actions bot commented Aug 8, 2024

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 8, 2024

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 16, 2024