[SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types #23010
Conversation
Test build #98715 has finished for PR 23010 at commit
retest this please
Test build #99576 has finished for PR 23010 at commit
cc @cloud-fan
I think the root cause is that we should deal with invalid partition values ahead of time, so that we don't need to worry about them during writing.
@cloud-fan Thanks for the review. Do you mean we should filter out invalid partition values in SQL before writing?
But we may forget to filter null values when writing SQL. The path-generation code guards against this by writing null partition values to the default partition directory; but DynamicPartitionDataWriter only compares whether the in-memory values are the same, so a file-writing error occurs.
We should move the logic of normalizing invalid partition values to before writing, instead of during writing.
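To make the collision concrete, here is a minimal, self-contained sketch of the path logic (modeled on the behavior of ExternalCatalogUtils.getPartitionPathString; the object and method names are illustrative, not Spark's API): null and '' map to the same partition directory on disk, while the writer compares the raw in-memory values and treats them as two distinct partitions.

```scala
// Illustrative sketch: how a partition directory name is derived.
// Both null and the empty string collapse to the Hive default partition,
// so two distinct in-memory values target the same file path.
object PartitionPathSketch {
  private val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  def partitionDir(col: String, value: String): String = {
    val v = if (value == null || value.isEmpty) DefaultPartitionName else value
    s"$col=$v"
  }

  def main(args: Array[String]): Unit = {
    // Same directory, yet DynamicPartitionDataWriter sees two different values
    // and tries to create the file twice: FileAlreadyExistsException.
    assert(partitionDir("p", null) == partitionDir("p", ""))
    println(partitionDir("p", ""))  // p=__HIVE_DEFAULT_PARTITION__
  }
}
```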
Test build #101182 has finished for PR 23010 at commit
@cloud-fan I have added a conversion before the partition value is computed, converting empty string-type values to null. Would you like to review it again? Thanks.
```scala
val partitionExpression =
  toBoundExprs(description.partitionColumns, description.allColumns).map {
    case e: Expression if e.dataType == StringType =>
      Empty2Null(e)
```
We need to do it earlier: in FileFormatWriter.write we sort the input RDD by partition columns, so we need to normalize partition values before sorting.
@cloud-fan Thanks for the review. I have moved it before the sort. partitionColumns is retained because it is used to compute getPartitionPath.
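Roughly, the change discussed here wraps string partition columns in Empty2Null before the sort. A hedged sketch of what that looks like inside FileFormatWriter.write (`plan` and `partitionColumns` are assumed from the surrounding method; this is not the verbatim patch):

```scala
// Normalize string partition values before the sort in FileFormatWriter.write.
// `plan` is the physical plan to write; `partitionColumns` its partition attributes.
val partitionSet = AttributeSet(partitionColumns)
val projectList: Seq[NamedExpression] = plan.output.map {
  case attr if partitionSet.contains(attr) && attr.dataType == StringType && attr.nullable =>
    Alias(Empty2Null(attr), attr.name)()  // '' becomes null before paths are derived
  case attr => attr
}
// The sort (and everything downstream) now sees normalized partition values.
val empty2NullPlan = ProjectExec(projectList, plan)
```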
Test build #101218 has finished for PR 23010 at commit
Test build #101220 has finished for PR 23010 at commit
Test build #101217 has finished for PR 23010 at commit
Test build #101232 has finished for PR 23010 at commit
retest this please
Test build #101240 has finished for PR 23010 at commit
retest this please
Test build #101259 has finished for PR 23010 at commit
@cloud-fan Would you like to review it again? Thanks.
" from t1").write.partitionBy("p").saveAsTable("t2") | ||
checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null))) | ||
} | ||
} |
Can you add tests w/o codegen by using CodegenInterpretedPlanTest?
Sorry, I don't quite understand what 'tests w/o codegen' means. Would you give an example? Thanks.
It's OK to do `class FileFormatWriterSuite extends QueryTest with SharedSQLContext with CodegenInterpretedPlanTest`. By default, the current test checks Empty2Null in codegen mode (Empty2Null.doGenCode) only. Please check the implementation of CodegenInterpretedPlanTest.
Ok, modified, thanks.
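For reference, the suggested suite shape, combining the class declaration above with the test from the PR description, runs each test under both codegen and interpreted modes, exercising Empty2Null.doGenCode as well as the interpreted convert path:

```scala
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
import org.apache.spark.sql.test.SharedSQLContext

// CodegenInterpretedPlanTest re-runs each test with codegen on and off.
class FileFormatWriterSuite extends QueryTest with SharedSQLContext
  with CodegenInterpretedPlanTest {

  test("Null and '' values should not cause dynamic partition failure of string types") {
    withTable("t1", "t2") {
      spark.range(3).write.saveAsTable("t1")
      spark.sql("select id, cast(case when id = 1 then '' else null end as string) as p" +
        " from t1").write.partitionBy("p").saveAsTable("t2")
      // Both '' and null land in the default partition and read back as null.
      checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
    }
  }
}
```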
@cloud-fan Sorry, I will fix this PR next week.
@cloud-fan I have added an analyzer rule to do the empty-string-to-null conversion for partition columns. Would you like to review it again? Thanks.
Test build #104284 has finished for PR 23010 at commit
Test build #104285 has finished for PR 23010 at commit
Test build #104383 has finished for PR 23010 at commit
Test build #104414 has finished for PR 23010 at commit
Test build #104413 has finished for PR 23010 at commit
retest this please
Test build #104422 has finished for PR 23010 at commit
retest this please
Test build #104424 has finished for PR 23010 at commit
@@ -260,6 +261,75 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]

```scala
  }
}

/** A function that converts the empty string to null for partition values. */
case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
```
We should override nullable in this expression to always return true.
```scala
  override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, c => {
      val setIsNull = if (nullable) s"${ev.isNull} = true" else ""
```
This is not needed if nullable is always true.
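Putting the two review comments together, the expression would look roughly like this: nullable is hard-wired to true (a non-nullable input can still produce a null output for ''), which makes the conditional setIsNull above unnecessary. A sketch, not the verbatim final code:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, String2StringExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.unsafe.types.UTF8String

/** A function that converts the empty string to null for partition values. */
case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {

  // '' maps to null, so the output is nullable even for a non-nullable child.
  override def nullable: Boolean = true

  override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    nullSafeCodeGen(ctx, ev, c =>
      s"""if ($c.numBytes() == 0) {
         |  ${ev.isNull} = true;
         |  ${ev.value} = null;
         |} else {
         |  ${ev.value} = $c;
         |}""".stripMargin)
  }
}
```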
```scala
      val actualQuery = updateQueryPlan(i.query, i.partitionColumns.map(_.name))
      val partitionColumns = i.partitionColumns.map { col =>
        actualQuery.output.find(a => conf.resolver(a.name, col.name)).getOrElse {
          throw new AnalysisException(
```
We don't need to do this; other analyzer rules should have already checked it.
s"Unable to resolve ${col.name} given [${i.output.map(_.name).mkString(", ")}]") | ||
} | ||
} | ||
i.copy(partitionColumns = partitionColumns, query = actualQuery) |
Why do we need to update partitionColumns?
CheckAnalysis will fail if it is not updated, because the attributes in partitionColumns are old.
```scala
      val actualQuery = updateQueryPlan(c.query, c.table.partitionColumnNames)
      c.copy(query = actualQuery)
```
```scala
    case i @ InsertIntoTable(_, partSpec, query, _, _) =>
```
We don't need to handle InsertIntoTable and CreateTable; they are unresolved plan nodes and we will not hit them here.
InsertIntoTable and CreateTable are added for Hive tables, corresponding to InsertIntoHiveTable and CreateHiveTableAsSelectCommand. We cannot access the Hive command code here, so we match InsertIntoTable and CreateTable instead.
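Pieced together from the diff fragments in this thread, the rule's overall shape is presumably something like the sketch below. The rule name, the InsertIntoHadoopFsRelationCommand match, and the resolved-query guard are assumptions on my part; the case bodies reuse the fragments quoted above, and updateQueryPlan itself is sketched after its signature fragment further down.

```scala
// Hypothetical reconstruction of the analyzer rule under review.
case class EmptyStringToNullForPartitions(conf: SQLConf) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // Data source insert: rewrite the query, then refresh the partition
    // attributes so CheckAnalysis does not see stale references.
    case i: InsertIntoHadoopFsRelationCommand =>
      val actualQuery = updateQueryPlan(i.query, i.partitionColumns.map(_.name))
      val partitionColumns = i.partitionColumns.map { col =>
        actualQuery.output.find(a => conf.resolver(a.name, col.name)).getOrElse {
          throw new AnalysisException(
            s"Unable to resolve ${col.name} given [${i.output.map(_.name).mkString(", ")}]")
        }
      }
      i.copy(partitionColumns = partitionColumns, query = actualQuery)

    // Hive path: InsertIntoTable stands in for InsertIntoHiveTable, which is
    // not reachable from this module (see the discussion above).
    case i @ InsertIntoTable(_, partSpec, query, _, _) if query.resolved =>
      i.copy(query = updateQueryPlan(query, partSpec.keys.toSeq))
  }
}
```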
```scala
      }
    }
    val partitionSet = AttributeSet(partitionColumns)
    var needConvert = false
```
We don't need this. It's OK to add a dummy Project; the optimizer will remove it later.
This was added to get the existing tests through: without it, the test org.apache.spark.sql.hive.execution.HiveQuerySuite "SPARK-3810: PreprocessTableInsertion static partitioning support" will fail, because it checks the number of Project nodes.
```scala
      partitionColumnNames: Seq[String]): LogicalPlan = {
    val partitionColumns = partitionColumnNames.map { name =>
      query.output.find(a => conf.resolver(a.name, name)).getOrElse {
        throw new AnalysisException(
```
This is OK as a sanity check.
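Combining the fragments above (the resolution sanity check, partitionSet, needConvert, and the dummy-Project discussion), updateQueryPlan plausibly reads like the following sketch; everything beyond the quoted fragments is filled in by me:

```scala
private def updateQueryPlan(
    query: LogicalPlan,
    partitionColumnNames: Seq[String]): LogicalPlan = {
  // Sanity check: resolve each partition column name against the query output.
  val partitionColumns = partitionColumnNames.map { name =>
    query.output.find(a => conf.resolver(a.name, name)).getOrElse {
      throw new AnalysisException(
        s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
    }
  }
  val partitionSet = AttributeSet(partitionColumns)
  // needConvert avoids emitting a dummy Project when nothing changes; kept only
  // because a HiveQuerySuite test counts Project nodes (see the discussion above).
  var needConvert = false
  val projectList = query.output.map {
    case attr if partitionSet.contains(attr) && attr.dataType == StringType =>
      needConvert = true
      Alias(Empty2Null(attr), attr.name)()
    case attr => attr
  }
  if (needConvert) Project(projectList, query) else query
}
```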
I just realized that we may create … Another idea is in …
@cloud-fan Do you mean to move it back to …?
@cloud-fan I have submitted another PR (#24334) to do it in …
…ion failure of string types

Dynamic partition will fail when both '' and null values are taken as dynamic partition values simultaneously. For example, the test below will fail before this PR:

```scala
test("Null and '' values should not cause dynamic partition failure of string types") {
  withTable("t1", "t2") {
    spark.range(3).write.saveAsTable("t1")
    spark.sql("select id, cast(case when id = 1 then '' else null end as string) as p" +
      " from t1").write.partitionBy("p").saveAsTable("t2")
    checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
  }
}
```

The error is: 'org.apache.hadoop.fs.FileAlreadyExistsException: File already exists'. This PR converts the empty strings to null for partition values. This is another way to fix PR apache#23010.

How was this patch tested? A new test was added.

Closes apache#24334 from eatoncys/FileFormatWriter.
Authored-by: 10129659 <chen.yanshan@zte.com.cn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Dynamic partitioning will fail when both '' and null values are used as dynamic partition values simultaneously. For example, the test below will fail before this PR:
test("Null and '' values should not cause dynamic partition failure of string types") {
withTable("t1", "t2") {
spark.range(3).write.saveAsTable("t1")
spark.sql("select id, cast(case when id = 1 then '' else null end as string) as p" +
" from t1").write.partitionBy("p").saveAsTable("t2")
checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
}
}
The error is: 'org.apache.hadoop.fs.FileAlreadyExistsException: File already exists'.
This PR adds exception protection for file conflicts, renaming the file when a conflict occurs.
How was this patch tested?
A new test was added.