[SPARK-16552] [SQL] Store the Inferred Schemas into External Catalog Tables when Creating Tables #14207
Conversation
Test build #62344 has finished for PR 14207 at commit
This PR introduces a new concept. Not sure whether this solution sounds OK to you. Let me know whether this is the right direction to resolve the issue. Thanks!
@@ -487,6 +487,10 @@ object DDLUtils {
    isDatasourceTable(table.properties)
  }

+  def isSchemaInferred(table: CatalogTable): Boolean = {
+    table.properties.get(DATASOURCE_SCHEMA_TYPE) == Option(SchemaType.INFERRED.name)
Consider `contains`.
Please don't use `contains`. It makes it much harder to read, and to understand that the return type is an Option.
On Sunday, July 17, 2016, Jacek Laskowski notifications@github.com wrote:

> In sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala, #14207 (comment):
>
> @@ -487,6 +487,10 @@ object DDLUtils {
>     isDatasourceTable(table.properties)
>   }
> + def isSchemaInferred(table: CatalogTable): Boolean = {
> +   table.properties.get(DATASOURCE_SCHEMA_TYPE) == Option(SchemaType.INFERRED.name)
>
> Consider `contains`.
Thanks! @rxin @jaceklaskowski
I will not change it: `Option.contains` was only added in Scala 2.11, so using it would break the Scala 2.10 build.
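For illustration, here is a minimal standalone sketch (the property key is a stand-in, not the exact constant from the PR) contrasting the two styles under discussion. Both are equivalent on Scala 2.11+, but only the `==` form compiles on 2.10:

```scala
// Stand-in for CatalogTable.properties.
val props: Map[String, String] = Map("schema.type" -> "INFERRED")

// Style kept in this PR: compiles on Scala 2.10 and later.
val inferredViaEquals = props.get("schema.type") == Option("INFERRED")

// Suggested alternative: Option.contains, available only since Scala 2.11.
val inferredViaContains = props.get("schema.type").contains("INFERRED")

println(inferredViaEquals)   // true
println(inferredViaContains) // true
```

The trade-off raised in the thread: `contains` reads more directly, while `== Option(...)` makes the `Option` return type of `get` explicit at the call site.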
I think it is not clear what problem this PR tries to solve. It just says it proposes to save the inferred schema in the external catalog.
@viirya The problem it tries to resolve is from the comment of @rxin in another PR: #14148 (comment)
Does it mean that if users do not issue a refresh when the table location's contents change, the schema will be wrong when Spark restarts?
The table location is not allowed to change, right? With the changes in this PR, if changes to the data/files (pointed to by the table location) affect the table schema, users need to manually call `REFRESH TABLE`. Before this PR, if users restarted Spark or the corresponding metadata cache entry was evicted, the table schema could change without notice. This could be a potential issue when reads and writes are conducted in parallel, and this undocumented behavior could complicate Spark applications. Such unexpected changes should be avoided. If the schema has changed and the table is ready to be fetched with the new schema, users should manually issue `REFRESH TABLE`.
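For illustration (the table name below is hypothetical), the manual refresh described above would be the single DDL statement:

```sql
-- After external files under the table's location change in a way that
-- affects the schema, re-infer and update it explicitly:
REFRESH TABLE my_table;
```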
@@ -270,6 +291,11 @@ case class CreateDataSourceTableAsSelectCommand(
    }
  }

+ case class SchemaType private(name: String)
+ object SchemaType {
Will we have more schema types? If not, I think a boolean flag `isSchemaInferred` should be good.
Sure, will do it.
@gatorsmile When the data/files are produced by an external system, and Spark is just used to process them in batch, does that mean the schema can become inconsistent? Or should refresh be called every time the table is going to be queried?
@viirya Schema inference is time-consuming, especially when the number of files is huge. Thus, we should avoid refreshing it every time. That is one of the major reasons why we have a metadata cache for data source tables.
@gatorsmile Yea. I meant that since you use the stored schema for the table instead of the inferred schema, when the data/files are changed by an external system (e.g., appended to by a streaming system), the stored schema can become inconsistent with the actual schema of the data.
I think this problem already exists, as we will use the cached schema instead of inferring it every time. The only difference is that after a reboot, this PR will still use the stored schema and require users to refresh the table manually.
Test build #62512 has finished for PR 14207 at commit
Test build #62513 has finished for PR 14207 at commit
@@ -223,6 +223,9 @@ abstract class Catalog {
   * If this table is cached as an InMemoryRelation, drop the original cached version and make the
   * new version cached lazily.
   *
+  * If the table's schema is inferred at runtime, infer the schema again and update the schema
cc @rxin, I'm thinking of what's the main reason to allow inferring the table schema at runtime. IIRC, it's mainly because we wanna save some typing when creating an external data source table via a SQL string, which usually has a very long schema, e.g. for json files.
If this is true, then the table schema is not supposed to change. If users do wanna change it, I'd argue that it's a different table: users should drop this table and create a new one. Then we don't need to make `REFRESH TABLE` support schema changes, and thus don't need to store the `DATASOURCE_SCHEMA_ISINFERRED` flag.
refreshTable shouldn't run schema inference. Only run schema inference when creating the table.
And don't make this a config flag. Just run schema inference when creating the table. For managed tables, store the schema explicitly. Users must explicitly change it.
@rxin @cloud-fan I see. Will make a change
FYI, this will change the existing external behavior.
Yes, unfortunately I found out about this one too late. I will note in the release notes for 2.0 that this change is coming.
Thanks!
Test build #62552 has finished for PR 14207 at commit
@cloud-fan @rxin @yhuai The code is ready to review. Thanks!
      tableDesc: CatalogTable,
      buffer: ArrayBuffer[Row]): Unit = {
    if (DDLUtils.isDatasourceTable(tableDesc)) {
      DDLUtils.getSchemaFromTableProperties(tableDesc) match {
Now `getSchemaFromTableProperties` should never return `None`?
For all types of data source tables, we store the schema in the table properties. Thus, we should not return `None`, unless the table properties were modified by users via the `ALTER TABLE` command.
Sorry, forgot to update the message.
Now, the message is changed to "# Schema of this table is corrupted"
Can we make `DDLUtils.getSchemaFromTableProperties` always return a schema and throw an exception if it's corrupted? I think it's more consistent with the previous behaviour, i.e. throw an exception if the expected schema properties don't exist.
Sure, will do.
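A minimal sketch of the "always return or throw" shape being discussed. This is simplified and not the actual Spark implementation: the property key, the error message, and the use of plain `String`/`IllegalArgumentException` (rather than `StructType` and Spark's own exception type) are stand-ins:

```scala
// Simplified stand-in for CatalogTable.properties.
type TableProperties = Map[String, String]

// Sketch: return the serialized schema stored in the table properties,
// or fail loudly if the expected property is missing or corrupted,
// instead of returning Option[...].
def getSchemaFromTableProperties(props: TableProperties): String =
  props.getOrElse(
    "spark.sql.sources.schema",
    throw new IllegalArgumentException("Schema of this table is corrupted"))

println(getSchemaFromTableProperties(Map("spark.sql.sources.schema" -> "struct<a:int>")))
// struct<a:int>
```

The design choice: callers no longer pattern-match on `Option`; a corrupted table fails at the lookup with a clear message, matching the pre-PR behavior of raising when the expected schema properties are absent.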
Test build #62574 has finished for PR 14207 at commit
Test build #62647 has finished for PR 14207 at commit
Could you please review it again? @yhuai @liancheng @rxin Thanks!
retest this please
Test build #62810 has finished for PR 14207 at commit
  private def createDataSourceTable(
      path: File,
      userSpecifiedSchema: Option[String],
      userSpecifiedPartitionCols: Option[String]): (StructType, Seq[String]) = {
how about we pass in the expected schema and partCols, and do the check in this method?
Sure, will do. Thanks!
Test build #62914 has finished for PR 14207 at commit
Test build #62926 has finished for PR 14207 at commit
thanks, merging to master! cc @yhuai @liancheng, we will address your comments in follow-up PRs if you have any.
      case r: HadoopFsRelation => r.partitionSchema.fieldNames
      case _ => Array.empty[String]
    }
    if (userSpecifiedPartitionColumns.length > 0) {
Should we throw an exception for this case?
Here, I just keep the existing behavior.
To be honest, I think we should throw an exception whenever it makes sense; it sounds like the job log is not read by most users. Will submit a follow-up PR to make that change. Thanks!
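A sketch of the exception-instead-of-warning behavior adopted in the follow-up. The helper name, signature, and error message below are hypothetical stand-ins, not Spark's actual code:

```scala
// Sketch: reject user-specified partition columns when no schema is given,
// instead of silently logging a warning and ignoring them.
def validatePartitionColumns(
    userSpecifiedSchema: Option[String],
    userSpecifiedPartitionCols: Seq[String]): Unit = {
  if (userSpecifiedSchema.isEmpty && userSpecifiedPartitionCols.nonEmpty) {
    throw new IllegalArgumentException(
      "It is not allowed to specify partition columns when the table schema " +
        "is not defined.")
  }
}

// OK: schema given, partition columns given.
validatePartitionColumns(Some("a INT, b STRING"), Seq("b"))
```

Failing fast here surfaces the misuse to the user directly, rather than burying it in a log most users never read.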
Where is the change for the following description?
@yhuai Forgot to change the PR description. For data source tables, the schema will not be inferred and refreshed. This is based on this comment: #14207 (comment). BTW: just updated the PR description. Sorry about that.
…g Columns without a Given Schema

### What changes were proposed in this pull request?
Address the comments by yhuai in the original PR: #14207
First, issue an exception instead of logging a warning when users specify partitioning columns without a given schema. Second, refactor the code a little.

### How was this patch tested?
Fixed the test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14572 from gatorsmile/followup16552.
…s between data and partition schema

## What changes were proposed in this pull request?
This is a regression introduced by apache#14207. After Spark 2.1, we store the inferred schema when creating the table, to avoid inferring the schema again at read path. However, there is one special case: overlapped columns between data and partition. For this case, it breaks the assumption of the table schema that there is no overlap between the data and partition schema, and that partition columns should be at the end. The result is that, for Spark 2.1, the table scan has an incorrect schema that puts partition columns at the end. For Spark 2.2, we add a check in CatalogTable to validate the table schema, which fails in this case. To fix this issue, a simple and safe approach is to fall back to the old behavior when overlapped columns are detected, i.e. store an empty schema in the metastore.

## How was this patch tested?
new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#19579 from cloud-fan/bug2.
What changes were proposed in this pull request?
Currently, in Spark SQL, the initial creation of a table's schema can be classified into two groups. This applies to both Hive tables and data source tables:
Group A. Users specify the schema.
Case 1 CREATE TABLE AS SELECT: the schema is determined by the result schema of the SELECT clause. For example,
Case 2 CREATE TABLE: users explicitly specify the schema. For example,
Group B. Spark SQL infers the schema at runtime.
Case 3 CREATE TABLE. Users do not specify the schema but the path to the file location. For example,
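For illustration (the table names, columns, and path below are hypothetical, not taken from the PR), the three cases might look like:

```sql
-- Case 1: CREATE TABLE AS SELECT; the schema comes from the SELECT result.
CREATE TABLE t1 USING parquet AS SELECT 1 AS a, 'x' AS b;

-- Case 2: CREATE TABLE with an explicitly specified schema.
CREATE TABLE t2 (a INT, b STRING) USING parquet;

-- Case 3: CREATE TABLE with only a path; the schema is inferred at runtime.
CREATE TABLE t3 USING json OPTIONS (path '/data/t3');
```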
Before this PR, Spark SQL does not store the inferred schema in the external catalog for the cases in Group B. When users refresh the metadata cache, or access the table for the first time after (re)starting Spark, Spark SQL will infer the schema and store it in the metadata cache to improve the performance of subsequent metadata requests. However, runtime schema inference could cause undesirable schema changes after each reboot of Spark.
This PR is to store the inferred schema in the external catalog when creating the table.
~~When users intend to refresh the schema after possible changes on the external files (table location), they issue `REFRESH TABLE`. Spark SQL will infer the schema again based on the previously specified table location and update/refresh the schema in the external catalog and metadata cache.~~

For data source tables, the schema will not be inferred and refreshed. If the schema is changed, users need to recreate the table. For managed tables, the schema can be changed through DDL statements.

In this PR, we do not use the inferred schema to replace the user-specified schema, to avoid external behavior changes. Based on the design, user-specified schemas (as described in Group A) can be changed by `ALTER TABLE` commands, although we do not support them now.
How was this patch tested?
TODO: add more cases to cover the changes.