[SPARK-32864][SQL] Support ORC forced positional evolution #29737
Conversation
Test build #128593 has finished for PR 29737 at commit
cc @dongjoon-hyun FYI
Thanks @HyukjinKwon. @dongjoon-hyun could you please take a look at this PR?
withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") { // default since 2.3.0
  withTable("t") {
    sql(s"CREATE EXTERNAL TABLE t(c3 INT, c4 INT) STORED AS ORC LOCATION '$path'")
    checkAnswer(spark.table("t"), Row(1, 2))
Should there also be a test with orc.force.positional.evolution=false to verify that the answer is Row(null, null)? Or does that test already incidentally exist elsewhere?
Thanks. I've extended the test with that case.
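A rough sketch of what the extended test could look like (hypothetical code, not the merged version; it assumes `OrcConf.FORCE_POSITIONAL_EVOLUTION`, the surrounding Spark test helpers, and that the ORC file at `$path` was written with differently named columns holding the values 1 and 2):

```scala
// Hypothetical sketch: exercise both settings in one loop. With forced
// positional evolution the renamed columns still map to the file's data;
// without it, by-name matching fails and the row comes back as nulls.
Seq(true, false).foreach { forcePositionalEvolution =>
  withSQLConf(
      HiveUtils.CONVERT_METASTORE_ORC.key -> "true", // default since 2.3.0
      OrcConf.FORCE_POSITIONAL_EVOLUTION.getAttribute ->
        forcePositionalEvolution.toString) {
    withTable("t") {
      sql(s"CREATE EXTERNAL TABLE t(c3 INT, c4 INT) STORED AS ORC LOCATION '$path'")
      val expected = if (forcePositionalEvolution) Row(1, 2) else Row(null, null)
      checkAnswer(spark.table("t"), expected)
    }
  }
}
```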
if (orcFieldNames.isEmpty) {
  // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
  None
} else {
-  if (orcFieldNames.forall(_.startsWith("_col"))) {
+  if (forcePositionalEvolution || orcFieldNames.forall(_.startsWith("_col"))) {
    // This is a ORC file written by Hive, no field names in the physical schema, assume the
This is a ORC file written by Hive, no field names in the physical schema
The comment should probably reflect your change.
Done.
Test build #128765 has finished for PR 29737 at commit
-  // This is a ORC file written by Hive, no field names in the physical schema, assume the
-  // physical schema maps to the data scheme by index.
   if (forcePositionalEvolution || orcFieldNames.forall(_.startsWith("_col"))) {
+  // This is either an ORC file written by an old versions of Hive and there are no field
Super tiny nit: "written by an old versions of Hive" should be
written by an old version of Hive
or
written by old versions of Hive
Fixed.
if (forcePositionalEvolution || orcFieldNames.forall(_.startsWith("_col"))) {
  // This is either an ORC file written by an old versions of Hive and there are no field
  // names in the physical schema, or a file written by a newer versions of Hive where
  // `orc.force.positional.evolution=true` and columns were renamed so the physical schema
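Stripped of Spark internals, the matching decision that this branch implements can be sketched as a small pure function (a hypothetical helper for illustration, not the actual `OrcUtils.requestedColumnIds` signature):

```scala
// Illustration only: decide, for each required column, which physical ORC
// column index to read. -1 means "not found", which Spark surfaces as null.
def matchColumns(
    requiredFields: Seq[String],
    orcFieldNames: Seq[String],
    forcePositionalEvolution: Boolean): Option[Seq[Int]] = {
  if (orcFieldNames.isEmpty) {
    // SPARK-8501: some old empty ORC files have an empty schema in their footer.
    None
  } else if (forcePositionalEvolution || orcFieldNames.forall(_.startsWith("_col"))) {
    // Positional matching: the i-th required field reads the i-th physical column.
    Some(requiredFields.indices)
  } else {
    // By-name matching: a renamed column is simply not found.
    Some(requiredFields.map(orcFieldNames.indexOf))
  }
}
```

For example, with `orcFieldNames = Seq("c1", "c2")` and `requiredFields = Seq("c3", "c2")`, by-name matching yields `Seq(-1, 1)` (so `c3` reads as null), while forced positional matching yields `Seq(0, 1)`.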
Super tiny nit (feel free to ignore if it doesn't make sense to you). I suggest:
orc.force.positional.evolution=true
(possibly because columns were renamed so the physical schema doesn't match the data schema).
Your current version of the comment implies orc.force.positional.evolution=true will happen when the columns are renamed. But you can set orc.force.positional.evolution=true anytime, and you can rename the columns and neglect to set orc.force.positional.evolution, correct?
Indeed, thanks. Fixed the comment.
Test build #128833 has finished for PR 29737 at commit

retest this please
@@ -142,13 +142,17 @@ object OrcUtils extends Logging {
    reader: Reader,
    conf: Configuration): Option[(Array[Int], Boolean)] = {
  val orcFieldNames = reader.getSchema.getFieldNames.asScala
  val forcePositionalEvolution = OrcConf.FORCE_POSITIONAL_EVOLUTION.getBoolean(conf)
This doesn't look like a reasonable config for a data source, but more like a config for the upper level that maps the metastore schema to the ORC files.
Do we have a good story for the schema evolution of Spark file sources?
Sorry, I'm not sure I get your point.
In requestedColumnIds() we map requiredSchema to the schema in the ORC files (orcFieldNames), and Spark is already prepared for the case where the schema in HMS and the schema in the file don't match: https://github.com/apache/spark/pull/29737/files#diff-3fb8426b690ab771c4f67f9cad336498L149 (the file is written by an old version of Hive).
It turned out that a schema mismatch can also happen with newer versions of Hive (where the columns in the file don't start with _col). Simply setting orc.force.positional.evolution=true and then renaming a column in Hive also results in a mismatch, and in that case Spark currently returns nulls.
It seemed a good idea to add support for this setting to our data source, but if that is not the right way to deal with the issue, please let me know.
(I've updated the PR description a bit to make it clearer what I'm trying to fix.)
I mean, this doesn't look like an ORC-specific thing. Does Hive only do this for ORC tables but not Parquet tables?
I see your point now, but this is an ORC-specific setting in Hive. With Parquet tables you get NULL on a renamed column:
> set orc.force.positional.evolution;
+--------------------------------------+
| set |
+--------------------------------------+
| orc.force.positional.evolution=true |
+--------------------------------------+
> create external table t2 (c1 string, c2 string) stored as parquet;
> insert into t2 values ('foo', 'bar');
> alter table t2 change c1 c3 string;
> select * from t2;
+--------+--------+
| t2.c3 | t2.c2 |
+--------+--------+
| NULL | bar |
+--------+--------+
Test build #128871 has finished for PR 29737 at commit
Seq("native", "hive").foreach { orcImpl =>
  Seq(true, false).foreach { forcePositionalEvolution =>
    withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImpl,
      OrcConf.FORCE_POSITIONAL_EVOLUTION.getAttribute -> forcePositionalEvolution.toString) {
Do we want to use this config as it is? Or append the usual spark.hadoop prefix?
I recall we add a prefix for third-party library configs like mentioned above. Is this an exception?
Well, I think you might be right; I've prefixed the config in fbab1e4. Let me know if it is ok.
Thank you for this effort, @peter-toth and all.
However, I'm reluctant to recommend this option for Apache Spark 3.1 this December 2020. In short, I recommend holding off on this approach in the Apache Spark community until then, because orc.force.positional.evolution is still known to have some serious correctness bugs, especially with complex schemas. For example, please see the following ORC issues.
- ORC-644 nested struct evolution does not respect to orc.force.positional.evolution (Fixed in 1.5.11, 1.6.4, 1.7.0)
- ORC-667 Positional mapping for nested struct types should not applied by default (Open for 1.5.11, 1.6.4, 1.7.0)
In short, there is no healthy Apache ORC version available for this option. In particular, the latest Apache ORC 1.5.11 is not able to pass the Apache Spark sql module unit tests even by default.
So, shall we discuss this later, after we have an official working Apache ORC version that passes the Apache Spark UTs at least?
cc @gatorsmile and @dbtsai
@dongjoon-hyun Thanks for the references! It seems that the
All right, thanks @dongjoon-hyun for the details. I guess I should close this PR for now.
Thank you so much, @peter-toth and @cloud-fan.
@dongjoon-hyun, @cloud-fan those 2 issues have been solved in ORC 1.5.12. Do you think we can resume this PR? @cloud-fan, I understand that this PR doesn't give full-blown schema evolution support natively in Spark, but it would definitely help a few users who have been using this flag in Hive and would like to use the same in Spark too.
Which ORC version is Spark using?
We started to use 1.5.12 recently: aa66579
This looks like a useful addition, is there any update on this?
@redsanket .
Kubernetes integration test starting
@@ -160,6 +160,9 @@ class OrcFileFormat
    val capacity = sqlConf.orcVectorizedReaderBatchSize

    OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis)
    OrcConf.FORCE_POSITIONAL_EVOLUTION.setBoolean(hadoopConf,
      hadoopConf.getBoolean("spark.hadoop." + OrcConf.FORCE_POSITIONAL_EVOLUTION.getAttribute,
        false))
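From a user's perspective, enabling the flag at the session level might look like the sketch below (hedged: whether the bare key or the spark.hadoop.-prefixed spelling is correct is exactly what this thread debates, and the table path is made up):

```scala
// Hypothetical usage sketch: ask Spark to match ORC columns positionally.
// "/data/renamed_orc_table" is a made-up path.
spark.conf.set("orc.force.positional.evolution", "true")
val df = spark.read
  .schema("c3 INT, c4 INT")       // renamed columns in the metastore/user schema
  .orc("/data/renamed_orc_table") // the file still carries the original c1/c2 names
df.show()
```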
This was added in the last commit. However, we already remove the spark.hadoop. prefix on the Spark side, don't we?
Please see line 162. hadoopConf is a cleaned one.
Oh I see. Yeah, you're right. The Hadoop conf here is cleaned up. If users specify spark.hadoop.orc.force.positional.evolution, the prefix will be removed and the key added into this conf.
Hmm, I might be getting something wrong, but I don't think that hadoopConf is cleaned here, as the UTs fail if I remove this line. As far as I can see, the only reference to SparkHadoopUtil.appendSparkHadoopConfigs is from the spark-hive HiveUtils.newTemporaryConfiguration.
I can revert the prefix in https://github.com/apache/spark/pull/29737/files#diff-e14fd8725cf71eee7b34fa299c2f3abe5a0033f9abce9de4c7e081ba57991b0bR146 though.
@peter-toth . You should not write a UT like withSQLConf("spark.hadoop.xxx"). withSQLConf is a testing framework only.
Parquet is the same. Please see ParquetFileFormat.scala. In this context, the Hadoop conf should have the original conf key, not the spark.hadoop-prefixed one.
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
Thanks @dongjoon-hyun, I see it now where the cleaning happens and reverted the last commit.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #134021 has finished for PR 29737 at commit
This reverts commit fbab1e4.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #134059 has started for PR 29737 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #134054 has finished for PR 29737 at commit
Retest this please.
Thank you, @peter-toth and all. Merged to master.
Test build #134070 has finished for PR 29737 at commit
Thanks @dongjoon-hyun and all for the review.
…a matching

### What changes were proposed in this pull request?

Provide the (configurable) ability to perform Avro-to-Catalyst schema field matching using the position of the fields instead of their names. A new `option` is added for the Avro datasource, `positionalFieldMatching`, which instructs `AvroSerializer`/`AvroDeserializer` to perform positional field matching instead of matching by name.

### Why are the changes needed?

This by-name matching is somewhat recent; prior to PR #24635, at least on the write path, schemas were matched positionally ("structural" comparison). While by-name is better behavior as a default, it will be better to make this configurable by a user. Even at the time that PR #24635 was handled, there was [interest in making this behavior configurable](#24635 (comment)), but it appears it went unaddressed. There is precedence for configurability of this behavior as seen in PR #29737, which added this support for ORC. Besides this precedence, the behavior of Hive is to perform matching positionally ([ref](https://cwiki.apache.org/confluence/display/Hive/AvroSerDe#AvroSerDe-WritingtablestoAvrofiles)), so this is behavior that Hadoop/Hive ecosystem users are familiar with.

### Does this PR introduce _any_ user-facing change?

Yes, a new option is provided for the Avro datasource, `positionalFieldMatching`, which provides compatibility with Hive and pre-3.0.0 Spark behavior.

### How was this patch tested?

New unit tests are added within `AvroSuite`, `AvroSchemaHelperSuite`, and `AvroSerdeSuite`; and most of the existing tests within `AvroSerdeSuite` are adapted to perform the same test using by-name and positional matching to ensure feature parity.

Closes #31490 from xkrogen/xkrogen-SPARK-34365-avro-positional-field-matching.
Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
What changes were proposed in this pull request?

Add support for the orc.force.positional.evolution config, which forces ORC top-level column matching by position rather than by name.

This does work in Hive: the ORC file in this case contains the original c1 and c2 columns, which don't match the metadata in HMS. But due to the positional evolution setting, Hive is capable of returning all the data.

Without this PR Spark returns nulls for the renamed c3 column. After this PR Spark returns the data in the c3 column.

Why are the changes needed?

Hive/ORC does support it.

Does this PR introduce any user-facing change?

Yes, we will support orc.force.positional.evolution.

How was this patch tested?

New UT.
New UT.