[SUPPORT] Inconsistent reader and writer schema in HoodieAvroDataBlock cause exception #7284
@voonhous and I did some troubleshooting on this issue, and we found it is caused by the difference between the writer schema and the reader schema at this line: hudi/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java Line 171 in 76a28da
Writer schema:
{
"type": "record",
"name": "test_mor_tab_record",
"namespace": "hoodie.test_mor_tab",
"fields": [
{
"name": "_hoodie_commit_time",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_commit_seqno",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_record_key",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_partition_path",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "_hoodie_file_name",
"type": [
"null",
"string"
],
"doc": "",
"default": null
},
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "price",
"type": "double"
},
{
"name": "ts",
"type": "long"
},
{
"name": "new_test_col",
"type": {
"type": "fixed",
"name": "fixed",
"namespace": "hoodie.test_mor_tab.test_mor_tab_record.new_test_col",
"size": 11,
"logicalType": "decimal",
"precision": 25,
"scale": 4
},
"doc": "a column for test decimal type"
},
{
"name": "dt",
"type": "string"
}
]
}
Reader schema:
{
"type": "record",
"name": "Record",
"fields": [
{
"name": "_hoodie_commit_time",
"type": [
"string",
"null"
]
},
{
"name": "_hoodie_commit_seqno",
"type": [
"string",
"null"
]
},
{
"name": "_hoodie_record_key",
"type": [
"string",
"null"
]
},
{
"name": "_hoodie_partition_path",
"type": [
"string",
"null"
]
},
{
"name": "_hoodie_file_name",
"type": [
"string",
"null"
]
},
{
"name": "id",
"type": [
"int",
"null"
]
},
{
"name": "name",
"type": [
"string",
"null"
]
},
{
"name": "price",
"type": [
"double",
"null"
]
},
{
"name": "ts",
"type": [
"long",
"null"
]
},
{
"name": "new_test_col",
"type": [
{
"type": "fixed",
"name": "fixed",
"namespace": "Record.new_test_col",
"size": 11,
"logicalType": "decimal",
"precision": 25,
"scale": 4
},
"null"
]
},
{
"name": "dt",
"type": [
"string",
"null"
]
}
]
}
As can be seen in the writer schema, column new_test_col is defined as:
{
"name": "new_test_col",
"type": {
"type": "fixed",
"name": "fixed",
"namespace": "hoodie.test_mor_tab.test_mor_tab_record.new_test_col",
"size": 11,
"logicalType": "decimal",
"precision": 25,
"scale": 4
},
"doc": "a column for test decimal type"
}
But in the reader schema, column new_test_col is defined as:
{
"name": "new_test_col",
"type": [
{
"type": "fixed",
"name": "fixed",
"namespace": "Record.new_test_col",
"size": 11,
"logicalType": "decimal",
"precision": 25,
"scale": 4
},
"null"
]
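}
According to the Avro doc, […] However, the namespace of the fixed type in the reader schema (Record.new_test_col) is different from the one in the writer schema (hoodie.test_mor_tab.test_mor_tab_record.new_test_col), and this causes the exception mentioned above. If I replace the reader schema with one that uses the same namespace as the writer schema, the test case runs properly:
...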
private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content, InternalSchema internalSchema) throws IOException {
this.content = content;
this.dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(this.content)));
// 1. Read version for this data block
int version = this.dis.readInt();
HoodieAvroDataBlockVersion logBlockVersion = new HoodieAvroDataBlockVersion(version);
Schema finalReadSchema = readerSchema;
if (!internalSchema.isEmptySchema()) {
// We should use the writer schema to read the log file,
// since after some DDL operations the readerSchema may differ from the writerSchema, and the Avro reader would throw an exception.
// e.g. the original writeSchema is "a string, b double"; after adding a new column, the readerSchema becomes "a string, c int, b double", so it's wrong to use the readerSchema to read the old log file.
// After reading those records with the writeSchema, we rewrite them with the readerSchema in AbstractHoodieLogRecordReader.
finalReadSchema = writerSchema;
}
// Debug workaround (not a fix): ignore finalReadSchema and use a hard-coded reader schema that is
// identical to the reader schema above, except that new_test_col's fixed type uses the writer's namespace.
Schema readSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Record\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":[\"string\",\"null\"]},{\"name\":\"_hoodie_commit_seqno\",\"type\":[\"string\",\"null\"]},{\"name\":\"_hoodie_record_key\",\"type\":[\"string\",\"null\"]},{\"name\":\"_hoodie_partition_path\",\"type\":[\"string\",\"null\"]},{\"name\":\"_hoodie_file_name\",\"type\":[\"string\",\"null\"]},{\"name\":\"id\",\"type\":[\"int\",\"null\"]},{\"name\":\"name\",\"type\":[\"string\",\"null\"]},{\"name\":\"price\",\"type\":[\"double\",\"null\"]},{\"name\":\"ts\",\"type\":[\"long\",\"null\"]},{\"name\":\"new_test_col\",\"type\":[{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.test_mor_tab.test_mor_tab_record.new_test_col\",\"size\":11,\"logicalType\":\"decimal\",\"precision\":25,\"scale\":4},\"null\"]},{\"name\":\"dt\",\"type\":[\"string\",\"null\"]}]}");
this.reader = new GenericDatumReader<>(writerSchema, readSchema);
if (logBlockVersion.hasRecordCount()) {
this.totalRecords = this.dis.readInt();
}
}
...
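A simplified model of why only the namespace matters: the writer type of new_test_col is a bare fixed, while the reader type is a union, so the resolver has to pick a union branch for it. The sketch below assumes, consistent with the observation that changing only the namespace makes the test pass, that a named type such as fixed is matched to a union branch by its full name (namespace plus name). NamedType and UnionBranchMatcher are hypothetical illustration classes, not Avro's API, and the real resolver handles more cases (records, numeric promotion, and so on).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an Avro named type; NOT org.apache.avro.Schema.
final class NamedType {
    final String kind;      // e.g. "fixed", or "null" for the null branch
    final String namespace; // may be null or empty
    final String name;

    NamedType(String kind, String namespace, String name) {
        this.kind = kind;
        this.namespace = namespace;
        this.name = name;
    }

    // Avro full name: "<namespace>.<name>", or just "<name>" when there is no namespace.
    String fullName() {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }
}

final class UnionBranchMatcher {
    private UnionBranchMatcher() {}

    // Assumption: a writer named type only matches a reader union branch of the same kind
    // with an identical full name. Returns the branch index, or -1 when no branch matches
    // (in which case schema resolution would fail).
    static int bestBranch(NamedType writer, List<NamedType> readerUnion) {
        for (int i = 0; i < readerUnion.size(); i++) {
            NamedType branch = readerUnion.get(i);
            if (branch.kind.equals(writer.kind) && branch.fullName().equals(writer.fullName())) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        NamedType writerFixed =
            new NamedType("fixed", "hoodie.test_mor_tab.test_mor_tab_record.new_test_col", "fixed");
        List<NamedType> catalogUnion = Arrays.asList(
            new NamedType("fixed", "Record.new_test_col", "fixed"),
            new NamedType("null", null, "null"));
        List<NamedType> patchedUnion = Arrays.asList(
            new NamedType("fixed", "hoodie.test_mor_tab.test_mor_tab_record.new_test_col", "fixed"),
            new NamedType("null", null, "null"));
        System.out.println(bestBranch(writerFixed, catalogUnion)); // -1: no branch matches
        System.out.println(bestBranch(writerFixed, patchedUnion)); // 0: the fixed branch matches
    }
}
```

In this model the two fixed types have the same size, precision and scale, yet the catalog union still has no matching branch, which mirrors what we saw in the test.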
After checking, we found that this reader schema comes from the Spark catalog table, here: hudi/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala Line 222 in d976671
This issue can be temporarily fixed by setting the schema as val userSchema = None, but I don't think that is a proper fix. It would break schema evolution, as the query schema could be different from […]. So my question is: how should we fix this issue in
def convertToAvroSchema(structSchema: StructType): Schema = {
// FIXME: need use the same recordName as writer schema
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")
}
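Why the namespaces differ: the converter above is called with the fixed record name "Record" and no namespace, so every nested named type ends up under "Record". A minimal sketch of that naming, assuming the nesting rule that the two schemas in this issue suggest (a decimal field's fixed type is named "fixed" and lives in the namespace "<record full name>.<field name>"); NestedNameSketch is a hypothetical helper, not Hudi's or Spark's converter API.

```java
// Hypothetical helper modelling how nested names are derived; not a real Hudi/Spark API.
final class NestedNameSketch {
    private NestedNameSketch() {}

    // Full name of the top-level record: "<namespace>.<recordName>", or "<recordName>" alone.
    static String recordFullName(String namespace, String recordName) {
        return (namespace == null || namespace.isEmpty()) ? recordName : namespace + "." + recordName;
    }

    // Assumed rule, inferred from the writer and reader schemas in this issue:
    // the fixed type backing a decimal field goes into "<record full name>.<field name>".
    static String decimalFixedNamespace(String namespace, String recordName, String fieldName) {
        return recordFullName(namespace, recordName) + "." + fieldName;
    }

    public static void main(String[] args) {
        // Writer side: record test_mor_tab_record in namespace hoodie.test_mor_tab.
        System.out.println(decimalFixedNamespace("hoodie.test_mor_tab", "test_mor_tab_record", "new_test_col"));
        // Reader side: convertToAvroSchema hard-codes the record name "Record" with no namespace.
        System.out.println(decimalFixedNamespace("", "Record", "new_test_col"));
    }
}
```

Under this model, the two namespaces only line up when the reader side is built with the same record name and namespace as the writer schema, which is the direction the FIXME comment points to.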
The code to reproduce this issue: https://github.com/TengHuo/hudi/tree/HUDI-5271-verify
Just run the test case.
@TengHuo Thanks for sharing a reproducible test. It does look like a bug. I'll review the patch.
@codope Got it, thanks Sagar. We are doing more tests on Spark 3.1, and it seems PR #7297 didn't fix all the issues. @YannByron said there is another fix for a similar issue in #6783, but @voonhous found another similar exception in our pipeline.
I will attach a test case later if I can figure out how to trigger it in a unit test.
The exception mentioned above can be triggered with this test case:
test("Test Insert Into MOR table 2") {
withTempDir { tmp =>
val tableName = generateTableName
// Create a partitioned table
spark.sql(
s"""
|create table $tableName (
| id int,
| dt string,
| name string,
| price double,
| ts long,
| test_decimal_col decimal(25, 4)
|) using hudi
|options
|(
| type = 'mor'
| ,primaryKey = 'id'
| ,hoodie.index.type = 'INMEMORY'
|)
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}'
| """.stripMargin)
// Note: Do not write the field alias, the partition field must be placed last.
spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', 10, 1000, 1.0, "2021-01-05"),
| (2, 'a2', 20, 2000, 2.0, "2021-01-06"),
| (3, 'a3', 30, 3000, 3.0, "2021-01-07")
""".stripMargin)
spark.sql(
s"""
| insert into $tableName values
| (1, 'a1', 10, null, 1.0, "2021-01-05"),
| (4, 'a2', 20, 2000, null, "2021-01-06"),
| (5, 'a3', 30, 3000, 3.0, "2021-01-07")
""".stripMargin)
spark.sql(s"select id, name, price, ts, cast(test_decimal_col AS string), dt from $tableName").show(false)
}
}
The cause of this exception is here: Line 209 in b6124ff
Then, the schema will be converted here: Line 440 in b6124ff
PS: it seems @alexeykudinkin just fixed this issue yesterday in #6358. I'm checking it.
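Side note on the schemas above: the decimal(25, 4) column is stored as an Avro fixed of size 11 in both the writer and the reader schema. That size is consistent with the usual minimum-bytes-for-precision rule: the smallest n such that a signed n-byte two's-complement integer can hold any 25-digit unscaled value. A sketch of that computation, assuming this is the rule the schema converter applies; DecimalFixedSize is a hypothetical name.

```java
import java.math.BigInteger;

// Hypothetical helper: smallest fixed size (in bytes) for a decimal of the given precision.
final class DecimalFixedSize {
    private DecimalFixedSize() {}

    // Returns the smallest n with 2^(8n - 1) >= 10^precision, i.e. the smallest signed
    // two's-complement width whose max value 2^(8n - 1) - 1 covers 10^precision - 1.
    static int minBytesForPrecision(int precision) {
        BigInteger limit = BigInteger.TEN.pow(precision);
        int numBytes = 1;
        while (BigInteger.ONE.shiftLeft(8 * numBytes - 1).compareTo(limit) < 0) {
            numBytes++;
        }
        return numBytes;
    }

    public static void main(String[] args) {
        System.out.println(minBytesForPrecision(25)); // 11, matching "size": 11 for decimal(25, 4)
    }
}
```

BigInteger is used so the comparison stays exact for large precisions instead of relying on floating-point powers.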
Update: the Avro schema conversion code was refactored by Alexey in PR #6358, so I tested this issue on the latest master, d10d987, which includes Alexey's new code. This is the new branch to reproduce this issue: master...TengHuo:hudi:HUDI-5271-verify2
Run the test case.
Thanks @codope. I just ran my test case on the latest master (TengHuo@969e781), and it passes now. I'll close this issue as it is solved.
Also fixed in PR #8026.
Related JIRA ticket: https://issues.apache.org/jira/browse/HUDI-5271
Describe the problem you faced
When using Spark to create a Hudi table with these configs, an exception is triggered when querying the table.
To Reproduce
Steps to reproduce the behavior:
hudi-spark-datasource/hudi-spark, in the test class org.apache.spark.sql.hudi.TestInsertTable
Expected behavior
This test case should run properly without any exception
Environment Description
Hudi version : latest master branch, commit 3109d890f13b1b29e5796a9f34ab28fa898ec23c
Spark version : tried Spark 2.4 and 3.1; both have the same issue
Hive version : N/A
Hadoop version : N/A
Storage (HDFS/S3/GCS..) : HDFS
Running on Docker? (yes/no) : no
Stacktrace
The full error stack of the above test case