
[HUDI-2560] introduce id_based schema to support full schema evolution. #3808

Closed
Wants to merge 9 commits.

Conversation

xiarixiaoyao
Contributor

What is the purpose of the pull request

Introduce id_based schema to support full schema evolution.
This PR is split from [RFC-33] [HUDI-2429][WIP] Full schema evolution #3668, since that PR is too large.

Brief change log

  • Introduce an id-based InternalSchema and TableChange APIs for schema evolution

  • Add a file-based internal schema storage manager to persist history schemas under .hoodie/.schema

  • Add a new ALTER_SCHEMA write operation type

Verify this pull request

UT added.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@xiarixiaoyao
Contributor Author

@bvaradar @codope: could you help review this code? Thanks.
This code is split from #3668, and all comments there have been addressed.

Contributor

@bvaradar left a comment

cc @codope

@xiarixiaoyao: Went through the initial few files and added review comments. Will review the rest in a day.

* @param position col position to be added
* @param positionType col position change type. now support three change types: first/after/before
*/
public void addCol(String colName, Schema schema, String doc, String position, TableChange.ColumnPositionChange.ColumnPositionType positionType) {
Contributor

nit: addCol -> addColumns

Contributor

A question for all these schema methods at this class level -> what is the behavior if the same operation is applied twice. For example what is the behavior if the same column is deleted or added ?

Contributor Author

A second delete or add of the same column will fail with an exception; we validate each column operation before actually applying it. I will add a test case, thanks.

Contributor Author

Added repeat-delete and repeat-add checks in the test class TestTableChanges.
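
For illustration, such a check might look like the sketch below (JUnit 5). createTestSchema() is a hypothetical helper, and the exact exception type is an assumption; the PR's final API may differ.

// Hedged sketch: a repeated add of the same column should be rejected
// before the change is applied. createTestSchema() is hypothetical.
InternalSchema schema = createTestSchema();
TableChanges.ColumnAddChange add = TableChanges.ColumnAddChange.get(schema);
add.addColumns("age", Types.IntType.get(), "age column");
// A second add of the same column fails up front (exception type assumed):
assertThrows(IllegalArgumentException.class,
    () -> add.addColumns("age", Types.IntType.get(), "age column"));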

}

public void addCol(String colName, Schema schema) {
addCol(colName, schema, null, null, null);
Contributor

Instead of null column type, Can you create an enum value "NULL_COLUMN" in ColumnPositionType ?

Also, why does the position param have type String instead of int?

Contributor Author

This is because engines such as Hive / Spark / MySQL use a string as the parameter, e.g. ALTER TABLE xxx ADD COLUMNS(name string AFTER/BEFORE id). I think it is better to use String as the param.

Contributor Author

Using the enum value "NO_OPERATION" instead of "NULL_COLUMN" in ColumnPositionType.
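
The resulting enum might look like this sketch; only FIRST/AFTER/BEFORE (from the javadoc earlier in this diff) and NO_OPERATION are named in this thread, so comments are assumptions.

// Sketch of ColumnPositionType with the NO_OPERATION value discussed above.
public enum ColumnPositionType {
  FIRST,         // move/add the column to the front
  AFTER,         // position relative to a named reference column
  BEFORE,
  NO_OPERATION   // no position change requested (replaces the old null)
}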

TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
Schema schema = AvroInternalSchemaConverter.convert(newSchema, config.getTableName());
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType());
Contributor

Let's introduce a new operation type for schema changes : "ALTER_SCHEMA"

Contributor Author

ok

Contributor Author

fixed

* @return InternalSchema for this table
*/
public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
Contributor

Instead of reading the internal metadata from the commit file, can we read it from the .hoodie/.schema folder (using FileBaseInternalSchemasManager)?

Contributor Author

That works for me. I'm just a little worried about performance: as the history schema gradually grows, reading from the commit file performs better than reading from FileBaseInternalSchemasManager.
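
A minimal sketch of the commit-file read path being defended here, assuming a HoodieTableMetaClient in scope as in the snippet above; the extra-metadata key name is an assumption.

// Hedged sketch: read the latest internal schema string from the last
// completed commit's metadata ("latest_schema" key is assumed).
HoodieTimeline timeline = metaClient.getActiveTimeline()
    .getCommitsTimeline().filterCompletedInstants();
Option<HoodieInstant> last = timeline.lastInstant();
if (last.isPresent()) {
  HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
      timeline.getInstantDetails(last.get()).get(), HoodieCommitMetadata.class);
  String latestSchema = metadata.getExtraMetadata().get("latest_schema"); // assumed key
  // One small read per lookup, independent of how long the schema history grows.
}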

Member

Save it as an indexed file (HFile) so you can just read the last or first record and be done? Having one source of truth would be good. We can also do this as a follow-up.

@xiarixiaoyao
Contributor Author

@bvaradar Thank you for your review; I will update the code today.

@xiarixiaoyao
Contributor Author

@bvaradar Thanks for your review. I've updated the code.
Only one small doubt: reading the internal metadata from the commit file has better performance than reading from FileBaseInternalSchemasManager. Of course, if you think reading from FileBaseInternalSchemasManager is better, I will update the code.

Contributor

@bvaradar left a comment

@xiarixiaoyao: A few more comments.

}
}

private void cleanOldFiles() {
Contributor

We should do the cleanup as part of archiving commit metadata

Contributor Author

ok

}
});
// clean old files, keep at most ten schema files
if (validateSchemaFiles.size() > 10) {
Contributor

Can you make this value of 10 configurable?

Contributor Author

agree
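
For illustration, the threshold could be exposed via Hudi's ConfigProperty pattern; this is a sketch, and the key name is hypothetical.

// Hedged sketch: replace the hard-coded 10 with a config knob.
public static final ConfigProperty<Integer> SCHEMA_FILES_RETAINED = ConfigProperty
    .key("hoodie.schema.files.retained")    // hypothetical key name
    .defaultValue(10)                       // the previously hard-coded value
    .withDocumentation("Maximum number of history schema files kept under .hoodie/.schema.");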

}
}

private List getValidateCommits() {
Contributor

nit: Rename to getValidInstants

Contributor Author

agree

import java.util.TreeMap;
import java.util.stream.Collectors;

public class FileBaseInternalSchemasManager extends InternalSchemasManager {
Contributor

Rename to FileBasedInternalSchemaStorageManager

Contributor Author

agree

Path saveTempPath = new Path(baseSchemaPath, instantTime + java.util.UUID.randomUUID().toString());
try {
cleanOldFiles();
byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);
Contributor

We had earlier removed the usage of renames when handling state transitions, because renames are not atomic on all storage types. Why do we need to write a temp file here? Can we avoid renames and instead use the same state transitions as commits, e.g. <instant.requested>, <instant.inflight>, and <instant.commit>? This way, we can reuse the HoodieTimeline logic for scanning and constructing valid commits.

Contributor Author

I will redo this logic, thanks.
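
A sketch of the commit-style state transitions being suggested, modeled on Hudi's <instant>.requested / <instant>.inflight / <instant>.commit convention. The save_schema suffixes anticipate the action introduced later in this thread and are assumptions here.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class SchemaCommitSketch {
  // Persist a schema via state-transition files instead of temp file + rename,
  // so non-atomic renames are never needed.
  void persistSchema(FileSystem fs, Path schemaFolder, String instantTime,
                     String historySchemaStr) throws IOException {
    Path requested = new Path(schemaFolder, instantTime + ".save_schema.requested");
    Path inflight = new Path(schemaFolder, instantTime + ".save_schema.inflight");
    Path completed = new Path(schemaFolder, instantTime + ".save_schema");
    fs.create(requested, false).close();   // plan the change
    fs.create(inflight, false).close();    // mark it in progress
    try (FSDataOutputStream out = fs.create(completed, false)) {
      out.write(historySchemaStr.getBytes(StandardCharsets.UTF_8)); // publish
    }
    // Readers trust only files without a .requested/.inflight suffix, so a
    // crashed writer never exposes a partially written schema file.
  }
}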


import org.apache.hudi.common.util.Option;

abstract class InternalSchemasManager {
Contributor

Rename to AbstractInternalSchemaStorageManager

Contributor Author

agree

}
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
Map<Integer, Object> helper = new HashMap<>();
Contributor

Rename helper to recordBuffer

Contributor Author

ok

for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
if (oldSchema.getField(field.name()) != null) {
Schema.Field oldField = oldSchema.getField(field.name());
Contributor

Looks like we are resolving the schema by name. How will rename work here? Is it because you will be addressing it in a subsequent diff?

Contributor Author

Good question.
This function must be used together with InternalSchemaUtils.mergeSchema, which resolves the rename.
See AbstractHoodieLogRecordScanner.processDataBlock in [RFC-33] [HUDI-2429][WIP] Full schema evolution #3668; the test cases there also prove that we can handle the rename operation.
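
For readers following along, here is a condensed sketch of the rewrite loop under discussion. It assumes mergeSchema has already rewritten field names in newSchema to match the file's oldSchema, which is how the name-based lookup survives renames; default-value handling is elided.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;

// Hedged sketch: copy every field of the merged (query-facing) schema from
// the old record by name; renamed columns already carry the file-side name.
static IndexedRecord rewrite(IndexedRecord oldRecord, Schema oldSchema, Schema newSchema) {
  GenericData.Record result = new GenericData.Record(newSchema);
  for (Schema.Field field : newSchema.getFields()) {
    Schema.Field oldField = oldSchema.getField(field.name()); // name already merged
    if (oldField != null) {
      result.put(field.pos(), oldRecord.get(oldField.pos()));
    } else {
      result.put(field.pos(), null); // newly added column; defaults elided
    }
  }
  return result;
}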

@xiarixiaoyao
Contributor Author

@bvaradar Thanks for your review. I will try to address all comments in the next few days.

@xiarixiaoyao
Contributor Author

@bvaradar Sorry for not updating the code in time. I've updated the code and addressed all comments:
1) Introduce a new action (save_schema) that uses <instant.requested>, <instant.inflight>, and <instant.commit> to manage the persistence of schema information.
2) Move cleanup into the commit-metadata archiving step, to clean old schema commits.
3) Every time we persist the schema, we try to clean up garbage files.

#3668 will also be updated later, to verify the correctness of this modification.

@xiarixiaoyao
Contributor Author

@hudi-bot run azure

@xiarixiaoyao
Contributor Author

@bvaradar I've already rebased the code and addressed all comments. Could you please help review the code again? Thanks.

@vinothchandar vinothchandar moved this from Ready for Review to Under Discussion PRs in PR Tracker Board Dec 15, 2021
@xiarixiaoyao
Contributor Author

Hello @bvaradar @vinothchandar,
We are currently testing this feature at large scale, and I will fix all the bugs found in testing.
Please bear with us, thanks.

@bvaradar
Contributor

bvaradar commented Dec 17, 2021 via email

@bvaradar
Contributor

@xiarixiaoyao: Any updates on the PR?

@xiarixiaoyao
Contributor Author

@bvaradar @vinothchandar
Sorry to have kept you waiting so long.
We are currently working on Flink and Hive support for this feature.

Current progress:
Spark: completed, and now in large-scale testing.
Hive: expected to finish this weekend.
Flink: to be completed by February 10 at the latest.
Presto: since the community has proposed a new Presto connector, we cannot adapt it until that connector is completed.

Once the Hive support work is finished, I will update the code as soon as possible.

@bvaradar
Contributor

@xiarixiaoyao: Thanks for the update. Were you able to finish the integration with Hive? Can you update and rebase the PR?

Thanks,
Balaji.V

@xiarixiaoyao
Contributor Author

@bvaradar The Hive work has been completed, and I will start updating the code today. Thank you very much for your patience.

@xiarixiaoyao xiarixiaoyao force-pushed the id_schema branch 2 times, most recently from b6c2389 to 40e49cf Compare January 26, 2022 04:05
@xiarixiaoyao
Contributor Author

xiarixiaoyao commented Jan 26, 2022

@bvaradar @leesf @codope @XuQianJin-Stars
Already rebased and updated the code:

  1. Add tests for FileBasedInternalSchemaStorageManager and rebase the code.

  2. Add support for changing column types and fix some test cases. Now supported (see the sketch after this list):

    • int => long/float/double/string
    • long => float/double/string
    • float => double/string
    • double => string/decimal
    • decimal => decimal/string
    • string => date/decimal
    • date => string
  3. Fix some bugs encountered in the production environment and delete useless code.

Looking forward to your comments.
The adaptation for Hive and Spark will be brought up tomorrow.
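
As referenced in item 2, here is a sketch encoding that promotion matrix as a simple check. The type-name strings are illustrative stand-ins for the PR's internal type objects, not its actual API.

// Hedged sketch of the supported primitive-type promotions listed above.
static boolean isPromotionAllowed(String from, String to) {
  if (from.equals(to)) {
    return true;
  }
  switch (from) {
    case "int":     return to.equals("long") || to.equals("float")
                        || to.equals("double") || to.equals("string");
    case "long":    return to.equals("float") || to.equals("double") || to.equals("string");
    case "float":   return to.equals("double") || to.equals("string");
    case "double":  return to.equals("string") || to.equals("decimal");
    case "decimal": return to.equals("decimal") || to.equals("string"); // e.g. wider precision
    case "string":  return to.equals("date") || to.equals("decimal");
    case "date":    return to.equals("string");
    default:        return false;
  }
}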

@xiarixiaoyao xiarixiaoyao force-pushed the id_schema branch 3 times, most recently from c051dfd to a39dba8 Compare January 30, 2022 01:45
@@ -48,6 +48,8 @@
INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
// compact
COMPACT("compact"),
// alter schema
ALTER_SCHEMA("alter_schema"),
// used for old version
Contributor

I'm not sure ALTER SCHEMA belongs among the write operations, or whether the DDL operation should be put in BaseHoodieWriteClient.

Contributor Author

Thanks for your review.
I think it should belong to the write operations: the DDL operations in Spark SQL treat it as a write operation.
Of course, if you think this is inappropriate, I will remove it from WriteOperation.

Contributor

We store Alter Schema commands as separate commits, like other write operations. Hence the need for a separate WriteOperation enum value.

Member

Makes sense

@xiarixiaoyao xiarixiaoyao force-pushed the id_schema branch 3 times, most recently from 156c20b to b395803 Compare February 25, 2022 06:24
@xiarixiaoyao
Contributor Author

@bvaradar We store history schemas in .schema because we want to track the lineage of schema changes.
Later, we can incrementally synchronize the schema to the metatable.

Contributor

@bvaradar left a comment

Nit comments. Overall looks good. Once you are done with the changes, I will approve the diff. Looking into the Spark PR on top of this.

@xiarixiaoyao: We need one diff to document the usage and constraints (e.g., Hoodie columns should not be reordered; nested types) of the Alter commands as a section on the Apache Hudi website.

/**
* Set the max column id for this schema.
*/
public void setMax_column_id(int maxColumnId) {
Contributor

Please make all the method names camelCase.

Contributor Author

Sorry for the nonstandard naming; already fixed.

public class InternalSchemaMerger {
private final InternalSchema fileSchema;
private final InternalSchema querySchema;
// now there exist some bugs when we use spark update/merge api,
Contributor

Can you add some pointers to the GH/Jira issue?

Contributor Author

yes, already added

}

private void cleanResidualFiles() {
List<String> validateCommits = getValidInstants();
Contributor

Rename validateCommits to validInstants

Contributor Author

fixed

if (fs.exists(baseSchemaPath)) {
List<String> candidateSchemaFiles = Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
.map(file -> file.getPath().getName()).collect(Collectors.toList());
List<String> validateSchemaFiles = candidateSchemaFiles.stream().filter(f -> validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList());
Contributor

nit: validateSchemaFiles => validSchemaFiles

Contributor Author

fixed

@xiarixiaoyao
Contributor Author

Nit comments. Overall looks good. Once you are done with the changes, I will approve the diff. Looking into the Spark PR on top of this.

@xiarixiaoyao: We need one diff to document the usage and constraints (e.g., Hoodie columns should not be reordered; nested types) of the Alter commands as a section on the Apache Hudi website.

@bvaradar Thank you very much for your comments.
Yes, Hoodie internal columns should not be modified; we have already prohibited these situations in our code, and the test code can be found in TestSpark3DDL in PR #4910.

Yes, I will prepare a doc for the Alter commands.
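
A sketch of the kind of guard described here, using Hudi's HoodieRecord meta-column list; the method name is hypothetical.

import org.apache.hudi.common.model.HoodieRecord;

// Hedged sketch: reject any change that targets a Hudi internal meta column.
static void checkNotMetaColumn(String colName) {
  if (HoodieRecord.HOODIE_META_COLUMNS.contains(colName)) {
    throw new IllegalArgumentException("cannot modify hudi meta column: " + colName);
  }
}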

@hudi-bot

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@vinothchandar
Member

@bvaradar @xiarixiaoyao Please let me know if/when this is ready for review. We plan to code freeze this Friday.

@vinothchandar
Member

cc @xushiyan

@xiarixiaoyao
Contributor Author

@vinothchandar Yes, thank you for your attention.
Could you please also help review this PR in your spare time?

bvaradar has also put forward some modification suggestions in this PR; I'm actively addressing those comments.

@vinothchandar
Member

I have been syncing with @bvaradar actually. Going to take my pass over this PST tomorrow. Stay tuned!

@vinothchandar
Member

vinothchandar commented Mar 30, 2022

@xiarixiaoyao In the meantime, please see if we can make it as safe as possible, in terms of having flags that will safeguard normal code paths.
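
The safeguard being requested could take the form of a feature flag that is off by default. This sketch uses Hudi's ConfigProperty pattern; the key name is an assumption.

// Hedged sketch: keep full schema evolution behind an opt-in flag so that
// normal code paths are untouched unless it is explicitly enabled.
public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
    .key("hoodie.schema.on.read.enable")   // assumed key name
    .defaultValue(false)                   // off by default: normal paths unaffected
    .withDocumentation("Enable id-based full schema evolution for readers and writers.");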

@xiarixiaoyao
Contributor Author

@vinothchandar Thank you very much.
#4910 already includes this code, and bvaradar and I have modified a lot of it, so maybe we can close this PR directly.

Member

@vinothchandar left a comment

Seems like mostly new code; I'll have to review more deeply. Flagged one concern on how we are saving the new instant.

@xiarixiaoyao what testing has been done for this PR?

@@ -550,6 +551,9 @@ public void archive(HoodieEngineContext context, List<HoodieInstant> instants) t
}
}
writeToFile(wrapperSchema, records);
// try to clean old history schema.
FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(metaClient);
fss.cleanOldFiles(instants.stream().map(is -> is.getTimestamp()).collect(Collectors.toList()));
Member

Should this be in the cleaner?

Contributor Author

I am OK with moving this to the cleaner.


@@ -591,6 +591,10 @@ private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inf
}
}

private Path getInstantFileNamePath(String fileName) {
return new Path(fileName.contains(SAVE_SCHEMA_ACTION) ? metaClient.getSchemaFolderName() : metaClient.getMetaPath(), fileName);
Member

If we treat something as an instant, splitting it apart into separate folders seems a bit odd to me.

Contributor Author

Yes, currently our history schema is saved in the .hoodie/.schema directory. We plan to save the schema through the metatable later; once that's done, we won't need this odd logic anymore.

@xiarixiaoyao
Contributor Author

When a DDL statement happens, a new commit is created and saved, just like what we do when we first create a table using Spark SQL.
UTs cover those functions, and the integration tests with Spark SQL can be found in #4910.

@xiarixiaoyao
Contributor Author

As #4910 has been merged, closing this PR.

PR Tracker Board automation moved this from Under Discussion PRs to Done Apr 2, 2022