[SPARK-21144][SQL][BRANCH-2.2] Check column name duplication in read/write paths #18356
Conversation
cc: @gatorsmile
Test build #78268 has finished for PR 18356 at commit
```scala
} else {
  schema.map(_.name.toLowerCase)
}
checkDuplication(columnNames, "table definition of " + table.identifier)
```
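As a quick, hedged illustration of why the names are lowercased first (values illustrative, not from the PR):

```scala
// With case-insensitive analysis (Spark's default, spark.sql.caseSensitive=false),
// "aA" and "Aa" normalize to the same name and must be treated as duplicates;
// with case-sensitive analysis they remain distinct columns.
val schema = Seq("aA", "Aa")
val caseSensitive = false
val columnNames = if (caseSensitive) schema else schema.map(_.toLowerCase)
assert(columnNames.distinct.length != columnNames.length) // duplicate found
```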
Could you revert all the unrelated changes?
ok
To avoid any potential issue, could you revert all the unrelated changes?
```
@@ -181,6 +182,10 @@ case class DataSource(
  throw new AnalysisException(
    s"Unable to infer schema for $format. It must be specified manually.")
}

SchemaUtils.checkSchemaColumnNameDuplication(
  dataSchema, "the datasource", sparkSession.sessionState.conf.caseSensitiveAnalysis)
```
This is the change we need for 2.2
Actually, it should be dataSchema + partitionSchema.
We also need to issue a meaningful error message. Note that users can still bypass the error by manually specifying the schema, as sketched below.
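A minimal sketch of the bypass described above, assuming a standard DataFrameReader with a user-supplied schema (path and column name are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

// When a schema is given explicitly, schema inference -- and with it the
// duplication check on the inferred dataSchema ++ partitionSchema -- is
// skipped, so the read proceeds without the error.
val userSchema = new StructType().add("a", IntegerType)
spark.read.schema(userSchema).json("/path/to/table").show()
```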
yea, I'll update soon. Thanks!
```scala
Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
  withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
    withTempDir { src =>
      Seq(1).toDF(c0).write.mode("overwrite").json(s"$src/$c1=1")
```
Hi, @maropu. It seems we can simply merge the JSON and Parquet test cases into one by using format(...) instead of json() or parquet(); see the sketch below.
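A hedged sketch of what the merged test could look like, reusing the loop from the diff above (withSQLConf and withTempDir are the test helpers already used in this suite; the assertion mirrors the message checked later in this PR):

```scala
Seq("json", "parquet").foreach { format =>
  Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
    withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
      withTempDir { src =>
        // Write data whose column name collides with the partition column.
        Seq(1).toDF(c0).write.mode("overwrite").format(format).save(s"$src/$c1=1")
        val e = intercept[AnalysisException] {
          spark.read.format(format).load(src.toString)
        }
        assert(e.getMessage.contains("Found duplicate column(s) in the datasource: "))
      }
    }
  }
}
```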
For spark.read.json, we can use spark.read.format('..').load.
oh, thanks! I'll update
@gatorsmile Is the test valid? https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala#L45 This test fails when this PR is applied because ...
Test build #78345 has finished for PR 18356 at commit
test this please
```scala
checkColumnNameDuplication(
  (dataSchema ++ partitionSchema).map(_.name),
  "in the datasource",
  sparkSession.sessionState.conf.caseSensitiveAnalysis)
```
Since the new RC of 2.2 is out, how about submitting the PR to the master branch using your SchemaUtils.scala with the corresponding test cases, instead of creating a new function checkColumnNameDuplication? This can simplify the future back-port merging to 2.2.
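For reference, a hedged sketch of the shape such a shared helper might take in SchemaUtils.scala (signature and message format inferred from the call sites in this diff, not the actual merged code):

```scala
import org.apache.spark.sql.AnalysisException

object SchemaUtils {
  // Normalizes names when the analysis is case-insensitive, then rejects
  // any name that occurs more than once.
  def checkColumnNameDuplication(
      columnNames: Seq[String],
      colType: String,
      caseSensitiveAnalysis: Boolean): Unit = {
    val names = if (caseSensitiveAnalysis) columnNames else columnNames.map(_.toLowerCase)
    if (names.distinct.length != names.length) {
      val duplicates = names.groupBy(identity).collect {
        case (name, occurrences) if occurrences.size > 1 => s"`$name`"
      }
      throw new AnalysisException(
        s"Found duplicate column(s) $colType: ${duplicates.mkString(", ")}")
    }
  }
}
```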
ok, I'll do that. Thanks.
```scala
val e = intercept[AnalysisException] {
  spark.read.format(format).load(src.toString)
}
assert(e.getMessage.contains("Found duplicate column(s) in the datasource: "))
```
BTW, could you also add another test case? Even if there exist duplicate columns between the data schema and the partition schema, users can still query the data by manually specifying the schema; see the sketch below.
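A hedged sketch of the requested test (user-specified schema; column name and layout mirror the test above):

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

withTempDir { src =>
  // Data column "a" collides with the partition column "a".
  Seq(1).toDF("a").write.mode("overwrite").json(s"$src/a=1")
  // With an explicit schema, inference is skipped, so the duplication
  // check should not fire and the query should succeed.
  val userSchema = new StructType().add("a", IntegerType)
  spark.read.schema(userSchema).json(src.toString).collect()
}
```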
ok, I'll add test cases in a new PR.
Test build #78357 has finished for PR 18356 at commit
What changes were proposed in this pull request?
This PR fixes unexpected results when the data schema and the partition schema have duplicate columns; a hedged repro is sketched below. The patch adds code to check for column name duplication when reading from and writing to files. It comes from #17758 and fixes this issue for v2.2.
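A hedged repro of the kind of collision being fixed (path and values illustrative, mirroring the tests in this PR):

```scala
// Writing into a directory named "a=1" makes "a" a partition column,
// which collides with the data column "a".
Seq(1).toDF("a").write.mode("overwrite").json("/tmp/t/a=1")
// Before this patch the read produced unexpected results; with the
// patch it fails with "Found duplicate column(s) in the datasource".
spark.read.json("/tmp/t").show()
```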
How was this patch tested?
Created a new test suite SchemaUtilsSuite and added tests in the existing suites: DataFrameSuite, DDLSuite, JDBCWriteSuite, DataFrameReaderWriterSuite, HiveMetastoreCatalog, and HiveDDLSuite.