[SPARK-21617][SQL] Store correct table metadata when altering schema in Hive metastore. #18849
Conversation
This change fixes two issues:
- when loading table metadata from Hive, restore the "provider" field of CatalogTable so DS tables can be identified.
- when altering a DS table in the Hive metastore, make sure to not alter the table's schema, since the DS table's schema is stored as a table property in those cases.

Also added a new unit test for this issue, which fails without this change.
HiveExternalCatalog.alterTableSchema takes a shortcut by modifying the raw Hive table metadata instead of the full Spark view; that means it needs to be aware of whether the table is Hive-compatible or not. For compatible tables, the current "replace the schema" code is the correct path, except that an exception in that path should result in an error, not in retrying in a different way. For non-compatible tables, Spark should just update the table properties and leave the schema stored in the raw table untouched.

Because Spark doesn't explicitly store metadata about whether a table is Hive-compatible, a new property was added just to make that explicit. The code tries to detect old DS tables that don't have the property and do the right thing.

These changes also uncovered a problem with the way case-sensitive DS tables were being saved to the Hive metastore: the metastore is case-insensitive, and the code was treating these tables as Hive-compatible if the data source had a Hive counterpart (e.g. parquet). In this scenario, the schema could be corrupted when being updated from Spark if columns conflicted once case was ignored. The change fixes this by making case-sensitive DS tables not Hive-compatible.
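For illustration only, here is a minimal, self-contained sketch of the two code paths described above. It uses stand-in types rather than Spark's real `CatalogTable`/`HiveClient`, the property key mirrors the one added by this PR, and the default for tables without the flag glosses over the fallback detection discussed further down, so treat it as a sketch rather than the actual patch:

```scala
// Stand-in model of the raw Hive table metadata (the real code uses CatalogTable).
case class RawHiveTable(
    metastoreSchema: Seq[(String, String)],   // schema as stored by the Hive metastore
    properties: Map[String, String])          // includes Spark's serialized schema

object AlterSchemaSketch {
  // Table property added by this PR; the key mirrors DATASOURCE_HIVE_COMPATIBLE.
  val DatasourceHiveCompatible = "spark.sql.hive.compatibility"

  // A missing flag is treated as compatible here; the real change falls back to
  // inspecting the serde for tables written by older Spark versions.
  def isHiveCompatible(raw: RawHiveTable): Boolean =
    raw.properties.get(DatasourceHiveCompatible).forall(_.toBoolean)

  def alterTableSchema(
      raw: RawHiveTable,
      newMetastoreSchema: Seq[(String, String)],
      newSchemaProps: Map[String, String]): RawHiveTable = {
    if (isHiveCompatible(raw)) {
      // Hive-compatible table: replace the metastore schema and let any
      // metastore error propagate instead of retrying in a different way.
      raw.copy(
        metastoreSchema = newMetastoreSchema,
        properties = raw.properties ++ newSchemaProps)
    } else {
      // Spark-specific (non-compatible) table: the real schema lives in the
      // table properties, so update only those and leave the raw schema alone.
      raw.copy(properties = raw.properties ++ newSchemaProps)
    }
  }
}
```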
This is a corrected version of #18824 after I tracked the actual failure and looked at the suggested code paths in the original review.
// Because HiveExternalCatalog sometimes writes back "raw" tables that have not been
// completely translated to Spark's view, the provider information needs to be looked
// up in two places.
val provider = table.provider.orElse(
This change would have fixed the second exception in the bug (about storing an empty schema), but the code was only ending up in that situation because of the other problems this PR is fixing. This change shouldn't be needed for the fix, but I included it for extra correctness.
Test build #80270 has finished for PR 18849 at commit
// Detect whether this is a Hive-compatible table.
val provider = rawTable.properties.get(DATASOURCE_PROVIDER)
val isHiveCompatible = if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) {
This whole check might not support all the previous versions. We have changed these flags multiple times, so we might break support for table metadata created by previous versions of Spark.

How about directly comparing the schemas to check whether they are Hive-compatible? cc @cloud-fan WDYT?
+1. Since we still need to handle the case without the special flag for old Spark versions, it makes more sense to just detect Hive compatibility by comparing the raw table schema and the table schema from the table properties.
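A rough sketch of the comparison being proposed (a hypothetical helper written to illustrate the idea, not code from the patch): `metastoreSchema` would be what Hive reports for the raw table, and `schemaFromProps` the schema Spark serialized into the table properties.

```scala
import org.apache.spark.sql.types.StructType

// Compare ignoring case, since the metastore lower-cases column names.
def looksHiveCompatible(
    metastoreSchema: StructType,
    schemaFromProps: StructType): Boolean = {
  metastoreSchema.fields.map(f => (f.name.toLowerCase, f.dataType)).toSeq ==
    schemaFromProps.fields.map(f => (f.name.toLowerCase, f.dataType)).toSeq
}
```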
I think you mean the check below in the `case _ =>` case, right?
I see that both compatible and non-compatible tables set that property, at least in 2.1, so let me see if there's an easy way to differentiate that without having to replicate all the original checks (which may be hard to do at this point).
I changed the check to use the serde instead. The new tests pass even without the explicit check for `DATASOURCE_HIVE_COMPATIBLE` when doing that, although I prefer leaving the explicit property for clarity.
I also checked 2.0 and 1.6 and both seem to do the same thing (both set the provider, and both use a different serde for non-compatible tables), so the check should work for those versions too.
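Roughly, the detection sketched here amounts to the following (simplified; the exact code is in the diff further down, and the surrounding plumbing is assumed): the explicit property wins when present, otherwise the serde recorded in the metastore is compared with the serde Spark would pick for the data source.

```scala
import org.apache.spark.sql.internal.HiveSerDe

// provider:    value of the DATASOURCE_PROVIDER table property, if any
// storedSerde: serde recorded in the raw table's storage descriptor
// compatProp:  value of the new DATASOURCE_HIVE_COMPATIBLE property, if any
def isHiveCompatible(
    provider: Option[String],
    storedSerde: Option[String],
    compatProp: Option[String]): Boolean = {
  compatProp.map(_.toBoolean).getOrElse {
    provider match {
      // Data source table ("hive" is the DDLUtils.HIVE_PROVIDER value):
      // compatible only if the metastore recorded the serde matching the
      // data source (e.g. parquet -> ParquetHiveSerDe).
      case Some(p) if p != "hive" =>
        HiveSerDe.sourceToSerDe(p).flatMap(_.serde) == storedSerde
      // Hive tables, or very old metadata with no provider at all.
      case _ => true
    }
  }
}
```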
Could you add a test case for the cross-version compatibility checking? I am just afraid it might not work as expected
We plan to submit a separate PR for verifying all the related cross-version issues. That needs to verify most DDL statements. You can ignore my previous comment. Thanks!
Too late now, already added the tests.
Test build #80358 has finished for PR 18849 at commit
}
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
  .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
`.equals` -> `==`
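i.e., assuming the same `t1` table and `spark` session as in the quoted test, the assertion would become something like:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

assert(spark.table("t1").schema ==
  new StructType().add("c1", IntegerType).add("C1", StringType))
```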
 * from the built-in ones.
 */
@ExtendedHiveTest
class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
Could we create a separate suite for this? `HiveDDLSuite.scala` is too big now.
Test build #80405 has finished for PR 18849 at commit
Test build #80414 has finished for PR 18849 at commit
@@ -1193,6 +1242,7 @@ object HiveExternalCatalog {
  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
  val DATASOURCE_HIVE_COMPATIBLE = SPARK_SQL_PREFIX + "hive.compatibility"
Use `DATASOURCE_PREFIX`?
@@ -35,7 +35,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf._
Revert it back?
// Because HiveExternalCatalog sometimes writes back "raw" tables that have not been
// completely translated to Spark's view, the provider information needs to be looked
// up in two places.
val provider = table.provider.orElse(
What is the second exception? Could you explain more? If this is fixing a different bug, could you open a new JIRA and put it in the PR title?
If you look at the bug, there are two exceptions. One gets logged; the second is thrown and caused the test to fail in my 2.1-based branch.

The exception happened because `alterTableSchema` is writing back the result of `getRawTable`. That raw table does not have the provider set; instead, it's in the table's properties. This check looks at both places, so that other code that uses `getRawTable` can properly pass this check.

As I explained in a previous comment, this doesn't happen anymore for `alterTableSchema` because of the other changes. But there's still code in the catalog class that writes back tables fetched with `getRawTable`, so this feels safer.
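In other words, the lookup amounts to something like the small sketch below; the `orElse` argument is truncated in the quoted diff, and the wrapper function and parameter names here are only for illustration (the property key would be `HiveExternalCatalog.DATASOURCE_PROVIDER`):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// A fully converted CatalogTable has `provider` set, while a "raw" table from
// getRawTable only carries the provider as a table property.
def providerOf(table: CatalogTable, providerKey: String): Option[String] =
  table.provider.orElse(table.properties.get(providerKey))
```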
// Detect whether this is a Hive-compatible table.
val provider = rawTable.properties.get(DATASOURCE_PROVIDER)
val isHiveCompatible = if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) {
Could you create a separate utility function for `isHiveCompatible` in HiveExternalCatalog.scala?
val rawTable = catalog.getRawTable("default", tableName)
val compatibility = rawTable.properties.get(HiveExternalCatalog.DATASOURCE_HIVE_COMPATIBLE)
  .map(_.toBoolean).getOrElse(true)
assert(hiveCompatible === compatibility)
We also need to test whether Hive can still read the altered table schema, by using `spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive`.
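That is, roughly something like the snippet below inside a test (illustrative only, using the accessor chain quoted above; whether this is feasible in this particular suite is exactly what's debated next):

```scala
import org.apache.spark.sql.hive.HiveExternalCatalog

// Ask Hive itself to look at the altered table; if the stored metadata were
// not readable by Hive, these calls would fail on the Hive side.
val hiveClient = spark.sharedState.externalCatalog
  .asInstanceOf[HiveExternalCatalog].client
hiveClient.runSqlHive("DESCRIBE t1")
hiveClient.runSqlHive("SELECT * FROM t1")
```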
That's not easy to do here. The catalog being updated is not the same as the one the Spark session is using. You can potentially run queries against the 2.1 catalog in the test, but how do you insert data into the table? (You could run a Hive query for that too, but then what's the point?)

I'd argue this kind of test belongs in `HiveDDLSuite` if it isn't there now; and if it's desirable to test against multiple Hive versions, that suite needs to be reworked so it can run against them. But `TestHiveSingleton` makes that really hard currently, and fixing that is way beyond the scope of this change.
My only comment here is to ensure the altered table is still readable by Hive.
I understand, but it's really hard to write that kind of test without a serious rewrite of the tests in the hive module, so that you can have multiple `SparkSession` instances.

Right now, I think the best we can achieve is "the metastore has accepted the table, so the metadata looks ok", and assume that the tests performed elsewhere (e.g. HiveDDLSuite), where a proper `SparkSession` exists, are enough to make sure Hive can read the data.
I checked the test case coverage. We do not have such a check. Could you add it to the following test cases?
https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala#L1865-L1923
I think this PR is also trying to make Hive readable after Spark adds columns.
> I think this PR is also trying to make Hive readable after Spark adds columns.

No, that should already be the case before this change. This PR is just to make the existing feature work with Hive 2.1.

I really would like to avoid turning this PR into "let's fix all the Hive tests to make sure they make sense". If you'd like, I can open a bug to track that, but that is not what this change is about and I'd like to keep it focused.
OK, we can do it in a separate PR.
Test build #80423 has finished for PR 18849 at commit
Test build #80432 has finished for PR 18849 at commit
@@ -908,7 +909,13 @@ private[hive] object HiveClientImpl {
  }
  // after SPARK-19279, it is not allowed to create a hive table with an empty schema,
  // so here we should not add a default col schema
It looks like this comment needs to be moved accordingly?
// serde for the table's data source. If they match, the table is Hive-compatible.
// If they don't, they're not, because of some other table property that made it
// not initially Hive-compatible.
HiveSerDe.sourceToSerDe(provider.get) == table.storage.serde
There is a change above regarding treating case-sensitive DS tables as Hive-incompatible. When a table of that kind doesn't have the new `DATASOURCE_HIVE_COMPATIBLE` property, should we treat it as Hive-compatible or incompatible? Looks like for now we treat it as compatible?
Case-sensitive tables are weird. Case sensitivity is a session configuration, but IMO that config should affect compatibility, because even if you create a table that is Hive-compatible initially, you could modify it later so that it's not Hive-compatible anymore. It seems the 1.2 Hive libraries would allow the broken metadata, while the 2.1 libraries complain about it.

So yes, currently when case sensitivity is enabled you can still create tables that may be Hive-compatible, and this change forces those tables to not be Hive-compatible.

As for existing tables, there's no way to know, because that data is not present anywhere in the table's metadata. (It's not after my change either, so basically you can read such a table in a case-insensitive session and who knows what might happen.)

I'm ok with reverting this part since it's all a little hazy, but I just wanted to point out that it's a kind of weird part of the code.
cc @cloud-fan
Hey all, could I get a thumbs up / down on the case-sensitivity handling part of this change?
Test build #80463 has finished for PR 18849 at commit
@@ -288,6 +303,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
  // bucket specification to empty. Note that partition columns are retained, so that we can
  // call partition-related Hive API later.
  def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
    val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "false")
This would be a good idea if we had done it from the first version. But now, for backward compatibility, we have to handle the case without this special flag on the read path anyway, so I can't see the point of having this flag.
@@ -342,6 +359,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
  "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
  (None, message)

case _ if currentSessionConf(SQLConf.CASE_SENSITIVE) =>
I think we should look at the schema instead of looking at the config. It's possible that even when the case-sensitive config is on, the column names are all lowercase and the table is still Hive-compatible.

My proposal: check `schema.asLowerCased == schema`; if it's false, then the table is not Hive-compatible. We would need to add `StructType.asLowerCased`, though.
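`StructType.asLowerCased` doesn't exist yet, so purely as a sketch of the proposal (hypothetical helpers, handling top-level field names only):

```scala
import org.apache.spark.sql.types.StructType

// Lower-case the top-level field names (nested structs would need recursion).
def asLowerCased(schema: StructType): StructType =
  StructType(schema.fields.map(f => f.copy(name = f.name.toLowerCase)))

// If lower-casing changes the schema, it cannot be stored faithfully in the
// case-insensitive metastore, i.e. it would not be Hive-compatible.
def isCaseInsensitiveSafe(schema: StructType): Boolean =
  asLowerCased(schema) == schema
```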
Actually, is this a useful change? On the read path we still need to handle the case where a Hive-compatible table has an inconsistent schema between the table properties and the metastore metadata.
Ok, I'll remove this change. The write-path change you propose isn't necessary, because if you have an "invalid" schema (the same column name with different case), the Hive metastore will complain and the table will be stored as non-Hive-compatible.

The problem this was trying to avoid is related to the changes in `alterTableSchema`: if you create a Hive-compatible table here and later try to update it with an invalid schema, you'd end up with a frankentable, because the code in `alterTableSchema` was wrong.

But since this change is mainly about fixing `alterTableSchema`, you'll now get a proper error in that case instead of ending up with a potentially corrupted table.
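For concreteness, the scenario being described is roughly the following (adapted from the new test quoted earlier; `sql` is the test suite's helper, and the CREATE statement is an assumed setup step):

```scala
// With case sensitivity on, the added column differs from an existing one only
// by case, while the Hive metastore itself is case-insensitive.
spark.conf.set("spark.sql.caseSensitive", "true")
sql("CREATE TABLE t1 (c1 int) USING parquet")
// With the old alterTableSchema, the Hive-compatible path could leave the
// metastore schema and the Spark schema (kept in table properties) out of
// sync; with this patch the command either applies cleanly or fails with an
// explicit error.
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
```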
Test build #80694 has finished for PR 18849 at commit
Ping?
Will review it today.
I think there is still a lot of confusion here about what this is fixing. I see a bunch of comments related to testing the schema for compatibility. That does not work. Schema compatibility is not the issue here; the issue is whether the table was initially created as Hive-compatible or not. This is the Hive metastore, not Spark, complaining, so the Spark-side schema for non-compatible tables is pretty irrelevant.

The schema by itself does not provide enough information to detect whether a table is compatible or not. Even if the schema is Hive-compatible, the data source may not have a Hive counterpart, or the table might have been created in a case-sensitive session and have conflicting column names when case is ignored, or a few other things, all of which are checked at table creation time. The same checks cannot be done later, and should not be done. If the table was non-compatible it should remain non-compatible, and vice versa.

The only thing that is needed is a way to detect that single property of the table. You cannot do that just from the schema, as has been proposed a few times here. There are two options: store that information explicitly (which is what the new table property does), or infer it from some existing piece of metadata that already differs between compatible and non-compatible tables.

The only thing that exists for the second option is the serde field in the storage descriptor. Spark sets it to a different value depending on whether the table was created as Hive-compatible (as noted earlier, this holds at least back to 1.6), so it can be used to tell the two cases apart.

Hope that clarifies things.
If the new flag …
If the flag is set to true, then whenever an "alter table" command is executed, it will follow the "Hive compatible" path, which lets the Hive metastore decide whether the change is valid or not. So, to the best of Spark's knowledge, compatibility is maintained because Hive did not complain about it. No other table metadata (e.g. storage info) is changed by that command.
If we want to introduce such a flag, we also need to ensure its value is always accurate. That means we need to follow what we are doing in the CREATE TABLE code path: when the Hive metastore complains, we should also set the flag to false.
That's the whole point of the flag and what the current changes do! It takes different paths when handling alter table depending on whether the table is compatible. So if the table was compatible, it will remain compatible (or otherwise Hive should complain about the updated table, as it does in certain cases). So I really do not understand what it is you're not understanding about the patch.

Absolutely not. If you have a Hive-compatible table and you try to update its schema with something that Hive complains about, YOU SHOULD GET AN ERROR. And that's what the current patch does. You should not try to mess up the table even further. The old code was just plain broken in this regard.
When Hive complains, we should still let users update Spark-native file source tables. In Spark SQL, we do our best to make the native data source tables Hive-compatible. However, we should not block users just because the Hive metastore complained about it. This is how we behave in CREATE TABLE. If users really require reading our Spark-native data source tables from Hive, we should introduce a SQLConf or table-specific option and update the corresponding part in …

In addition, we should avoid introducing a flag just for fixing a specific scenario. Thus, I still think comparing the table schemas is preferred for such a fix. Could you show an example that could break it? cc @cloud-fan
Yes, and I'm not advocating changing that. That is fine and that is correct. The problem is what to do after the table has already been created. At that point, "Hive compatibility" is already a property of the table. If you break it, you might break a Hive application that was able to read from the table before. So it's wrong, in my view, to change compatibility at that point.

If that is not the point of "Hive compatibility", then there is no point in creating data source tables in a Hive-compatible way to start with. Just always create them as "not Hive compatible", because then Spark is free to do whatever it wants with them. At best, you could implement the current fallback behavior, but only if it's a data source table. It is just wrong to fall back to the exception-handling case for normal Hive tables. But even then, that sort of makes the case for storing data source tables as Hive-compatible rather flimsy.

The flag is not for fixing this specific scenario. The flag is for checking the Hive-compatibility property of the table, so that code can make the correct decisions when Hive compatibility is an issue, like it is for "alter table".
For most usage scenarios of Spark native file source tables, users do not use Hive to query the tables. Thus, breaking or maintaining Hive compatibility will not affect them. Their DDL commands on the data source tables should not be blocked even if the Hive metastore complains about them. For Hive users who want to query Spark native file source tables, we can introduce a property like …
Alright, I give up. If you don't think it's important to maintain Hive compatibility once it's been set, and it's ok to create tables that have completely messed-up metadata (from Hive's perspective) as long as they're data source tables, I'll do that. I'd rather fix the actual problem that happens when using Hive 2.x than keep up a long discussion about what it means to be compatible...
I can see the value of maintaining Hive compatibility for users who use Hive and Spark SQL together. We can do it in a separate PR. We also need to change …
That's fine if you want to do it, I'm just not signing up for actually doing it. I'm more worried about Spark actually working with a 2.1 metastore, which it currently doesn't in a few scenarios.
This avoids corrupting Hive tables, but allows data source tables to become non-Hive-compatible depending on what the user does.
Sure, I will put it in my to-do list. Thank you very much!
Test build #80936 has finished for PR 18849 at commit
Test build #80935 has finished for PR 18849 at commit
LGTM
Thanks! Merging to master/2.2
…in Hive metastore.

For Hive tables, the current "replace the schema" code is the correct path, except that an exception in that path should result in an error, and not in retrying in a different way.

For data source tables, Spark may generate a non-compatible Hive table; but for that to work with Hive 2.1, the detection of data source tables needs to be fixed in the Hive client, to also consider the raw tables used by code such as `alterTableSchema`.

Tested with existing and added unit tests (plus internal tests with a 2.1 metastore).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18849 from vanzin/SPARK-21617.

(cherry picked from commit 84b5b16)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>