
[HUDI-6474] Added support for reading tables evolved using comprehensive schema evolution on Flink #9133

Merged: danny0405 merged 9 commits into apache:master from voonhous:HUDI-6474 on Jul 20, 2023

Conversation

@voonhous (Member) commented Jul 6, 2023

PR is co-authored by @hbgstc123.

Hudi's comprehensive schema evolution currently does not support complex types on Flink reads:

  1. STRUCT
  2. MAP
  3. ARRAY

This PR supplements the feature to narrow the gap between what Spark and Flink support.

Change Logs

Added support for reading tables that were evolved using Hudi's comprehensive/full schema evolution

Impact

None

Risk level (write none, low, medium or high below)

None

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@voonhous voonhous changed the title Added support for reading tables evolved using comprehensive schema e… [HUDI-6474] Added support for reading tables evolved using comprehensive schema e… Jul 6, 2023

Co-authored-by: hbgstc123 <hbgstc123@gmail.com>
@voonhous (Member, Author) commented Jul 7, 2023

@danny0405 Can you please help to review this?

Background:
Hudi's comprehensive schema evolution currently does not support complex types on Flink reads:

  1. STRUCT
  2. MAP
  3. ARRAY

This PR supplements the feature to narrow the gap between what Spark and Flink support.

Thanks

  default:
-   return null;
+   throw new IllegalArgumentException(String.format("Unsupported conversion for %s => %s", fromType, toType));
}
@danny0405 (Contributor) commented Jul 7, 2023


Do not throw RuntimeException in a nested calling code path; it is very obscure for the invoker to perceive the exception. Either throw a checked exception or return null as before.
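A minimal sketch of the pattern the reviewer is asking for: declare the failure as a checked exception so callers must handle unsupported type pairs explicitly, instead of an unchecked exception surfacing deep in a nested call. All names here (CastSketch, UnsupportedCastException, the type strings) are illustrative, not the PR's actual classes.

```java
// Hypothetical sketch: a checked exception makes the unsupported-conversion
// path part of the method contract, so invokers cannot silently ignore it.
public class CastSketch {

  public static class UnsupportedCastException extends Exception {
    public UnsupportedCastException(String message) {
      super(message);
    }
  }

  // Converts value between the (simplified, string-named) types, or throws a
  // checked exception for unsupported pairs instead of returning null.
  public static Object cast(String fromType, String toType, Object value)
      throws UnsupportedCastException {
    switch (fromType + "->" + toType) {
      case "int->long":
        return ((Integer) value).longValue();
      case "int->string":
        return String.valueOf(value);
      default:
        throw new UnsupportedCastException(
            String.format("Unsupported conversion for %s => %s", fromType, toType));
    }
  }
}
```

The trade-off versus returning null (the previous behavior the reviewer mentions) is that a checked exception distinguishes "unsupported conversion" from "the value really was null".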

@voonhous (Member, Author):


Modified them to be checked exceptions. Not sure if I did it correctly; please take a look, thank you.

Object[] objects = new Object[array.size()];
for (int i = 0; i < array.size(); i++) {
  Object fromObject = ArrayData.createElementGetter(fromType).getElementOrNull(array, i);
  // need to handle nulls to prevent NullPointerException in #getConversion()
@danny0405 (Contributor):

No need to create the element getter on each iteration of the for-loop?
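The reviewer's point, sketched without Flink dependencies: build the per-element accessor once before the loop and reuse it, rather than re-creating it on every iteration. `makeGetter` stands in for `ArrayData.createElementGetter`, whose construction involves type dispatch; the names are assumptions for illustration.

```java
import java.util.function.IntFunction;

// Hypothetical sketch of hoisting getter construction out of a loop.
public class GetterHoisting {

  // Stand-in for ArrayData.createElementGetter(type): constructing the
  // accessor may be costly, so we want to pay for it once per array.
  public static IntFunction<Object> makeGetter(Object[] array) {
    return i -> array[i];
  }

  public static Object[] convertAll(Object[] array) {
    IntFunction<Object> getter = makeGetter(array); // created once, before the loop
    Object[] result = new Object[array.length];
    for (int i = 0; i < array.length; i++) {
      result[i] = getter.apply(i); // reused on every iteration
    }
    return result;
  }
}
```

The same hoisting applies to the map snippet below, where both the key getter and the value getter can be created once before iterating.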

final Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < map.size(); i++) {
  Object keyObject = ArrayData.createElementGetter(keyType).getElementOrNull(map.keyArray(), i);
  Object fromObject = ArrayData.createElementGetter(fromValueType).getElementOrNull(map.valueArray(), i);
@danny0405 (Contributor):

ditto

// note: InternalSchema.merge guarantees that the read schema (fromType) is ordered the same as toType;
// hence, we can match types by position, as each index is guaranteed to reference the same field
List<LogicalType> fromChildren = fromType.getChildren();
List<LogicalType> toChildren = toType.getChildren();
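The positional-matching reasoning in that comment can be sketched in isolation: because the merged read schema is assumed to keep fields in the same order as the query schema, children can be paired purely by index. Types are simplified to strings here; the guarantee attributed to `InternalSchema.merge` is taken from the comment above, not re-derived.

```java
import java.util.List;

// Hypothetical sketch: pair source/target field types by position, relying
// on the (assumed) guarantee that both lists have the same order and length.
public class PositionalMatch {

  public static String[] pairByPosition(List<String> fromChildren, List<String> toChildren) {
    if (fromChildren.size() != toChildren.size()) {
      throw new IllegalArgumentException("Schemas must have the same number of fields");
    }
    String[] pairs = new String[fromChildren.size()];
    for (int i = 0; i < fromChildren.size(); i++) {
      // index i refers to the same logical field in both schemas
      pairs[i] = fromChildren.get(i) + " => " + toChildren.get(i);
    }
    return pairs;
  }
}
```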
@danny0405 (Contributor):

ditto

GenericRowData rowData = new GenericRowData(toType.getChildren().size());
for (int i = 0; i < toChildren.size(); i++) {
  Object fromVal = RowData.createFieldGetter(fromChildren.get(i), i).getFieldOrNull(row);
  Object toVal;
@danny0405 (Contributor):

Watch the performance here: you are constructing a field getter for every row-to-row conversion.

@voonhous (Member, Author):

Any suggestions on how we can work around this? I don't think it is avoidable as of now.

@danny0405 (Contributor):

Maybe add a wrapper class, like what we do in RowDataToAvroConverters for row data to Avro conversion.
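The wrapper-class idea the reviewer points to (RowDataToAvroConverters-style) can be sketched without Flink: resolve the per-field getters once when the converter is created, then reuse them for every row, so the per-row cost is just the field reads. The class and interface names below are illustrative stand-ins, not Flink's real API.

```java
import java.util.function.Function;

// Hypothetical sketch of a converter that caches field getters up front,
// mirroring the RowDataToAvroConverters pattern mentioned in the review.
public class RowConverterSketch {

  @FunctionalInterface
  public interface FieldGetter {
    Object get(Object[] row);
  }

  // Construct once per schema; apply many times, once per row.
  public static Function<Object[], Object[]> createConverter(int fieldCount) {
    // Field getters are built once, outside the per-row path.
    FieldGetter[] getters = new FieldGetter[fieldCount];
    for (int i = 0; i < fieldCount; i++) {
      final int pos = i;
      getters[pos] = row -> row[pos];
    }
    // The returned function closes over the cached getters.
    return row -> {
      Object[] out = new Object[fieldCount];
      for (int i = 0; i < fieldCount; i++) {
        out[i] = getters[i].get(row);
      }
      return out;
    };
  }
}
```

The design point is that construction cost (type dispatch, getter creation) moves to converter-creation time, which runs once per schema rather than once per row.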

@voonhous (Member, Author):

Done, please take a look.

I also added a check in the testCastNestedRow case in TestCastMap to cover INT -> INT NOT NULL conversions, as I noticed such cases were not handled in the past.

@voonhous (Member, Author) commented:
We might need to update the checkstyle plugin from 3.0.0 to 3.1.0 due to this bug:

https://issues.apache.org/jira/browse/MCHECKSTYLE-347

I will submit a PR for this.

@voonhous (Member, Author) commented:
@danny0405 Can you please help to take a look at this, thank you!

@danny0405 (Contributor) commented:
6474.patch.zip

Thanks for the contribution @voonhous, basically looks good, just some refactoring of the naming.

@danny0405 danny0405 self-assigned this Jul 19, 2023
@danny0405 danny0405 added labels engine:flink (Flink integration), schema-evolution, area:schema (Schema evolution and data types) Jul 19, 2023
@voonhous (Member, Author) commented:
> 6474.patch.zip Thanks for the contribution @voonhous, basically looks good, just some refactoring of the naming.

@danny0405 Done!

@hudi-bot (Collaborator) commented:
CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure — re-run the last Azure build

@danny0405 danny0405 changed the title [HUDI-6474] Added support for reading tables evolved using comprehensive schema e… [HUDI-6474] Added support for reading tables evolved using comprehensive schema evolution on Flink Jul 20, 2023
@danny0405 danny0405 merged commit 0952441 into apache:master Jul 20, 2023
@voonhous voonhous deleted the HUDI-6474 branch December 7, 2023 06:43

Labels

area:schema (Schema evolution and data types), engine:flink (Flink integration)

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

3 participants