[HUDI-7622] Optimize HoodieTableSource's sanity check #11031
base: master
Conversation
```java
        .toArray();
    if (!expColumns.isEmpty()) {
      throw new HoodieException("Column(s) " + String.join(", ", expColumns) + " does not exists in the hudi table " + this.tableName + ".");
    }
```
The message should read: `Column(s) [$col_a, $col_b, $col_c ...] does not exist in the table $tableName.`
Done.
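
For illustration, a minimal sketch of the check with the suggested message format; the class and variable names (`ColumnCheckSketch`, `queryColumns`, `tableColumns`) are assumptions, not the patch's actual identifiers:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.exception.HoodieException;

public class ColumnCheckSketch {
  // Collects every queried column that is missing from the table schema and
  // reports them all in one exception, using the suggested message format.
  static void checkColumnsExist(List<String> queryColumns, List<String> tableColumns, String tableName) {
    List<String> missing = queryColumns.stream()
        .filter(col -> !tableColumns.contains(col))
        .collect(Collectors.toList());
    if (!missing.isEmpty()) {
      throw new HoodieException(
          "Column(s) [" + String.join(", ", missing) + "] does not exist in the table " + tableName + ".");
    }
  }
}
```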
Force-pushed from 5ef17f7 to a7270a9
Force-pushed from a487acf to 0b3c631
```diff
@@ -86,12 +84,14 @@ public DynamicTableSource createDynamicTableSource(Context context) {
     setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
-    return new HoodieTableSource(
+    HoodieTableSource source = new HoodieTableSource(
         schema,
         path,
         context.getCatalogTable().getPartitionKeys(),
         conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
```
Let's keep the sanity check in the HoodieTableFactory?
Based on what you said, the metaClient needs to be initialized in the factory (the Hoodie source sanity check needs it). Wouldn't it be more reasonable to initialize it in the source?
> the metaClient needs to be initialized in the factory

That's okay, we already do that for the sink sanity check of the table config.
@danny0405 cc
Force-pushed from e667c4b to 20af652
```diff
@@ -518,7 +499,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat(
         tableAvroSchema.toString(),
         AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
         inputSplits,
-        conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+        OptionsResolver.getRecordKeyField(conf));
```
The change is way too complicated; can you illustrate the issue again? What is the use case from the user, and what is the expected correct behavior?
- The table created by the upstream writer (recorded in the existing metadata) does not match the columns configured for the downstream streaming read. For example, some columns do not exist, so the columns cannot be found.
  -> Verification fails, throwing an exception.
- The record key configuration does not exist.
  -> Verification fails, throwing an exception.
- Case problem: the columns created via Calcite upstream are all lowercase. If the downstream uses uppercase, such as "eventTime", the columns will not be found.
  -> Uniformly converted to lowercase.
> Case problem: the columns created via Calcite upstream are all lowercase. If the downstream uses uppercase, such as "eventTime", the columns will not be found.
> -> Uniformly converted to lowercase.

This is not expected to be handled by Hudi, I think. At least, from the catalog layer, we should make the case sensitivity agnostic to specific engines.

> The table created by the upstream writer (recorded in the existing metadata) does not match the columns configured for the downstream streaming read.

In HoodieTableFactory#createDynamicTableSource, add a sanity check for the catalog table's resolved schema against the existing Hudi table schema; that should be enough, I guess. Similar to the primary key definition.
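A hedged sketch of such a factory-level check; the helper names (StreamerUtil.metaClientForReader, StreamerUtil.getLatestTableFields) are taken from snippets quoted elsewhere in this thread, while the wiring itself is an assumption:

```java
// Illustrative only: compare the catalog table's resolved schema against the
// latest schema recorded in the existing Hudi table, analogous to the
// primary key check suggested above.
HoodieTableMetaClient metaClient =
    StreamerUtil.metaClientForReader(conf, HadoopConfigurations.getHadoopConf(conf));
if (metaClient != null) {
  List<String> latestTableFields = StreamerUtil.getLatestTableFields(metaClient);
  if (latestTableFields != null) {
    for (String column : schema.getColumnNames()) {
      if (!latestTableFields.contains(column)) {
        throw new HoodieException("Column " + column + " does not exist in the hudi table.");
      }
    }
  }
}
```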
@danny0405 cc~
Force-pushed from 20af652 to e10e529
Force-pushed from e10e529 to b3ed3ad
Thanks for the contribution, here is a patch for the fix. It would be great if you could help add some UTs.
Force-pushed from 14a352d to a7ab936
Made some adjustments:
```java
/**
 * Utilities for HoodieTableFactory sanity check.
 */
public class SanityCheckUtil {
```
Rename class name to SanityChecks.
> Rename class name to SanityChecks.

Done.
```diff
     return new HoodieTableSource(
         schema,
         path,
         context.getCatalogTable().getPartitionKeys(),
         conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
-        conf);
+        conf,
+        metaClient);
```
The metaClient initialization is costly, but the table source creation only happens once, at JobGraph compile time; let's not reuse it here, to reduce complexity.
> The metaClient initialization is costly, but the table source creation only happens once, at JobGraph compile time; let's not reuse it here, to reduce complexity.

Done.
Force-pushed from f0936bc to 30f50eb
```diff
@@ -86,6 +86,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
     setupTableOptions(conf.getString(FlinkOptions.PATH), conf);
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
+    HoodieTableMetaClient metaClient = StreamerUtil.metaClientForReader(conf, HadoopConfigurations.getHadoopConf(conf));
+    SanityChecksUtil.sanitCheck(conf, schema, metaClient);
```
Move the instantiation of the meta client into the SanityCheckUtil. And can we rename the class to SanityChecks?
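Roughly, the suggested shape would be as follows (a sketch only: the flag name checkMetaData is taken from the diff just below, the rest of the wiring is assumed, and the individual checks are elided):

```java
// Sketch: the meta client is created inside the check itself, so the factory
// only passes the configuration, the resolved schema, and a flag.
public class SanityChecks {
  public static void sanityCheck(Configuration conf, ResolvedSchema schema, boolean checkMetaData) {
    checkTableType(conf);
    if (checkMetaData) {
      HoodieTableMetaClient metaClient =
          StreamerUtil.metaClientForReader(conf, HadoopConfigurations.getHadoopConf(conf));
      // ... column and record key checks against the meta client go here
    }
  }
}
```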
```diff
     checkTableType(conf);
     List<String> schemaFields = schema.getColumnNames();
-    if (metaClient != null) {
+    if (checkMetaData) {
+      HoodieTableMetaClient metaClient = StreamerUtil.metaClientForReader(conf, HadoopConfigurations.getHadoopConf(conf));
       List<String> latestTablefields = StreamerUtil.getLatestTableFields(metaClient);
       if (latestTablefields != null) {
```
The logic for the sink has been changed with this patch. We have this code for the original sink:

```java
if (!OptionsResolver.isAppendMode(conf)) {
  checkRecordKey(conf, schema);
}
```
I put this logic into the function checkRecordKey, and both the source and sink need to be checked:

```java
public static void checkRecordKey(Configuration conf, List<String> existingFields) {
  if (OptionsResolver.isAppendMode(conf)) {
    return;
  }
  ....
}
```

And also do this in the function checkIndexType:

```java
public static void checkIndexType(Configuration conf) {
  if (OptionsResolver.isAppendMode(conf)) {
    return;
  }
  ....
}
```
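For completeness, a hedged sketch of how the elided body of checkRecordKey might look; the early-return guard is quoted from the patch, while the lookup loop is an assumption:

```java
public static void checkRecordKey(Configuration conf, List<String> existingFields) {
  if (OptionsResolver.isAppendMode(conf)) {
    // Append mode writes without record keys, so the check short-circuits.
    return;
  }
  // Assumed body: verify every configured record key field exists in the table.
  for (String key : conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")) {
    if (!existingFields.contains(key)) {
      throw new HoodieException("Record key " + key + " does not exist in the hudi table.");
    }
  }
}
```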
```diff
     setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
     setupSortOptions(conf, context.getConfiguration());
+    SanityChecks.sanitCheck(conf, schema, false);
```
This line should be moved before line 105.
Change Logs

The existing exceptions do not indicate which table and which specific columns caused the failure.

Modified as follows:

- check columns
- check pks
Impact
Describe any public API or user-facing feature change or any performance impact.
Risk level (write none, low medium or high below)
If medium or high, explain what verification was done to mitigate the risks.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Any new feature or user-facing change requires updating the Hudi website: please create a Jira ticket, attach the ticket number here and follow the instruction to make changes to the website.
Contributor's checklist