[SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing #29316
Conversation
In this PR (SPARK-19129, #16583), maybe we can check this behavior in advance to avoid task execution?
@cloud-fan If you have time, please help review this pull request. Thank you.
ok to test
   */
  protected def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
    specs.foreach { s =>
      if (s.values.exists(_.isEmpty)) {
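The snippet is cut off here; judging from the error message that appears later in this thread, the body of the check presumably looks roughly like this (a sketch, not the verbatim diff; AnalysisException assumed to be in scope):

  protected def requireNonEmptyValueInPartitionSpec(specs: Seq[TablePartitionSpec]): Unit = {
    specs.foreach { s =>
      if (s.values.exists(_.isEmpty)) {
        // Build a readable spec string such as "[part1=1, part2=]" and fail fast.
        val spec = s.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")
        throw new AnalysisException(
          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
      }
    }
  }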
should this be done in an analyzer rule? how do we do the same check for file source tables?
I moved it to the PreprocessTableInsertion rule.
withTable("t1") { | ||
spark.sql( | ||
""" | ||
|CREATE TABLE t1 (c1 int) |
why is this a problem only for hive tables?
In InsertIntoHadoopFsRelationCommand, when manageFilesourcePartitions is turned on, catalog.listPartitions is called, and there the partition values are checked for emptiness.
When manageFilesourcePartitions is not turned on, the partition value is currently not checked, so the SQL execution will not fail. If I move the check logic into the PreprocessTableInsertion rule now, such executions will start to fail.
Perhaps this check should only be performed when tracksPartitionsInCatalog is true and a static partition is written.
Hive calls getPartition during loadPartition, and there it checks whether the partition value is empty:

public Partition getPartition(...) {
      || (val != null && val.length() == 0)) {
    throw new HiveException("get partition: Value for key "
        + field.getName() + " is null or empty");
  }
In the case that manageFilesourcePartitions is not turned on, the partition value is currently not checked, which means that the SQL execution will not fail.
what's the behavior then?
In InsertIntoHadoopFsRelationCommand, when writing a static or dynamic partition, DynamicPartitionDataWriter is used, and an empty partition value is turned into the default value (__HIVE_DEFAULT_PARTITION__) by getPartitionPathString.
When manageFilesourcePartitions is not turned on, the partition information is maintained through the filesystem, so the partition value is not checked for emptiness, and the SQL

insert overwrite table a partition(d='') select 1

will run successfully.
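For reference, the substitution described above behaves roughly like the following sketch (simplified; the real getPartitionPathString also escapes special characters, and the helper name below is only illustrative):

// Simplified sketch: an empty or null partition value falls back to the Hive default partition name.
val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

def partitionPathString(col: String, value: String): String = {
  val v = if (value == null || value.isEmpty) DEFAULT_PARTITION_NAME else value
  s"$col=$v"
}

// partitionPathString("d", "") returns "d=__HIVE_DEFAULT_PARTITION__",
// which is why the INSERT above succeeds instead of failing.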
What's the behavior of Hive? Does Hive treat d='' as d=__HIVE_DEFAULT_PARTITION__ and run it?
hive> INSERT OVERWRITE TABLE t1 PARTITION(d='') select 1;
Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:61 Partition not found ''''
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer$tableSpec.<init>(BaseSemanticAnalyzer.java:856)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer$tableSpec.<init>(BaseSemanticAnalyzer.java:727)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1652)
... 23 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: get partition: Value for key d is null or empty
at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1900)
hive> INSERT OVERWRITE TABLE t1 PARTITION(d) select 1,'' as d;
result:
Loading data to table x.t1 partition (d=null)
Time taken for load dynamic partitions : 302
Loading partition {d=__HIVE_DEFAULT_PARTITION__}
Time taken for adding to write entity : 1
Partition x.t1{d=__HIVE_DEFAULT_PARTITION__} stats: [numFiles=1, numRows=1, totalSize=201, rawDataSize=85]
@@ -402,6 +403,22 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
        s"including ${staticPartCols.size} partition column(s) having constant value(s).")
    }

    val partitionsTrackedByCatalog = conf.manageFilesourcePartitions &&
We shouldn't check the config. The flag is decided by the config that was used to create this table, so catalogTable.get.tracksPartitionsInCatalog is good enough.
With manageFilesourcePartitions == false && tracksPartitionsInCatalog == true, InsertIntoHadoopFsRelationCommand does not call catalog.listPartitions or AlterTableAddPartitionCommand, so it does not check whether the partition value is empty.

set spark.sql.hive.manageFilesourcePartitions=false;
insert overwrite table a partition(d='1',h='') select 1;

Currently this SQL execution will not fail.
I mean, conf.manageFilesourcePartitions is used when creating the table. When we need to read/alter the table, we should respect table.tracksPartitionsInCatalog.
    // check static partition
    if (partitionsTrackedByCatalog &&
        normalizedPartSpec.nonEmpty &&
        staticPartCols.size == partColNames.size) {
Why do we need this check: staticPartCols.size == partColNames.size?
staticPartCols only contains the partition fields that are given a literal value, while partColNames contains all partition fields. The check is only for writes where every partition column is static.

insert overwrite table a partition(d='1',h) select 1,''
staticPartCols=[d]
partColNames=[d,h]

insert overwrite table a partition(d='1',h='') select 1
staticPartCols=[d,h]
partColNames=[d,h]
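To make the distinction concrete, here is a small illustration (assuming, as in the rule, that the normalized spec maps each partition column to an optional literal value; the variable names mirror the diff):

// partition(d='1', h): 'd' is static (has a literal), 'h' is dynamic (no literal).
val normalizedPartSpec: Map[String, Option[String]] = Map("d" -> Some("1"), "h" -> None)
val partColNames = Seq("d", "h")

val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet  // Set("d")

// staticPartCols.size == partColNames.size holds only when every partition column
// has a literal value, i.e. a fully static write such as partition(d='1', h='').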
partition(d='',h): should we fail for this case?
Spark/Hive allows the partition value to be empty for a dynamic partition write:
Loading data to table x.t1 partition (d=1, h=null)
Time taken for load dynamic partitions : 203
Loading partition {d=1, h=__HIVE_DEFAULT_PARTITION__}
Time taken for adding to write entity : 1
Partition x.t1{d=1, h=__HIVE_DEFAULT_PARTITION__} stats: [numFiles=1, numRows=1, totalSize=201, rawDataSize=85]
Your example is partition(d='1',h); I was asking about partition(d='',h).
hive> insert overwrite table t1 partition (d='',h) select 1,'';
ERROR ql.Driver (SessionState.java:printError(956)) - FAILED: IllegalArgumentException Can not create a Path from an empty string
java.lang.IllegalArgumentException: Can not create a Path from an empty string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:127)
at org.apache.hadoop.fs.Path.<init>(Path.java:135)
at org.apache.hadoop.fs.Path.<init>(Path.java:94)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.genFileSinkPlan(SemanticAnalyzer.java:6070)
spark-sql> insert overwrite table t1 partition (d='',h) select 1,'';
[main] INFO InsertIntoHiveTable: Partition `x`.`t1` {d=__HIVE_DEFAULT_PARTITION__, h=__HIVE_DEFAULT_PARTITION__} stats: [numFiles=1, numRows=0, totalSize=212]
Then shall we check that all staticPartCols have partition values?
Yes, we can do this check, but it will cause such SQL executions to fail, since they succeeded before.
But the behavior is inconsistent. It's weird to see partition(d='',h) works, but partition(d='',h='1') does not.
Makes sense.
      catalogTable.get.partitionColumnNames.nonEmpty &&
      catalogTable.get.tracksPartitionsInCatalog
    if (partitionsTrackedByCatalog &&
      normalizedPartSpec.nonEmpty) {
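Pieced together from the fragments quoted in this thread, the final shape of the check is roughly the following (a sketch, not the verbatim merged code; catalogTable and normalizedPartSpec come from the surrounding PreprocessTableInsertion rule, and AnalysisException is assumed to be in scope):

    val partitionsTrackedByCatalog = catalogTable.isDefined &&
      catalogTable.get.partitionColumnNames.nonEmpty &&
      catalogTable.get.tracksPartitionsInCatalog
    if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
      // Any static partition column bound to an empty literal is rejected up front.
      if (normalizedPartSpec.exists { case (_, value) => value.exists(_.isEmpty) }) {
        val spec = normalizedPartSpec
          .map { case (k, v) => k + "=" + v.getOrElse("") }
          .mkString("[", ", ", "]")
        throw new AnalysisException(
          s"Partition spec is invalid. The spec ($spec) contains an empty partition column value")
      }
    }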
nit: the if condition can be put on one line
      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1")
      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2")
      sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'' AS part2")
So the partition column can be an empty string if it's dynamic. Shall we convert the empty string/null in the partition spec to __HIVE_DEFAULT_PARTITION__ before calling listPartitions/loadPartition?
Generally speaking, an empty partition value is meaningless, so the static partition value is not allowed to be empty. With a dynamic partition, the user may simply not know that the partition field is null or empty, and the data ends up in the __HIVE_DEFAULT_PARTITION__ partition.
For listPartitions:
spark-sql> show partitions inserttable ;
part1=1/part2=__HIVE_DEFAULT_PARTITION__
Time taken: 0.2 seconds, Fetched 1 row(s)
spark-sql> desc formatted inserttable partition(part1='1',part2='');
Error in query: Partition spec is invalid. The spec ([part1=1, part2=]) contains an empty partition column value;
spark-sql> desc formatted inserttable partition(part1='1',part2='__HIVE_DEFAULT_PARTITION__');
col_name data_type comment
...
Time taken: 0.348 seconds, Fetched 27 row(s)
The partition value the user sees is __HIVE_DEFAULT_PARTITION__, so the user will not query the partition details with an empty partition value.
For loadPartition: in DynamicPartitionDataWriter#partitionPathExpression, a null or empty partition value is converted to __HIVE_DEFAULT_PARTITION__, so the load succeeds without needing an early conversion.
thanks, merging to master!
What changes were proposed in this pull request?
When writing to a static partition, check in advance whether the partition value is empty and fail early if it is.
Why are the changes needed?
Currently, when writing to a static partition with an empty partition value, the error is reported only after all tasks have completed.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a unit test.