
[HUDI-3982] Comprehensive schema evolution in flink when read/batch/cow/snapshot #5443

Closed · wants to merge 4 commits

Conversation

@trushev (Contributor) commented Apr 27, 2022

What is the purpose of the pull request

This PR adds support for reading with Flink when comprehensive schema evolution (RFC-33) is enabled and the table history contains operations such as add column, rename column, change column type, and drop column.
Supported mode: batch/cow/snapshot

Brief change log

  • Added new option to enable comprehensive schema evolution in flink
  • Key changes are made inside CopyOnWriteInputFormat: during open(), it resolves the schema of the file being read; if that schema differs from the queried schema, it builds a cast map. After each record is read, type conversion is performed according to the constructed map (see the sketch below).
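
As a rough illustration of that cast-map idea — a minimal sketch using Flink's RowData API; CastMapSketch and its methods are hypothetical names, not the PR's actual classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

/** Hypothetical sketch of a per-field cast map; not the PR's actual CastMap. */
public class CastMapSketch {

  // field position -> conversion from the file's type to the queried type
  private final Map<Integer, Function<Object, Object>> castByPos = new HashMap<>();

  public void add(int pos, Function<Object, Object> cast) {
    castByPos.put(pos, cast);
  }

  /** Returns a new row with the registered casts applied; other fields are copied as-is. */
  public RowData castRow(GenericRowData row) {
    GenericRowData out = new GenericRowData(row.getArity());
    for (int i = 0; i < row.getArity(); i++) {
      Object val = row.getField(i);
      Function<Object, Object> cast = castByPos.get(i);
      out.setField(i, (val == null || cast == null) ? val : cast.apply(val));
    }
    return out;
  }
}
```

For example, `castMap.add(2, v -> ((Integer) v).longValue())` would register an int-to-long cast for the third column.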

Verify this pull request

This change added tests and can be verified as follows:

  • Added unit test TestCastMap to verify that type conversion is correct (a sketch of such a check follows this list)
  • Added integration test ITTestReadWithSchemaEvo to verify that a table with added, renamed, type-changed, and dropped columns is read as expected. This test uses TestSpark3DDL to prepare data, so it works only with -P scala-2.12,spark3.2, since TestSpark3DDL works only with that profile.
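
For flavor only, a minimal JUnit 5 check against the hypothetical CastMapSketch above (the real TestCastMap exercises the PR's actual CastMap):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.Test;

class CastMapSketchTest {

  @Test
  void intColumnEvolvedToBigintIsReadAsLong() {
    CastMapSketch castMap = new CastMapSketch();
    castMap.add(0, v -> ((Integer) v).longValue()); // column 0: int -> bigint

    RowData casted = castMap.castRow(GenericRowData.of(42));
    assertEquals(42L, casted.getLong(0)); // would throw ClassCastException without the cast
  }
}
```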

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@xiarixiaoyao (Contributor) commented:

@danny0405 @bvaradar If you have free time, could you please help review this PR? Thanks very much.

@danny0405 (Contributor) commented:

What do you mean when you say batch/cow/snapshot?

@trushev (Contributor, Author) commented Apr 27, 2022

> What do you mean when you say batch/cow/snapshot?

This PR covers the following case:

'read.streaming.enabled' = 'false',
'table.type' = 'COPY_ON_WRITE',
'hoodie.datasource.query.type' = 'snapshot'

@yihua added labels on Apr 28, 2022: schema-and-data-types, flink (Issues related to flink), priority:major (degraded perf; unable to move forward; potential bugs)
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Review comment (Contributor):

Why introduce the Spark dependency in the Flink pom?

Reply (Contributor, Author):

To prepare test data. Currently, only the Spark engine provides a way to change the schema and write new data afterwards.
I think once full support of schema evolution is implemented, we can remove this dependency by rewriting the test in pure Flink.

@@ -447,6 +453,17 @@ private Schema inferSchemaFromDdl() {
return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
}

private SchemaEvoContext getSchemaEvoContext() {
if (!conf.getBoolean(FlinkOptions.SCHEMA_EVOLUTION_ENABLED)) {
Review comment (Contributor):

Return Option<SchemaEvoContext> instead.

Reply (Contributor, Author):

Fixed. Removed the enabled field; isPresent now means enabled.
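
A minimal sketch of the resulting pattern, assuming Hudi's org.apache.hudi.common.util.Option; the class body here is hypothetical:

```java
import org.apache.hudi.common.util.Option;

/** Hypothetical sketch: an empty Option replaces the removed enabled flag. */
final class SchemaEvoContextSketch {

  private final String querySchema; // placeholder for the real schema payload

  private SchemaEvoContextSketch(String querySchema) {
    this.querySchema = querySchema;
  }

  /** isPresent() == schema evolution enabled; callers just check the Option. */
  static Option<SchemaEvoContextSketch> create(boolean evolutionEnabled, String querySchema) {
    return evolutionEnabled ? Option.of(new SchemaEvoContextSketch(querySchema)) : Option.empty();
  }
}
```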

LogicalTypeRoot to = toType.getTypeRoot();
switch (to) {
case BIGINT: {
// Integer => Long
Review comment (Contributor):

What is the philosophy of these mappings?

Reply (@trushev, Contributor, Author) — May 17, 2022:

Assume the schema evolution DDL

alter table t1 alter column val type bigint

which changes the type of val from int to bigint.

We want to be able to read old data. To do that, we need to cast val from int to long; otherwise, an exception will be thrown:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)

This class is an analogue of org.apache.hudi.client.utils.SparkInternalSchemaConverter#convertColumnVectorType, which converts Spark's types.
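
A minimal sketch of such a mapping for the widening cases, built around the LogicalTypeRoot switch shown in the diff above (the factory method and its name are assumptions):

```java
import java.util.function.Function;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

/** Hypothetical factory picking a value-level cast for an evolved column type. */
final class CastSketch {

  static Function<Object, Object> castTo(LogicalType toType) {
    LogicalTypeRoot to = toType.getTypeRoot();
    switch (to) {
      case BIGINT:
        // Integer => Long: old files store int, the queried schema says bigint,
        // so GenericRowData.getLong would otherwise throw ClassCastException.
        return val -> val instanceof Integer ? ((Integer) val).longValue() : val;
      case DOUBLE:
        // Float => Double: another widening cast evolved columns may need.
        return val -> val instanceof Float ? ((Float) val).doubleValue() : val;
      default:
        return Function.identity();
    }
  }
}
```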

* Data class to pass schema evolution info from table source to input format.
*/
public final class SchemaEvoContext implements Serializable {
private final boolean enabled;
Review comment (Contributor):

Is this clazz necessary? The enabled flag can be replaced by a non-empty Option<querySchema> instead.

Reply (@trushev, Contributor, Author) — May 17, 2022:

> Is this clazz necessary?

I think yes. The schema evolution methods presented here were moved from CopyOnWriteInputFormat to SchemaEvoContext so they can be reused in MergeOnReadInputFormat.

> The enabled flag can be replaced by a non-empty Option<querySchema> instead.

Fixed by Option<SchemaEvoContext>.

}

private static final class ActualFields {
private final String[] names;
Review comment (Contributor):

Personally I don't like the style of introducing too many intermediate POJOs.

Reply (Contributor, Author):

POJO removed.

@@ -61,7 +68,10 @@ public static RowDataProjection instance(LogicalType[] types, int[] positions) {
public RowData project(RowData rowData) {
GenericRowData genericRowData = new GenericRowData(this.fieldGetters.length);
for (int i = 0; i < this.fieldGetters.length; i++) {
Review comment (Contributor):

Can we avoid affecting the normal code path for non-evolution reads? Something like:

public RowData project(RowData rowData, CastMap castMap)

Reply (@trushev, Contributor, Author) — May 17, 2022:

Fair enough. Fixed. I extended RowDataProjection instead of adding project(RowData rowData, CastMap castMap) because it is convenient to keep the CastMap inside the projection.
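
A rough sketch of that design choice, reusing the hypothetical CastMapSketch above; the real RowDataProjection's internals differ, so this standalone class is illustrative only:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

/**
 * Hypothetical projection holding a CastMap, so the non-evolution
 * code path keeps calling project(rowData) without extra parameters.
 */
class CastingProjectionSketch {

  private final RowData.FieldGetter[] fieldGetters;
  private final CastMapSketch castMap; // from the earlier sketch

  CastingProjectionSketch(RowData.FieldGetter[] fieldGetters, CastMapSketch castMap) {
    this.fieldGetters = fieldGetters;
    this.castMap = castMap;
  }

  public RowData project(RowData rowData) {
    GenericRowData projected = new GenericRowData(fieldGetters.length);
    for (int i = 0; i < fieldGetters.length; i++) {
      projected.setField(i, fieldGetters[i].getFieldOrNull(rowData));
    }
    // The casts live inside the projection rather than in the caller.
    return castMap.castRow(projected);
  }
}
```

Keeping the CastMap inside the projection means callers on the normal path are untouched: they keep invoking project(rowData) with no extra parameter.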

@trushev (Contributor, Author) commented May 17, 2022

@hudi-bot run azure

@hudi-bot commented:

CI report:

Bot commands — @hudi-bot supports the following commands:
  • @hudi-bot run azure — re-run the last Azure build

@trushev closed this Jun 10, 2022
@trushev (Contributor, Author) commented Jun 10, 2022

I merged all supported modes into one patch and reworked the pull request: #5830
