
[HUDI-6872] Simplify Out Of Box Schema Evolution Functionality #9743

Merged
62 commits merged into apache:master on Nov 10, 2023

Conversation

@jonvex (Contributor) commented Sep 18, 2023

Change Logs

Change how out-of-the-box schema evolution works so it is easier to understand, for both users and Hudi developers.

Things you can't do:

  • reorder columns
  • add new meta columns to nested structs

Added and Dropped Columns

  • New fields can be added to the end of the schema or to the end of nested structs. Those fields will be part of the schema of any future write.
  • Fields in the latest table schema that are missing from the incoming schema will be added to the incoming data with null values (see the sketch below).
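A minimal sketch of the behaviour above, using the Spark DataFrame API (the table name, key fields, and the spark/basePath values are illustrative placeholders; the exact options a pipeline needs may differ):

import org.apache.spark.sql.SaveMode

val hudiOpts = Map(
  "hoodie.table.name" -> "schema_evolution_demo",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts"
)

// First write establishes the schema (id, ts, name).
spark.sql("select 1 as id, 1L as ts, 'a' as name").write.format("hudi")
  .options(hudiOpts).mode(SaveMode.Overwrite).save(basePath)

// A new column `city` appended at the end of the schema is accepted,
// and future writes will carry it in the table schema.
spark.sql("select 2 as id, 2L as ts, 'b' as name, 'sf' as city").write.format("hudi")
  .options(hudiOpts).mode(SaveMode.Append).save(basePath)

// A batch that omits `name` has that column filled with nulls
// (subject to the ADD_NULL_FOR_DELETED_COLUMNS option discussed later in this PR).
spark.sql("select 3 as id, 3L as ts, 'nyc' as city").write.format("hudi")
  .options(hudiOpts).mode(SaveMode.Append).save(basePath)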

Type Promotion

Promotions also work within complex types such as arrays and maps.
Promotions:

  • int is promotable to long, float, double, or string
  • long is promotable to float, double, or string
  • float is promotable to double or string
  • string is promotable to bytes
  • bytes is promotable to string

Rules:

  • If the incoming schema has a column whose type is a promotion of the table schema's column type, that field will use the promoted type in the table's schema from now on
  • If the incoming schema has a column whose type is a demotion of the table schema's column type, the incoming batch's data will be promoted to the table schema's type (see the sketch below)
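A minimal sketch of the type promotion rules, reusing hudiOpts, spark and basePath from the previous sketch:

// First write: `amount` starts as an int.
spark.sql("select 1 as id, 1L as ts, 10 as amount").write.format("hudi")
  .options(hudiOpts).mode(SaveMode.Overwrite).save(basePath)

// The incoming batch promotes `amount` to long: the table schema uses long from now on.
spark.sql("select 2 as id, 2L as ts, 10000000000L as amount").write.format("hudi")
  .options(hudiOpts).mode(SaveMode.Append).save(basePath)

// The incoming batch carries `amount` as an int again (a demotion relative to the table
// schema): the batch's data is promoted to long on write and the table schema stays long.
spark.sql("select 3 as id, 3L as ts, 7 as amount").write.format("hudi")
  .options(hudiOpts).mode(SaveMode.Append).save(basePath)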

Impact

Change how out-of-the-box schema evolution works so it is easier to understand, for both users and Hudi developers.

Risk level (write none, low medium or high below)

High

Lots of testing has been done, including performance testing on TPC-DS 1 TB to ensure MOR Avro log block reading has not degraded.

Documentation Update

jira

pr

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@jonvex changed the title from "[6872] Test out of box schema evolution for deltastreamer" to "[HUDI-6872] Test out of box schema evolution for deltastreamer" on Sep 18, 2023
@voonhous (Member) commented:
Hello @jonvex, a little curious as to why we are maintaining 2 implementations of schema evolution:

  1. Out-of-box (supported by Avro)
  2. Comprehensive (using Hudi's InternalSchemaManager)

I believe it's a little confusing for users (especially new users).

@jonvex (Contributor, Author) commented Sep 19, 2023

@voonhous there is also hoodie.datasource.write.reconcile.schema. Very confusing.
It would be great to unify everything. It is difficult to make changes like this because different users may rely on all of these. With 1.0, I think changes like this may be allowed. Right now we are evaluating the current capabilities of schema evolution, and hopefully we can make this feature better.

@kazdy (Contributor) commented Sep 28, 2023

@voonhous there is also hoodie.datasource.write.reconcile.schema. Very confusing.
It would be great to unify everything. It is difficult to make changes like this because different users may rely on all of these. With 1.0, I think changes like this may be allowed. Right now we are evaluating the current capabilities of schema evolution, and hopefully we can make this feature better.

Can we discuss this as a community in an email thread? Let people share ideas and needs around schema evolution and enforcement.

@nsivabalan (Contributor) commented:
@hudi-bot run azure

@nsivabalan added the priority:critical label (production down; pipelines stalled; need help asap) on Oct 26, 2023
@nsivabalan (Contributor) left a review comment:

Good job on the patch man.

By the way, can you do the following as well:

  1. Check the meta sync flows to see how schema changes are deduced. We have an optimization where we trigger remote sync only in case of partition adds/removes and schema changes, so let's make sure we are good on that part.
  2. Also, did we test all the new behavior with the different meta syncs (Hive, Presto, Trino, etc.)? I am sure we have tested with Spark, but did you get a chance to test the other query engines?
  3. Also, let's ensure we do a round of testing with Spark SQL. Most common schema evolutions come from this writer.
  4. After evolving the schema, if we do a savepoint and restore to an older commit, I assume the table will still be intact, i.e. it might go back to the older schema and remain readable.
  5. I am halfway through reviewing the patch, but tell me something: if a commit that is trying to evolve the schema fails just before creating the completed commit metadata in the timeline, we are still intact, right? There is complete isolation and a new writer is not impacted by it, i.e. it should not see the evolved schema at any point in time. For example, in case it is a MOR table, all log blocks rely on the table schema and not on the previous log file's schema; just calling out an example.
  6. Can you write a mini RFC on this patch? Even if it only covers the current state of things and the flow of schemas and evolution, so that developers can understand the end-to-end flows, it would be great. Or you can publish it as a Hudi blog, whatever works. For example: where schema reconciliation comes into play, what reconcile means, what happens in write handles, how partial file groups are updated when the schema evolves, or how reads happen with different file groups in different schemas.
  7. Can you call out how time travel and incremental queries behave with schema evolution?
    For example:
    C1: 5-column schema (V1)
    C2: 6-column schema (V2)
    C3: 6-column schema (V2) // no change in schema
    C4: 7-column schema (V3)

For a time travel query at commit time C2, what does the schema look like? Is it V2 or V3 (assuming C4 is complete) when the query is issued?

And for an incremental query:
a. until C1: is it V1?
b. begin C1, end C2: is it V2?
c. begin C1, end C4: is it V3?
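For reference, a minimal sketch of the two query types being asked about; the commit instant times (c1Time, c2Time, c4Time) are placeholders that would come from the timeline, and spark/basePath are assumed to be in scope:

// Time travel query as of commit C2.
val timeTravelDf = spark.read.format("hudi")
  .option("as.of.instant", c2Time)
  .load(basePath)

// Incremental query with begin instant C1 and end instant C4.
val incrementalDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", c1Time)
  .option("hoodie.datasource.read.end.instanttime", c4Time)
  .load(basePath)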

if (schema.getType() == Schema.Type.NULL) {
  return schema;
}
return convert(convert(schema), schema.getFullName());
Contributor:

Curious to know why we need two conversions. Can't we directly fix or create the Avro schema by fixing the null ordering?

- public static Schema reconcileNullability(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {
-   if (sourceSchema.getFields().isEmpty() || targetSchema.getFields().isEmpty()) {
+ public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {
+   if (sourceSchema.getType() == Schema.Type.NULL || sourceSchema.getFields().isEmpty() || targetSchema.getFields().isEmpty()) {
Contributor:

If the source schema's fields are empty, shouldn't we be returning targetSchema?

Contributor:

@jonvex: do you have any pointers here?

@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
      latestTableSchemaOpt: Option[Schema],
      internalSchemaOpt: Option[InternalSchema],
      opts: Map[String, String]): Schema = {
    val addNullForDeletedColumns = opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
Contributor:

We definitely need good docs for this method; let's enhance the docs at L545. Even having illustrative examples is totally fine. We have been soft/low-key on schema evolution in general; let's button up and ensure we get it right this time.

Contributor:

The schema handling has grown to be sizable now. Let's move it to a separate static class instead of adding everything to HoodieSparkSqlWriter.

Contributor:

I am moving deduceWriterSchema() to HoodieSchemaUtils

|Table's schema ${latestTableSchema.toString(true)}
|""".stripMargin)
throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
}
Contributor:

Also, can you throw light on why we don't call AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema) in the else block at L658?

That is, when reconcile schema is set to false and AVRO_SCHEMA_VALIDATE_ENABLE is set to true, it looks like we never call AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema).

Also, curious to know what the difference is between AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, latestTableSchema) and HoodieSparkSqlWriter.canonicalizeSchema(), because both take the source schema and the table schema as args.

Contributor:

@jonvex ping

@nsivabalan (Contributor) commented:
@hudi-bot run azure

@hudi-bot commented:

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan (Contributor) commented with a screenshot (image not reproduced).

@nsivabalan nsivabalan merged commit d859c46 into apache:master Nov 10, 2023
30 checks passed
nsivabalan pushed a commit to nsivabalan/hudi that referenced this pull request Nov 23, 2023
@voonhous (Member) commented Dec 22, 2023

@jonvex @xiarixiaoyao

Apologies for necro-ing this PR. I was revisiting it today and noticed that Hudi's comprehensive schema evolution, which is managed via Hudi's InternalSchema, is still supported on write paths using the DataFrame API,

i.e. dataframe.write.format("hudi").options(...).mode(...).save(basePath)

where the options require these two configurations:

hoodie.schema.on.read.enable=true
hoodie.datasource.write.reconcile.schema=true
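For concreteness, a minimal sketch of such a write (df, basePath, the table name and the key fields are illustrative placeholders):

df.write.format("hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  // the two configurations mentioned above
  .option("hoodie.schema.on.read.enable", "true")
  .option("hoodie.datasource.write.reconcile.schema", "true")
  .mode("append")
  .save(basePath)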

This means that we can still make use of Hudi's comprehensive schema evolution to perform schema evolution. This behaviour is consistent with what we used to have in the past, which is good!

However, this way of using schema evolution is not really documented, and I am wondering whether the community has any plans to ensure that the end-to-end flow for this use case is error-free.

For now, there are two entry points for Spark through which Hudi's comprehensive schema evolution can be done: one via Spark SQL, and the other via the DataFrame API as I described above.

Consider the case where tables are synced to a Hive catalogue, which is one of the more common use cases. Spark SQL currently does this via sparkSession.sessionState.catalog.externalCatalog.alterTableDataSchema, hence hive-sync is done by Spark's code.

However, when using the DataFrame API, and especially DeltaStreamer, hive-sync is done using hudi-hive-sync, something that Hudi manages internally.

Referring to the code below, if one performs a Hudi comprehensive schema evolution outside of the scope defined here, hive-sync will fail even though the UPSERT succeeds.

public static boolean isSchemaTypeUpdateAllowed(String prevType, String newType) {
  if (prevType == null || prevType.trim().isEmpty() || newType == null || newType.trim().isEmpty()) {
    return false;
  }
  prevType = prevType.toLowerCase();
  newType = newType.toLowerCase();
  if (prevType.equals(newType)) {
    return true;
  } else if (prevType.equalsIgnoreCase(INT_TYPE_NAME) && newType.equalsIgnoreCase(BIGINT_TYPE_NAME)) {
    return true;
  } else if (prevType.equalsIgnoreCase(FLOAT_TYPE_NAME) && newType.equalsIgnoreCase(DOUBLE_TYPE_NAME)) {
    return true;
  } else {
    return prevType.contains("struct") && newType.toLowerCase().contains("struct");
  }
}

Questions

  1. Is the community planning to support Hudi comprehensive schema evolution via the DataFrame API?
  2. If so, some refactoring might be in store to move InternalSchema into the different hudi-sync implementations, such that there is a translation from Hudi's InternalSchema type to each XYZ-sync type.
  3. If we are not doing this, should we document this behaviour and explicitly let users know what the intended usage pattern for schema evolution is? i.e. users should stop all their write jobs, perform Hudi comprehensive schema evolution via Spark SQL, then resume their write jobs via DeltaStreamer/other non-Spark-SQL writers.

CC @TengHuo

@TengHuo (Contributor) commented Dec 26, 2023

Questions

  1. Is the community planning to support Hudi comprehensive schema evolution via the DataFrame API?
  2. If so, some refactoring might be in store to move InternalSchema into the different hudi-sync implementations, such that there is a translation from Hudi's InternalSchema type to each XYZ-sync type.
  3. If we are not doing this, should we document this behaviour and explicitly let users know what the intended usage pattern for schema evolution is? i.e. users should stop all their write jobs, perform Hudi comprehensive schema evolution via Spark SQL, then resume their write jobs via DeltaStreamer/other non-Spark-SQL writers.

Sorry for necro-ing this PR again. To complement question 1 above:

  1. Is the community planning to support Hudi comprehensive schema evolution via the DataFrame API?
     Rephrased: Is the community planning to support Hudi comprehensive schema evolution via the SparkRDDWriteClient API and HiveSyncTool?

This issue mainly impacts HoodieDeltaStreamer pipelines: e.g. an UPSERT of a batch of data with schema evolution succeeds, but the hive sync can fail, causing schema inconsistency issues when users run a query.

And to add a little bit more about the current HoodieSyncTool:

As we understand it, HoodieSyncTool can be used as an independent tool to sync Hudi table information to an external metadata management service, e.g. HMS, with the method HoodieSyncTool#syncHoodieTable. Since there are no input parameters for this method, it needs to infer the schema and partition changes by itself.

Thus, in the method HiveSyncTool#syncHoodieTable, it will load the Hudi table schema information from HDFS and HMS, and do the schema comparison work in the method HiveSchemaUtil#getSchemaDifference. As Voon mentioned, this method has its own logic to decide which schema type updates are allowed, which is not fully compatible with the current Hudi schema evolution.

To solve this issue, we are thinking about utilising the information from InternalSchema in HoodieSyncTool, and letting each meta sync tool decide how to translate these TableChanges to its own meta service (a hypothetical sketch follows).
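A purely hypothetical sketch of that idea; none of the types or method names below exist in Hudi today, they only illustrate the shape of the proposed translation layer:

// Hypothetical column-level changes that could be derived from InternalSchema's TableChange.
sealed trait ColumnChange
case class AddColumn(name: String, sqlType: String) extends ColumnChange
case class PromoteColumn(name: String, fromType: String, toType: String) extends ColumnChange

// Each meta sync tool (Hive, Glue, ...) would decide how to apply the changes to its own catalog.
trait MetaSyncSchemaTranslator {
  def applyChanges(table: String, changes: Seq[ColumnChange]): Unit
}

class HiveSchemaTranslatorSketch extends MetaSyncSchemaTranslator {
  override def applyChanges(table: String, changes: Seq[ColumnChange]): Unit =
    changes.foreach {
      case AddColumn(name, t) =>
        println(s"ALTER TABLE $table ADD COLUMNS ($name $t)")
      case PromoteColumn(name, _, to) =>
        println(s"ALTER TABLE $table CHANGE COLUMN $name $name $to")
    }
}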
