[HUDI-1453] Throw Exception when input data schema is not equal to th… #2334

Closed

Conversation

pengzhiwei2018
Contributor

@pengzhiwei2018 pengzhiwei2018 commented Dec 14, 2020

…e hoodie table schema

add test case


What is the purpose of the pull request

Fix a bug where writing a compatible data type to the hudi table throws an exception. For example, first write a double to the table, then write an int to the same field: an exception is thrown, as described in issue [HUDI-1453].

Brief change log

Currently, Hudi stores the incoming record schema (we call it inputSchema here) in HoodieWriteConfig. In the write stage, Hudi uses this inputSchema both to parse the incoming records and to read & write records in the hoodie table.

In most cases this works well. However, when the incoming schema is compatible with but not equal to the table schema (e.g. writing an "int" into a "long" field), an exception is thrown in the write stage, as described in issue [HUDI-1453]. I fix this issue by distinguishing the two kinds of schemas (inputSchema and tableSchema). Here are the major changes (a small sketch follows the list):

  1. Split writerSchema into inputSchema and tableSchema in HoodieWriteHandle. The inputSchema is used by getInsertValue to parse the incoming records, and the tableSchema, which is the schema of the hoodie table, is used to read/write records from/to the table.
  2. Change all references to writerSchema to inputSchema or tableSchema according to the usage scenario: use the inputSchema when parsing incoming records and the tableSchema when reading/writing data from/to the table.
  3. Add a getTableSchema method to HoodieTable to read the table schema.
  4. Refactor the constructors of HoodieWriteHandle. Add an Option parameter to the HoodieBootstrapHandle constructor to pass its specific schema.
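
The general shape of that split, as a minimal hedged sketch (the field names follow the PR description; the class itself is simplified and is not the actual HoodieWriteHandle code):

import org.apache.avro.Schema;

// Simplified sketch only -- not the real HoodieWriteHandle.
public abstract class SchemaSplitSketch {
  // Schema of the incoming records; used by getInsertValue to parse them.
  protected final Schema inputSchema;
  // The inputSchema with the Hudi meta fields added.
  protected final Schema inputSchemaWithMetaFields;
  // Schema of the hoodie table; used whenever records are read from or written to the table.
  protected final Schema tableSchema;

  protected SchemaSplitSketch(Schema inputSchema, Schema inputSchemaWithMetaFields, Schema tableSchema) {
    this.inputSchema = inputSchema;
    this.inputSchemaWithMetaFields = inputSchemaWithMetaFields;
    this.tableSchema = tableSchema;
  }
}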

Verify this pull request

This change added tests and can be verified as follows:

  • Added TestCOWDataSource#testWriteWithCompatibleType to verify the change

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-io

codecov-io commented Dec 15, 2020

Codecov Report

Merging #2334 (bbb604a) into master (e926c1a) will increase coverage by 0.00%.
The diff coverage is 0.00%.


@@            Coverage Diff            @@
##             master    #2334   +/-   ##
=========================================
  Coverage     50.73%   50.73%           
- Complexity     3064     3065    +1     
=========================================
  Files           419      419           
  Lines         18797    18797           
  Branches       1922     1922           
=========================================
+ Hits           9536     9537    +1     
  Misses         8485     8485           
+ Partials        776      775    -1     
Flag Coverage Δ Complexity Δ
hudicli 37.26% <ø> (ø) 0.00 <ø> (ø)
hudiclient 100.00% <ø> (ø) 0.00 <ø> (ø)
hudicommon 52.01% <0.00%> (ø) 0.00 <0.00> (ø)
hudiflink 10.20% <ø> (ø) 0.00 <ø> (ø)
hudihadoopmr 33.06% <ø> (ø) 0.00 <ø> (ø)
hudisparkdatasource 65.90% <ø> (ø) 0.00 <ø> (ø)
hudisync 48.61% <ø> (ø) 0.00 <ø> (ø)
huditimelineservice 66.84% <ø> (ø) 0.00 <ø> (ø)
hudiutilities 69.48% <ø> (+0.05%) 0.00 <ø> (ø)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ Complexity Δ
.../apache/hudi/common/table/TableSchemaResolver.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...apache/hudi/utilities/deltastreamer/DeltaSync.java 70.86% <0.00%> (+0.35%) 51.00% <0.00%> (+1.00%)

@vinothchandar vinothchandar self-assigned this Dec 16, 2020
@nsivabalan
Contributor

@pengzhiwei2018 : sorry, I am not following the problem here. Can you throw some light?
Schema evolution is throwing an exception w/ Hudi, i.e. a field's data type evolved to a compatible data type and Hudi fails to accommodate it? Is that the problem?
If yes, can you check #2350; there is a bug in the schema compatibility check and that patch should fix it. If not, can you elaborate on what exactly the problem is?
For eg: the use-case given at #2063 succeeds with #2350.

@pengzhiwei2018
Contributor Author

@pengzhiwei2018 : sorry, I am not following the problem here. Can you throw some light?
Schema evolution is throwing an exception w/ Hudi, i.e. a field's data type evolved to a compatible data type and Hudi fails to accommodate it? Is that the problem?
If yes, can you check #2350; there is a bug in the schema compatibility check and that patch should fix it. If not, can you elaborate on what exactly the problem is?
For eg: the use-case given at #2063 succeeds with #2350.

Hi @nsivabalan, when I write a long value to a double-typed field in the table, it passes TableSchemaResolver.isSchemaCompatible, but it fails in the write stage.
I found that the problem is that the schema used to read & write the table (which should be the table schema, e.g. double) is the same as the input schema (e.g. long). I fixed this problem by distinguishing the two schemas.
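
For context, the asymmetry behind that failure can be shown with plain Avro (a hedged, self-contained illustration, not Hudi code, using the int-into-long case from this PR's test): Avro resolves data written with a narrower writer schema into a wider reader schema, but not the reverse, so using the incoming schema instead of the table schema to read/write the table breaks once the types differ.

import java.io.ByteArrayOutputStream;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionDemo {
  private static Schema recordSchema(String fieldType) {
    return new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"" + fieldType + "\"}]}");
  }

  public static void main(String[] args) throws Exception {
    Schema tableSchema = recordSchema("long"); // type already stored in the table
    Schema inputSchema = recordSchema("int");  // type of the newly incoming batch

    // Serialize one record the way the table stores it (id as long).
    GenericRecord stored = new GenericData.Record(tableSchema);
    stored.put("id", 1L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(tableSchema).write(stored, encoder);
    encoder.flush();

    // Reading the stored bytes back with the table schema works.
    Decoder d1 = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    System.out.println(new GenericDatumReader<GenericRecord>(tableSchema, tableSchema).read(null, d1));

    // Reading the same bytes with the incoming "int" schema fails: long cannot be demoted to int.
    Decoder d2 = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    try {
      new GenericDatumReader<GenericRecord>(tableSchema, inputSchema).read(null, d2);
    } catch (AvroTypeException e) {
      System.out.println("expected failure: " + e.getMessage());
    }
  }
}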

Contributor

@nsivabalan nsivabalan left a comment


minor comments. Looks good in general. @bvaradar @n3nash : just in case you are interested, you can check this PR out. Fixes handling of schema within Hoodie.

@@ -38,8 +38,7 @@
public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, commitTime, hoodieTable, partitionPath, fileId,
Pair.of(HoodieAvroUtils.RECORD_KEY_SCHEMA,
HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.RECORD_KEY_SCHEMA)), taskContextSupplier);
Contributor


@bvaradar : can you confirm that this change looks good?

@@ -152,9 +155,9 @@ public void write() {
final String key = keyIterator.next();
HoodieRecord<T> record = recordMap.get(key);
if (useWriterSchema) {
write(record, record.getData().getInsertValue(writerSchemaWithMetafields));
write(record, record.getData().getInsertValue(inputSchemaWithMetaFields));
Contributor


minor. not exactly related to this patch. But can we rename "useWriterSchema" to something like "isCompactor" so that it is explicit and comprehensible?

Contributor Author


Nice suggestion! isCompactor would be easier to understand.

Member


I actually disagree completely :) . Leaking such a call hierarchy into a lower-level class will lead to more confusion if, say, one more code path uses this code.

*/
protected final Schema inputSchema;
/**
* The input shema with meta fields.
Contributor


minor. typo. "schema"

Contributor Author


Thanks for the correction.

/**
* The table schema is the schema of the table which used to read/write record from table.
*/
protected final Schema tableSchema;
Contributor


thanks for this change. Definitely looks more readable now and avoids any confusion

writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
tableSchema = getTableSchema(config, true);
Contributor


isn't the 2nd arg supposed to be false?

Contributor Author


Hi @nsivabalan, here createHoodieWriteSchema adds the meta fields to the schema, so the tableSchema should also carry the meta fields here.
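
(Side note for readers: a small hedged sketch of what "adding the meta fields" means. It relies only on HoodieAvroUtils.addMetadataFields, which the write-schema construction above uses; the meta column names match the _hoodie_* fields visible in the schemas later in this thread, and the package location of HoodieAvroUtils is assumed from hudi-common.)

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;

public class MetaFieldsSketch {
  public static void main(String[] args) {
    Schema withoutMeta = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    // addMetadataFields prepends the Hudi meta columns (_hoodie_commit_time, _hoodie_commit_seqno,
    // _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name) to the user schema.
    Schema withMeta = HoodieAvroUtils.addMetadataFields(withoutMeta);
    withMeta.getFields().forEach(f -> System.out.println(f.name()));
  }
}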

this.writerSchemaWithMetafields = writerSchemaIncludingAndExcludingMetadataPair.getValue();
// Here the hoodieTable#config may not equal to the config field, we use the 'config' from the
// construct method
this.tableSchema = schemaOption.orElseGet(() -> hoodieTable.getTableSchema(config, false));
Contributor


trying to confirm my understanding. Only for the bootstrap code path do we need the table schema to be initialized w/ the incoming schema, where we can't rely on hoodieTable.getTableSchema(). In every other call path we should not be looking at schemaOption to set tableSchema.

Contributor Author


Yes @nsivabalan, the schemaOption is only used by HoodieBootstrapHandle to pass its schema.
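
(Illustrative only: a tiny sketch of that fallback pattern, using java.util.Optional in place of Hudi's Option so it stays self-contained; the method and variable names are made up.)

import java.util.Optional;
import org.apache.avro.Schema;

public class SchemaOptionSketch {
  // Use the caller-supplied schema if present (bootstrap handle), otherwise the table's own schema.
  static Schema resolveTableSchema(Optional<Schema> schemaOption, Schema tableSchemaFromMetaClient) {
    return schemaOption.orElse(tableSchemaFromMetaClient);
  }

  public static void main(String[] args) {
    Schema table = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    System.out.println(resolveTableSchema(Optional.empty(), table));   // regular write path
    System.out.println(resolveTableSchema(Optional.of(table), table)); // bootstrap-style override
  }
}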

public Schema getWriterSchema() {
return writerSchema;
return tableSchema;
Contributor


Feel we could rename this method to getReaderSchema() since it is used only in one place:

bootstrapReadSchema = mergeHandle.getWriterSchema();

And we assign it to a var that stores the read schema anyway.

Contributor Author


It seems reasonable!

@n3nash
Contributor

n3nash commented Jan 4, 2021

@pengzhiwei2018 @nsivabalan Is there a different way to resolve this issue? The writer/reader schema concept is baked into many parts of the code, even on the reader side; see the example here -> https://github.com/apache/hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java#L70

Renaming the writer concept to input requires more changes in the code and also requires us to understand what the readerSchema means in this scenario.

I'm OK if you want to introduce another transient field called tableSchema and compare the writer schema (in your case inputSchema) against the tableSchema to throw the exception, but keep the concepts of writer/reader intact. If you want to propose changing them, I think putting up a one-pager about how writer/reader schema maps to your newly suggested terms (input/table) will help us reason about schemas going forward and avoid any regressions.

@pengzhiwei2018
Contributor Author

pengzhiwei2018 commented Jan 5, 2021

Hi @n3nash, thanks for your suggestion.
This PR only covers the part where Hudi writes the data, not the part where Hudi reads the data. The introduced inputSchema/tableSchema is only used to parse the incoming records and to read & write data in the table, which is quite different from the writer/reader schema in RealtimeCompactedRecordReader.

And I have updated the Brief change log section above to describe the changes in more detail for easier understanding. Please take a look again when you have time.

…e hoodie table schema

add test case

fix test case

fix test case

fix test case

fix test

fix test case

fix format

fix format

fix format

fix test case & add comment

add some comment

fix change schema bug

revert merge schema

refactor code
@vinothchandar vinothchandar added priority:critical production down; pipelines stalled; Need help asap. and removed priority:blocker labels Feb 6, 2021
Member

@vinothchandar vinothchandar left a comment


@pengzhiwei2018 first of all, thanks for these great contributions.

Wrt inputSchema vs writeSchema, I actually feel writeSchema already stands for inputSchema; the input is what is being written, right? We can probably just leave it as is and introduce the new tableSchema variables as you have in the HoodieWriteHandle class?

As someone else pointed out as well, so far we have been using read and write schemas consistently. I'd love to not introduce a new input schema unless it's absolutely necessary.

@pengzhiwei2018
Contributor Author

pengzhiwei2018 commented Mar 15, 2021

@pengzhiwei2018 first of all, thanks for these great contributions.

Wrt inputSchema vs writeSchema, I actually feel writeSchema already stands for inputSchema; the input is what is being written, right? We can probably just leave it as is and introduce the new tableSchema variables as you have in the HoodieWriteHandle class?

As someone else pointed out as well, so far we have been using read and write schemas consistently. I'd love to not introduce a new input schema unless it's absolutely necessary.

Hi @vinothchandar, thanks for your reply on this issue.
Yes, in most cases the writeSchema is the same as the inputSchema, so it can stand for the inputSchema. But in the case covered by this PR (the test case in TestCOWDataSource) we write to the table twice:
First, we write an "id: long" to the table. The input schema is "id: long" and the table schema is "id: long".
Second, we write an "id: int" to the table. The input schema is "id: int", but the table schema is still "id: long" from the previous write. The write schema must be the same as the table schema, otherwise an exception is thrown, which is the problem this PR wants to solve.
So in this case we need to distinguish between the inputSchema and the writeSchema. The inputSchema is the incoming records' schema, but the writeSchema is always the tableSchema (see the sketch after this list):

  • The inputSchema is used to parse the records from the incoming data.
  • The tableSchema is used to write and read records from the table. Whenever we want to write or read records in the table, we use the tableSchema.
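
To make the two-write scenario concrete, here is a rough, hedged sketch of it against the Spark datasource (the option keys are standard Hudi write configs; the table name, path, and columns are invented for illustration, and this is not the code of TestCOWDataSource#testWriteWithCompatibleType):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CompatibleTypeWriteDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("compatible-type-write")
        .master("local[1]")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate();
    String basePath = "file:///tmp/hudi_compatible_type_demo";

    // First write: "id" is a long, so the table schema records id as long.
    Dataset<Row> first = spark.sql("select 1L as id, 'p1' as part, 0L as ts");
    write(first, basePath, SaveMode.Overwrite);

    // Second write: "id" arrives as an int. The input schema (int) is compatible with the
    // table schema (long); before this fix the upsert failed in the write stage.
    Dataset<Row> second = spark.sql("select 2 as id, 'p1' as part, 1L as ts");
    write(second, basePath, SaveMode.Append);
  }

  private static void write(Dataset<Row> df, String basePath, SaveMode mode) {
    df.write().format("hudi")
        .option("hoodie.table.name", "compatible_type_demo")
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "ts")
        .option("hoodie.datasource.write.partitionpath.field", "part")
        .mode(mode)
        .save(basePath);
  }
}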

@prashantwason
Member

So to rephrase the description: this solves the case where the input data has a field of a compatible type (int) to be written to the table (long field). Can this issue not be solved at the input record level, by converting the "int" data into a "long" before writing into HUDI?

hoodieTable.getTableSchema() always returns the "latest" schema, which is the schema used by the last HoodieWriteClient (saved into the commit instants). So when the "int"-based RDD is written, the table schema will no longer have a "long" field. When this table schema is used to read an older file in the table (merge during the update handle), the read should fail, as a long (from parquet) cannot be converted to an int (from the schema). This is actually a backward-incompatible schema change and hence is not allowed by HUDI.

@pengzhiwei2018 Can you add a test to verify my hypothesis? In your existing test in TestCOWDataSource, can you write a long to the table in the next write? Also, can you read all the data using the "int" schema, even the older records which contain a long?

@nsivabalan
Contributor

just to clarify: some schema evolution just for types works fine w/ hoodie, for eg: integer to double works fine. The problem is with double to int. I am not sure why the schema compatibility check does not fail this evolution.

@nsivabalan
Contributor

nsivabalan commented Apr 1, 2021

oh, didn't realize the schema compatibility check is disabled by default. I assume that if I enable the schema compatibility check, double to int evolution is likely to fail. So, not sure what this PR tries to achieve. At least from my local test run, integer to double worked for me.
https://gist.github.com/nsivabalan/91f12109e0fe1ca9749ff5290c946778
this is COW btw. haven't tested w/ MOR.
@pengzhiwei2018 : can you please clarify.

@pengzhiwei2018
Contributor Author

https://gist.github.com/nsivabalan/91f12109e0fe1ca9749ff5290c946778

Hi @nsivabalan, I have taken a look at your test code. First you write an "int" to the table, so the table schema type is "int". Then you write a "double" to the table, so the table schema becomes "double". The table schema changed from "int" to "double". I think this is more reasonable.

In my original idea, I thought the first write schema (e.g. "int") would be the table schema forever, and the incoming records after that should be compatible with the original table schema (e.g. "int"). That is what this PR wanted to solve. I understand more clearly now: the table schema should change to a more generic type (e.g. from "int" to "double"), not always stay as the first write schema.
So I will close this PR now. Thanks @nsivabalan for correcting me.

@nsivabalan
Contributor

Yeah, I did verify by enabling the schema compatibility check. It will fail if we try to evolve a field from double to int.

scala> dfFromData5.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
     |   option(RECORDKEY_FIELD_OPT_KEY, "rowId").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
     |   option("hoodie.index.type","SIMPLE").
     |   option(TABLE_NAME, tableName).
     |   option("hoodie.avro.schema.validate","true").
     |     mode(Append).
     |   save(basePath)
org.apache.hudi.exception.HoodieUpsertException: Failed upsert schema compatibility check.
  at org.apache.hudi.table.HoodieTable.validateUpsertSchema(HoodieTable.java:629)
  at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:152)
  at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:186)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 72 elided
Caused by: org.apache.hudi.exception.HoodieException: Failed schema compatibility check for writerSchema :{"type":"record","name":"hudi_trips_cow_record","namespace":"hoodie.hudi_trips_cow","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"rowId","type":["string","null"]},{"name":"partitionId","type":["string","null"]},{"name":"preComb","type":["long","null"]},{"name":"name","type":["string","null"]},{"name":"versionId","type":["string","null"]},{"name":"doubleToInt","type":["int","null"]}]}, table schema :{"type":"record","name":"hudi_trips_cow_record","namespace":"hoodie.hudi_trips_cow","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"rowId","type":["string","null"]},{"name":"partitionId","type":["string","null"]},{"name":"preComb","type":["long","null"]},{"name":"name","type":["string","null"]},{"name":"versionId","type":["string","null"]},{"name":"doubleToInt","type":["double","null"]}]}, base path :file:/tmp/hudi_trips_cow
  at org.apache.hudi.table.HoodieTable.validateSchema(HoodieTable.java:621)
  at org.apache.hudi.table.HoodieTable.validateUpsertSchema(HoodieTable.java:627)
  ... 97 more

Labels
priority:critical production down; pipelines stalled; Need help asap. schema-and-data-types

6 participants