[SPARK-21912][SQL] ORC/Parquet table should not create invalid column names #19124
Conversation
Test build #81391 has finished for PR 19124 at commit
@@ -169,6 +171,16 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
      }
    }
  }

  private def checkFieldName(name: String): Unit = {
    // ,;{}()\n\t= and space are special characters in ORC schema
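For context, a minimal sketch of what such a blacklist-style check could look like; the diff above is truncated, so this completion is illustrative rather than the PR's exact code (the message format mirrors the Parquet error quoted later in this thread, and AnalysisException is assumed to be imported):

private def checkFieldName(name: String): Unit = {
  // ,;{}()\n\t= and space are special characters in ORC schema
  if (name.matches(".*[ ,;{}()\n\t=].*")) {
    throw new AnalysisException(
      s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
         |Please use alias to rename it.""".stripMargin.split("\n").mkString(" "))
  }
}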
Is this an exhaustive list? E.g., it looks like ? is not allowed either. Given that the underlying lib (ORC) can evolve to support or not support certain chars, it's safer to rely on some method rather than coming up with a blacklist. Can you simply call TypeInfoUtils.getTypeInfoFromTypeString or any related method which would do this check?
Caused by: java.lang.IllegalArgumentException: Error: : expected at the position 8 of 'struct<i?:int,j:int,k:string>' but '?' is found.
at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:360)
at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.expect(TypeInfoUtils.java:331)
at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseType(TypeInfoUtils.java:483)
at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:305)
at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromTypeString(TypeInfoUtils.java:770)
at org.apache.spark.sql.hive.orc.OrcSerializer.<init>(OrcFileFormat.scala:194)
at org.apache.spark.sql.hive.orc.OrcOutputWriter.<init>(OrcFileFormat.scala:231)
at org.apache.spark.sql.hive.orc.OrcFileFormat$$anon$1.newInstance(OrcFileFormat.scala:91)
...
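A minimal sketch of the suggested delegation, assuming the same helper name as the diff above; Hive's parser throws IllegalArgumentException on invalid names, as the trace shows:

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.spark.sql.AnalysisException

private def checkFieldName(name: String): Unit = {
  try {
    // Let Hive's own type-string parser decide which field names are representable.
    TypeInfoUtils.getTypeInfoFromTypeString(s"struct<$name:int>")
  } catch {
    case _: IllegalArgumentException =>
      throw new AnalysisException(
        s"""Column name "$name" contains invalid character(s). Please use alias to rename it.""")
  }
}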
Thank you for the review, @tejasapatil!
That's a good idea. Right, it's not an exhaustive list. I'll update the PR.
Test build #81394 has finished for PR 19124 at commit
    } catch {
      case _: IllegalArgumentException =>
        throw new AnalysisException(
          s"""Attribute name "$name" contains invalid character(s).
Nit: Attribute -> Column
Thank you for the review. Sure.
I agree with you that column is more accurate here. Previously, I borrowed this from ParquetSchemaConverter.
withTable("orc1") { | ||
Seq(" ", "?", ",", ";", "{", "}", "(", ")", "\n", "\t", "=").foreach { name => | ||
val m = intercept[AnalysisException] { | ||
sql(s"CREATE TABLE orc1 USING ORC AS SELECT 1 `column$name`") |
This is CTAS. How about CREATE TABLE?
Yep. I'll check the code path, too.
It seems to be the same situation with Parquet. CREATE TABLE passes but SELECT raises exceptions.
scala> sql("CREATE TABLE parquet1(`a b` int) using parquet")
res1: org.apache.spark.sql.DataFrame = []
scala> sql("select * from parquet1").show
org.apache.spark.sql.AnalysisException: Attribute name "a b" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
Do we need to add a datasource-specific operation in createDataSourceTables for Parquet and ORC?
@gatorsmile, I tried the following in CreateDataSourceTableCommand. We can add a check for ParquetFileFormat, but not for OrcFileFormat. Should I change the PR title and scope instead?
table.provider.get.toLowerCase match {
  case "parquet" =>
    dataSource.schema.map(_.name).foreach(
      org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.checkFieldName)
  case "orc" =>
    dataSource.schema.map(_.name).foreach(
      org.apache.spark.sql.hive.OrcRelation.checkFieldName)
}
I'll try another way.
@@ -85,6 +87,13 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
    }
  }

  table.provider.get.toLowerCase match {
We are able to check here for a normal CREATE TABLE.
This just covers CREATE DATA SOURCE TABLES. How about CREATE HIVE SERDE TABLES?
Ya. That's a good point!
@gatorsmile, I have a question. Do we have an issue with Hive SERDE tables?

CREATE TABLE t(`a b` INT) USING hive OPTIONS (fileFormat 'parquet')

I thought the Hive schema is preferred over the Parquet schema.
What is Hive schema? What is Parquet schema?
Could you insert rows to the table you created?
Oh, I see. It fails.
scala> sql("set spark.sql.hive.convertMetastoreParquet=false")
res5: org.apache.spark.sql.DataFrame = [key: string, value: string]
scala> sql("INSERT INTO t VALUES(1)")
17/09/05 11:34:03 ERROR Utils: Aborting task
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.IllegalArgumentException: field ended by ';': expected ';' but got 'b' at line 1: optional int32 a b
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:123)
import org.apache.spark.sql.AnalysisException

private[sql] object OrcFileFormat {
Fortunately, we already have the new ORC dependency.
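Putting the fragments in this thread together, the check enabled by the new ORC dependency looks roughly like this sketch (the final merged code may differ slightly):

import org.apache.orc.TypeDescription
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.StructType

private[sql] object OrcFileFormat {
  private def checkFieldName(name: String): Unit = {
    try {
      // ORC's own schema parser rejects field names it cannot represent.
      TypeDescription.fromString(s"struct<$name:int>")
    } catch {
      case _: IllegalArgumentException =>
        throw new AnalysisException(
          s"""Column name "$name" contains invalid character(s). Please use alias to rename it.""")
    }
  }

  def checkFieldNames(schema: StructType): Unit = schema.fieldNames.foreach(checkFieldName)
}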
Test build #81403 has finished for PR 19124 at commit
Test build #81404 has finished for PR 19124 at commit
Test build #81405 has finished for PR 19124 at commit
Hi, @gatorsmile. I can fix it in most cases, but we have the following test case. In the case of Parquet, currently CREATE TABLE is allowed while CTAS and SELECT raise AnalysisException. How should I proceed?
I just updated the output answer file to show the result for review only.
Test build #81408 has finished for PR 19124 at commit
@@ -23,7 +23,8 @@ CREATE TABLE showcolumn1 (col1 int, `col 2` int) USING parquet
-- !query 2 schema
Please change it to JSON
Sure, thank you for the guidance!
@@ -145,15 +146,27 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
 * `PreprocessTableInsertion`.
 */
object HiveAnalysis extends Rule[LogicalPlan] {
I thought HiveAnalysis is the best place to check this.
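A sketch of that hook point, using names from this thread (the real rule contains more cases, so this is illustrative only):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable

object HiveAnalysis extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
      // Reject invalid data-schema field names before the Hive table is ever created.
      DDLUtils.checkDataSchemaFieldNames(tableDesc)
      CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
  }
}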
Thank you, @gatorsmile. The PR has become much more general.
-- !query 2 schema
struct<>
-- !query 2 output


-- !query 3
CREATE TABLE showcolumn2 (price int, qty int, year int, month int) USING parquet partitioned by (year, month)
We do not need to change this.
Yep. It's reverted.
That is normal. When we find a bug, it normally means we ignored it in more than one place. Thus, we need to check all the other code paths that could trigger it.
@@ -85,6 +88,14 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
    }
  }

  table.provider.get.toLowerCase(Locale.ROOT) match {
We should do it in DataSourceAnalysis
Thanks. Right.
@@ -83,6 +83,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
      classOf[MapRedOutputFormat[_, _]])
  }

  org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.checkFieldNames(dataSchema)
Do we still need this when we move the check to DataSourceAnalysis?
I see. I'll check this and remove it. Maybe we can remove the similar logic from ParquetFileFormat, too.
For that, no. It's not considered yet, like the other code path.
Could this PR cover this scenario?
Test build #81432 has finished for PR 19124 at commit
I created SPARK-21929 for ORC ALTER TABLE support. For Parquet ALTER TABLE, yes, I think I can include that here.
@@ -206,6 +206,9 @@ case class AlterTableAddColumnsCommand(
      reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
      conf.caseSensitiveAnalysis)

    val newDataSchema = StructType(catalogTable.dataSchema ++ columns)
    DDLUtils.checkFieldNames(catalogTable.copy(schema = newDataSchema))
For this command, it's not easy to get CatalogTable at DataSourceStrategy.
val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
val newSchema = catalogTable.schema.copy(fields = reorderedSchema.toArray)
SchemaUtils.checkColumnNameDuplication(
  reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
  conf.caseSensitiveAnalysis)
DDLUtils.checkFieldNames(catalogTable.copy(schema = newSchema))
catalog.alterTableSchema(table, newSchema)
Ur, actually, excluding partition columns was intentional. Maybe I used a misleading PR title and description here. So far, I checked dataSchema only. I think partition columns are okay because they are not a part of the Parquet/ORC file schema.
Is it okay to use the following?
val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema
val newDataSchema = StructType(catalogTable.dataSchema ++ columns)
SchemaUtils.checkColumnNameDuplication(
  reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
  conf.caseSensitiveAnalysis)
DDLUtils.checkFieldNames(catalogTable.copy(schema = newDataSchema))
catalog.alterTableSchema(
  table, catalogTable.schema.copy(fields = reorderedSchema.toArray))
Sorry, I found that your code is better. I'll update it like yours.
}

// TODO: After SPARK-21929, we need to check ORC, too.
Seq("PARQUET").foreach { source =>
I added only a Parquet test case due to SPARK-21929.
Test build #81435 has finished for PR 19124 at commit
private[sql] object OrcFileFormat {
  private def checkFieldName(name: String): Unit = {
    try {
      TypeDescription.fromString(s"struct<$name:int>")
This seems equal to calling TypeDescription.parseName(name).
parseName doesn't look public though... I don't like this line either, but couldn't think of another alternative for now.
Oh, right, I forgot that it is Java...
Yep. I agree that it's a little ugly now.
} else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) {
  ParquetSchemaConverter.checkFieldNames(table.dataSchema)
} else {
  table.provider.get.toLowerCase(Locale.ROOT) match {
table.provider could be None in previous versions of Spark. Thus, .get is risky.
private[sql] def checkFieldNames(table: CatalogTable): Unit = {
  val serde = table.storage.serde
  if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
This way is not right. Let's use your previous way with a foreach loop:

table.provider.foreach {
  _.toLowerCase(Locale.ROOT) match {
    case "hive" =>
Yep!
val serde = table.storage.serde
if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
  OrcFileFormat.checkFieldNames(table.dataSchema)
} else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde) {
We could have different Parquet serdes. For example, parquet.hive.serde.ParquetHiveSerDe and org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe. How about ORC?
AFAIK, it's only org.apache.hadoop.hive.ql.io.orc.OrcSerde. I checked again whether Apache ORC 1.4.0 has some renamed one under the hive-storage API, but it doesn't. For Parquet, I'll handle that too.
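Combining the suggested foreach dispatch with the serde variants discussed above gives roughly the following sketch (close to, but not necessarily identical to, the merged method):

import java.util.Locale
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.internal.HiveSerDe

private[sql] def checkDataSchemaFieldNames(table: CatalogTable): Unit = {
  table.provider.foreach {
    _.toLowerCase(Locale.ROOT) match {
      case "hive" =>
        // Hive SerDe tables: dispatch on the declared serde class, covering
        // both Parquet serde class names mentioned in this thread.
        val serde = table.storage.serde
        if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
          OrcFileFormat.checkFieldNames(table.dataSchema)
        } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
            serde == Some("parquet.hive.serde.ParquetHiveSerDe")) {
          ParquetSchemaConverter.checkFieldNames(table.dataSchema)
        }
      case "parquet" => ParquetSchemaConverter.checkFieldNames(table.dataSchema)
      case "orc" => OrcFileFormat.checkFieldNames(table.dataSchema)
      case _ =>
    }
  }
}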
Oh, thank you for the review, @viirya, @HyukjinKwon, and @gatorsmile!
@@ -848,4 +851,19 @@ object DDLUtils {
      }
    }
  }

  private[sql] def checkFieldNames(table: CatalogTable): Unit = {
I'll rename this to checkDataSchemaFieldNames.
Test build #81443 has finished for PR 19124 at commit
Retest this please
Test build #81445 has finished for PR 19124 at commit
SchemaUtils.checkColumnNameDuplication(
  reorderedSchema.map(_.name), "in the table definition of " + table.identifier,
  conf.caseSensitiveAnalysis)
DDLUtils.checkDataSchemaFieldNames(catalogTable.copy(schema = newSchema))
newSchema also contains the partition schema. How about the partition schema? Do we have the same limits on it?
It's okay. Inside checkDataSchemaFieldNames, we only use table.dataSchema, like the following:

ParquetSchemaConverter.checkFieldNames(table.dataSchema)

For the partition columns, we have been allowing special characters.
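For illustration, a hypothetical example: a special character in a partition column is accepted because partition values live in directory paths and the metastore, not in the Parquet/ORC file schema:

sql("CREATE TABLE part1(id INT, `p col` STRING) USING parquet PARTITIONED BY (`p col`)")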
Could you add test cases and ensure the partitioning columns with special characters work?
DDLSuite and HiveDDLSuite have them here.
I think this PR passes the above two test cases, too.
LGTM
Thanks! Merged to master.
@gatorsmile, thank you for your help! This PR was almost made by you.
Thank you for reviewing and helping with this PR, @tejasapatil, @viirya, and @HyukjinKwon, too!
What changes were proposed in this pull request?
Currently, users encounter job abortions while creating or altering ORC/Parquet tables with invalid column names. We had better prevent this by raising an AnalysisException with a guide to use aliases instead, like Parquet data source tables.
BEFORE
AFTER
How was this patch tested?
Pass the Jenkins with a new test case.