[HUDI-2560][RFC-33] Support full Schema evolution for Spark #4910
Conversation
@bvaradar @codope @vinothchandar @xushiyan
force-pushed from cb8c6f4 to fa9cee1 (Compare)
@YannByron If you get a chance, please take a pass also. Thanks.
@hudi-bot run azure
@hudi-bot run azure
@hudi-bot run azure
@bvaradar Sorry to bother you. If you have free time, could you please help me review this PR? Thanks.
this.fields = new Field[fields.size()];
for (int i = 0; i < this.fields.length; i += 1) {
  this.fields[i] = fields.get(i);
}
Can we initialize `nameToFields` and `idToFields` here?
Thank you very much for your advice. But I think these two methods are more appropriate in InternalSchema; having them in the types feels strange.
public InternalSchema(long versionId, List<Field> cols) {
  this.versionId = versionId;
  this.record = RecordType.get(cols);
  if (versionId >= 0) {
I think we don't need to check `versionId >= 0` here. Even if this is a dummy schema, we can still initialize `idToField`, `nameToId`, `idToName`, and `maxColumnId` from cols.
Also, if cols is empty, all member variables are empty, and initial values like `maxColumnId` and `versionId` are -1. So I suggest completing the initialization of the members here.
fixed
  nameToId = idToName.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
  return nameToId;
}
nameToId = InternalSchemaUtils.buildNameToId(record);
Both `nameToId` and `idToName` must be empty or non-empty at the same time; here, that may not hold. So initialize both inside the constructor above.
fixed
 * set the version ID for this schema.
 */
public InternalSchema setSchemaId(long versionId) {
  this.versionId = versionId;
When will `setSchemaId` and `setMax_column_id` be called on their own?
1) When we add a new column, the max_column_id is incremented by 1.
2) When we do DDL, before we save the InternalSchema, we call setSchemaId on it.
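For illustration, here is a minimal, self-contained sketch of those two call sites (class and method names are stand-ins inferred from this thread, not the PR's exact code):

```java
class SchemaIdFlowSketch {
  // Stand-in for the real InternalSchema API discussed above.
  static class InternalSchema {
    long versionId = -1;  // -1 marks a dummy schema, per the constructor discussion
    int maxColumnId = -1;
    InternalSchema setSchemaId(long versionId) { this.versionId = versionId; return this; }
    InternalSchema setMaxColumnId(int maxColumnId) { this.maxColumnId = maxColumnId; return this; }
  }

  // 1) ADD COLUMN: the new column consumes one more column id.
  static InternalSchema onAddColumn(InternalSchema schema) {
    return schema.setMaxColumnId(schema.maxColumnId + 1);
  }

  // 2) DDL commit: before persisting, tag the evolved schema with the commit instant.
  static InternalSchema beforeSave(InternalSchema schema, String instantTime) {
    return schema.setSchemaId(Long.parseLong(instantTime));
  }
}
```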
 * Internal schema for hudi table.
 * used to support schema evolution.
 */
public class InternalSchema implements Serializable {
I suggest strictly separating the get methods (like `getXXX` and `findXXX`) from the set methods. The `getXXX` methods should not change or initialize the private member variables.
That's OK with me. I just wanted to have lazy find/get: when the InternalSchema is very large, class initialization would be slow.
Not all fields in InternalSchema will be used, so we use lazy initialization.
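For illustration, a compact sketch of the lazy-initialization pattern being defended here (only `buildNameToId` appears in the snippets above; the guard and the surrounding class are assumptions):

```java
import java.util.HashMap;
import java.util.Map;

class LazySchemaIndex {
  private final Object record = new Object(); // stands in for the real record type
  private Map<String, Integer> nameToId;      // stays null until first use

  // Building the map on first access keeps construction cheap for very large schemas.
  Map<String, Integer> getNameToId() {
    if (nameToId == null) {
      nameToId = buildNameToId(record); // stand-in for InternalSchemaUtils.buildNameToId
    }
    return nameToId;
  }

  private static Map<String, Integer> buildNameToId(Object record) {
    return new HashMap<>(); // placeholder: the real code walks the record's fields
  }
}
```

Note this is exactly the pattern the earlier comment warns about: the getter mutates a private member on first call.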
class ColumnPositionChange {
  public enum ColumnPositionType {
    FIRST,
    BEFORE,
Maybe `FIRST` and `AFTER` are enough; `BEFORE` is not necessary. We can keep this syntax consistent with Spark.
This PR is not only for Spark. Users can also make column changes through the API interface.
 */
private Option<InternalSchema> getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) {
  try {
    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
Just using `metaClient.getActiveTimeline()` may be enough.
fixed
@@ -177,6 +192,70 @@ class DefaultSource extends RelationProvider

  override def shortName(): String = "hudi_v1"

  private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
This code has been discarded; please adapt to the new `BaseFileOnlyViewRelation`.
Yes, this is an issue I want to discuss with you. BaseFileOnlyViewRelation cannot support vectorized reads / DPP. We ran over 1 TB of TPC-DS, and the performance decreased significantly. I will support BaseFileOnlyViewRelation in another PR.
@xiarixiaoyao : Are we regressing to poor performance with this change? @YannByron mentions that this code has been discarded.
@bvaradar Yes, I will try to paste a simple perf result tomorrow. Adapting to this change is very easy, but I don't want to lose too much performance.
@xiarixiaoyao Since this PR is so huge, please help me sort the implementation out.
Spark 3.3 supports column default values: apache/spark#35690 (comment)
Yann Byron ***@***.***> wrote on Tue, Mar 8, 2022, 11:29:

> @xiarixiaoyao Since this PR is so huge, please help me sort the implementation out.
> 1. Do the SerDeHelper.LATESTSCHEMA attribute of one commit file and the SAVE_SCHEMA_ACTION file save the same thing, or can they be converted into each other?
> 2. If hoodie.schema.evolution.enable is enabled, will every commit persist SerDeHelper.LATESTSCHEMA in the meta file?
> 3. When will the SAVE_SCHEMA_ACTION file be committed? Once the schema is changed?
> 4. How do we make a Hudi table with an old version like 0.10 compatible with this? If hoodie.schema.evolution.enable is enabled on an existing old-version Hudi table, what will happen? Or, if we are not going to make them compatible, how do we refuse it?
> 5. Can this PR work when hoodie.metadata.enable is enabled?
> 6. Why do we need to separate Spark 3.1 and Spark 3.2?
@bvaradar Thanks for the reminder; I've already addressed all comments.
Finished taking a full pass @xiarixiaoyao : One question in HoodieSparkSqlWriter. Otherwise, looks safe to land.
  schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
  schema = lastestSchema
}
schema = {
@xiarixiaoyao : Do we need to do this for all cases? Is it safe to do this only in cases where internalSchema is not empty?
It worked for all cases in Hudi 0.9. To be safe, let me add some logic to fall back to the original behavior. Fixed; see the following code:
if (internalSchemaOpt.isDefined) {
  schema = {
    val newSparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema))
    AvroConversionUtils.convertStructTypeToAvroSchema(newSparkSchema, structName, nameSpace)
  }
}
@bvaradar I have addressed all comments.
LGTM @xiarixiaoyao. This is awesome work. Thanks a lot for contributing this feature and waiting patiently for the review.
To help users understand how to use the schema-on-read feature, can you add a section to the Spark Guide in the next PR, which we can land independently?
We still have a few more PRs to go for this feature after 0.11 - Hive, Concurrency, Trino, Flink... Looking forward to reviewing them.
CI report:
Bot commands: @hudi-bot supports the following commands:
Still waiting for 1 job to finish for landing.
 * @param colName col name to be changed. if we want to change a col in a nested field, the full name should be specified
 * @param doc the new comment to set for the column
 */
public void updateColumnComment(String colName, String doc) {
Where are all these functions getting used? I do not see any caller for these @xiarixiaoyao
These are the exposed API interfaces. I recommend doing DDL operations through Spark SQL: #5238
}
// try to find all added columns
if (diffFromOldSchema.size() != 0) {
  throw new UnsupportedOperationException("Cannot evolve schema implicitly, find delete/rename operation");
Just trying to understand, is this being done for old Hudi tables? What is meant by evolve schema implicitly? I guess the error message is not very clear?
extraMeta.put(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(newSchema.setSchemaId(Long.getLong(instantTime))));
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
What is the purpose of storing the history schema here? I guess this is redundant, since we anyway store the evolved schema as the history schema in the saveInternalSchema() method, which gets called from commitStats(). WDYT @xiarixiaoyao?
Also, can you share your Slack ID with me? It will be easier to coordinate with you.
Of course; I already pinged you on Slack.
  metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, newSchemaStr);
  schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
}
// update SCHEMA_KEY
Regarding the concern that we start a separate timeline for schemas: is there a possibility to reuse the existing meta files for the internal schema? And do we plan to replace the Avro schema with the internal schema in the future? The Avro schema cannot handle data types like `small int` and `tiny int`.
Answers:
1) I think DDL should be an independent operation and should not intersect with the original commit.
2) Yes, we plan to do that, but before we start we need Flink to support full schema evolution; otherwise, the gap between the Flink module and the other modules will become larger and larger.
So what is the relationship between the DDL schema change and the schema change on write? For schema change on write, we already reuse the schema in the instant metadata file; we should elaborate more to have a uniform abstraction for these two cases.
Hi Danny, are you asking about line 292, i.e. why we use a new timeline to save the history schema?
With this patch, we have an Avro schema for the metadata file and a separate internal schema for DDL operations, and the Avro schema can also handle the schema change on write. These abstractions are not that clear, and we need to elaborate more on the behaviors.
I guess we probably need a blog or proper documentation describing the changes on a high level. WDYT @xiarixiaoyao ?
@@ -70,8 +73,19 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
  HoodieCompactionPlan compactionPlan =
      CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime);

  // try to load internalSchema to support schema Evolution
  HoodieWriteConfig configCopy = config;
  Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
What is the purpose of re-assigning the config to `configCopy` and then modifying it directly? I mean, you should either modify `config` directly or copy the whole config to `configCopy`!
The original config will not be modified. When schema evolution happens, we copy the whole config to configCopy and modify the copy; otherwise nothing happens. Maybe I'm missing something.
`configCopy` and `config` reference the same Java object ;)
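To make the aliasing point concrete, here is a runnable toy (using java.util.Properties as a stand-in for HoodieWriteConfig):

```java
import java.util.Properties;

public class AliasingDemo {
  public static void main(String[] args) {
    Properties config = new Properties();
    config.setProperty("schema", "old");

    Properties configCopy = config;                   // NOT a copy: both names point at one object
    configCopy.setProperty("schema", "new");          // mutates the shared object
    System.out.println(config.getProperty("schema")); // prints "new"

    Properties realCopy = new Properties();
    realCopy.putAll(config);                          // an actual copy; later mutations stay local
    realCopy.setProperty("schema", "evolved");
    System.out.println(config.getProperty("schema")); // still prints "new"
  }
}
```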
 * @return whether to allow the column type to be updated.
 */
public static boolean isTypeUpdateAllow(Type src, Type dsr) {
  if (src.isNestedType() || dsr.isNestedType()) {
isTypeUpdateAllow => isTypeUpdateAllowed
Let's open another PR to track and address these new comments.
@xiarixiaoyao As RFC-33 said,
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
  // check implicitly add columns, and position reorder(spark sql may change cols order)
  InternalSchema querySchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(readSchema, querySchemaOpt.get(), true);
This line merges the internalSchema with the incoming schema and gives us another internalSchema (querySchema).
- querySchema is the latest table schema, parsed from the commit file, e.g.: a int, b string, c int
- readSchema is the write schema (from the DataFrame) by default, e.g.: a int, b string, c int, d string, e string
- we do evolution to get the final schema, e.g.: a int, b string, c int, d string, e string
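A toy illustration of the implicit-add rule described above (plain Java lists, not Hudi's implementation): columns present in the incoming write schema but missing from the latest table schema are appended.

```java
import java.util.ArrayList;
import java.util.List;

public class ImplicitAddDemo {
  public static void main(String[] args) {
    List<String> tableSchema = List.of("a int", "b string", "c int");
    List<String> incoming = List.of("a int", "b string", "c int", "d string", "e string");

    // Append any incoming column the table schema does not know yet.
    List<String> evolved = new ArrayList<>(tableSchema);
    for (String col : incoming) {
      if (!evolved.contains(col)) {
        evolved.add(col);
      }
    }
    System.out.println(evolved); // [a int, b string, c int, d string, e string]
  }
}
```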
    && writeInternalSchema.findIdByName(f) != -1
    && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
What is the need for the above two convert calls @xiarixiaoyao? I guess it would be better to add some documentation or comments around them.
Line 112: we need to convert the InternalSchema to an Avro schema and assign it to readSchema; HoodieAvroUtils.rewriteRecordWithNewSchema will use readSchema to get the correct GenericRecord from the Parquet file.
E.g. the old Parquet schema is: a int, b double, and genericRecord1 is data read from the old Parquet file;
but now the incoming schema is: a long, c int, b string, and genericRecord2 is incoming data.
We cannot merge genericRecord1 and genericRecord2, so we need to rewrite genericRecord1 with the new schema: a long, c int, b string.
Line 113 is only used to check the SchemaCompatibilityType.
So let me reframe my question. On line 99, we only take care of the addition of new columns in the incoming schema when combining the latest schema from the commit file (S1) and the incoming schema (S2). After combining them, we populate the combined schema into the variable querySchema (S3). As I understand, the writeInternalSchema (S4) variable contains the same schema as S1.
Now on line 112, we merge S3 and S4 to take care of column type changes and column renames. We finally convert this from InternalSchema to an Avro schema using the `convert` call.
Please correct me if I am wrong in the above explanation. Now I have the questions below:
1. If S4 is the same as S1, why do we even need the variable `writeInternalSchema`? We can simply use S1 throughout the if block.
2. Are we not supporting deletion of columns yet? Can you point me to the lines of code or the method where we take care of deletion?
Good question!
Question 1:
S4 is not the same as S1. S4 is the real schema from the Parquet file; if we do lots of DDL operations on the current table, S4 and S1 may differ greatly.
E.g. tableA: a int, b string, c double, and there exist three files in this table: f1, f2, f3.
Drop column c from tableA and add new column d, then update tableA, but only f2 and f3 are updated; f1 is not touched.
The schema of tableA is now: a int, b string, d long.
S1: a int, b string, d long
S4 from f1 is: a int, b string, c double
Question 2:
No, we do support deletion of columns. Let's use the above example to illustrate. At line 112, we merge S3 and S4 to get the final read schema:
tableA: a int, b string, d long
S3: a int, b string, d long
S4 from f1 is: a int, b string, c double
Merging S3 and S4 gives: a int, b string, d long (column c is dropped).
The values read from Parquet file f1 will be:
a b d
1 'test' null
d is null, since f1 does not contain column d. Column c is dropped, since the current table no longer contains column c.
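A runnable toy of that merge-and-project behavior (plain Java maps, not Hudi's reader): the row from file f1 is projected onto the current table schema, so the never-written column d comes back null and the dropped column c disappears.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergeReadDemo {
  public static void main(String[] args) {
    // Table schema S3: a int, b string, d long. File f1's schema S4: a int, b string, c double.
    List<String> tableCols = List.of("a", "b", "d");
    Map<String, Object> rowFromF1 = Map.of("a", 1, "b", "test", "c", 3.14);

    Map<String, Object> projected = new LinkedHashMap<>();
    for (String col : tableCols) {
      projected.put(col, rowFromF1.getOrDefault(col, null)); // d -> null; c is silently dropped
    }
    System.out.println(projected); // {a=1, b=test, d=null}
  }
}
```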
readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
    || SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
Why do we want to rewrite the record if the writer and reader schemas are compatible? What do we miss if we do not do this?
I think this is a bug; I will fix it today, thanks.
Sorry, I missed the message. Wait for HUDI-5148.
primitiveSchema = LogicalTypes.decimal(decimal.precision(), decimal.scale())
    .addToSchema(Schema.createFixed(
        "decimal_" + decimal.precision() + "_" + decimal.scale(),
        null, null, computeMinBytesForPrecision(decimal.precision())));
We have been troubleshooting issue #7284 recently, which is an inconsistent Avro schema namespace exception `Caused by: org.apache.avro.AvroTypeException: Found decimal_25_4, expecting union` during reading of log files. After checking, we found the name `decimal_25_4` is generated here.
May I ask why the name is built with the pattern `"decimal_" + decimal.precision() + "_" + decimal.scale()` here? Could we keep the original name if we add more arguments to the method `visitInternalPrimitiveToBuildAvroPrimitiveType`?
E.g. `private static Schema visitInternalPrimitiveToBuildAvroPrimitiveType(Type.PrimitiveType primitive, String name, String space)`
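For context, a small runnable example of the Avro pattern in question (precision 25 and scale 4 are assumed here to match the exception message; 11 bytes is the minimum fixed size for 25 decimal digits):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DecimalFixedNameDemo {
  public static void main(String[] args) {
    // The fixed type's name is derived only from precision/scale, so a reader that
    // expects the writer's original record name/namespace may fail to resolve the
    // union branch ("Found decimal_25_4, expecting union").
    Schema fixed = Schema.createFixed(
        "decimal_25_4", // name built as "decimal_" + precision + "_" + scale
        null,           // no doc
        null,           // no namespace
        11);            // bytes needed to hold 25 decimal digits
    Schema decimal = LogicalTypes.decimal(25, 4).addToSchema(fixed);
    System.out.println(decimal.toString(true));
  }
}
```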
Nvm, this issue has been fixed in PR #6358
What is the purpose of the pull request
Support full schema evolution for Hudi:
alter statement:
ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
It supports the following type changes:
int => long/float/double/string
long => float/double/string
float => double/string
double => string/decimal
decimal => decimal/string
string => date/decimal
date => string
ALTER TABLE table1 ALTER COLUMN a.b.c SET NOT NULL
ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
add statement:
ALTER TABLE table1 ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
rename:
ALTER TABLE table1 RENAME COLUMN a.b.c TO x
drop:
ALTER TABLE table1 DROP COLUMN a.b.c
ALTER TABLE table1 DROP COLUMNS a.b.c, x, y
set/unset properties:
ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value');
ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
Brief change log
Verify this pull request
(Please pick one of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.