DELETE should return the number of deleted rows #1240
Conversation
@@ -93,9 +93,10 @@ case class DeleteCommand(
    // Re-cache all cached plans(including this relation itself, if it's cached) that refer to
    // this data source relation.
    sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)
    Seq(
      Row(this.metrics.getOrElse("numDeletedRows", 0))
nit: Seq(Row(...)) (or better? Row(...) :: Nil)
@jaceklaskowski Can you clarify your view on how the factory method defined on the Seq companion object compares with the concrete list constructor? I would say the factory method is preferable from a general programming point of view, as it can return a specific implementation of the trait depending on properties of the input data.
I'll fix the formatting.
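(For reference, a minimal sketch of the two construction styles in question; the Row value is illustrative:)

import org.apache.spark.sql.Row

// Factory method on the Seq companion object: in principle free to choose a
// concrete implementation of the trait (in current Scala it builds a List).
val viaFactory: Seq[Row] = Seq(Row(42L))

// Cons onto Nil: always builds a scala.collection.immutable.List.
val viaCons: Seq[Row] = Row(42L) :: Nil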
You can just use metrics("numDeletedRows").value. It is initialized with a default value of 0 anyway.
You should also cover the case of deletes at partition boundaries. A delete at a partition boundary is a metadata-only operation, so we don't actually have any information about how many rows were deleted.
So if metrics("numRemovedFiles") is greater than 0 and metrics("numDeletedRows") is equal to 0, we have reached this metadata-only delete case. In that case, let's just return -1, since we don't actually know how many rows were deleted.
Using the get method on the map, we get an Option and are forced to deal with the potential absence of the value, which is more robust. Don't you think that accessing the map directly, relying on the fact that some values are always present, depends on implicit knowledge about the content of the map that is not encoded in the type system and which can change, leading to failures?
Seq(Row(this.metrics.get("numDeletedRows").map(_.value).getOrElse(0L)))
sure, that SGTM. make sure to cover the partition boundary case, too.
core/src/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
Can you fix the title to be "DELETE should return the number of deleted rows"?
@@ -93,9 +93,10 @@ case class DeleteCommand(
    // Re-cache all cached plans(including this relation itself, if it's cached) that refer to
    // this data source relation.
    sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)
    Seq(
      Row(this.metrics.getOrElse("numDeletedRows", 0))
    }
    Seq.empty[Row]
Move the above line Seq(Row(...)) to here.
let's see if this fixes your tests
It doesn't. I did some investigation and found that returning that Row makes the optimizer fail because it cannot infer the schema.
I took a screenshot (I tried adding a second field to the row to see what the cause was). This loop throws an ArrayIndexOutOfBoundsException because the same index is used to address both the row and the converters array. As you can see:
- the converters array has size 0
- the row has size 2
It could at least check that the two have the same size, or use a .zip and a foreach instead of a while loop. I don't know what the semantics are, or why a while loop was preferred to index two collections that could have different sizes; a sketch of the zip-based alternative is below.
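(A minimal, self-contained sketch of the zip-based alternative suggested above; the names and types are stand-ins, not Spark's actual internals:)

// Hypothetical stand-in for Spark's internal converter loop: zipping the two
// collections makes a length mismatch impossible to mis-index, and the
// require gives a clear error instead of an ArrayIndexOutOfBoundsException.
def convertFields(fields: Seq[Any], converters: Seq[Any => Any]): Seq[Any] = {
  require(fields.length == converters.length,
    s"${fields.length} fields but ${converters.length} converters")
  fields.zip(converters).map { case (field, convert) => convert(field) }
}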
We can fix this error by defining the output schema for DeleteCommand. Adding this (right before innerChildren) will work:
override val output: Seq[Attribute] = Seq(AttributeReference("numDeletedRows", LongType)())
Thank you, this is very helpful. Is there any documentation on the relationship between the output attributes and the row returned by the command? Maybe we could introduce some sort of validation in the parent class to avoid errors in the Catalyst optimizer.
I'm not sure if there's any documentation for this, but since this is just a Spark class we're extending, I don't think we should introduce any validation (it would be outside of the Delta codebase). We can check for the column "numDeletedRows" in the resulting DataFrame in the test suite.
We should also add a few additional test cases for (a) when numDeletedRows = 0, and (b) a partition delete; a rough sketch follows.
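(A sketch of what such tests could look like, assuming the helpers this suite already inherits (withTable, sql, checkAnswer, testImplicits) and illustrative table names and expected values:)

import testImplicits._

test("DELETE that matches no rows reports 0 deleted rows") {
  withTable("tab") {
    Seq((1, 1), (0, 3)).toDF("key", "value")
      .write.format("delta").saveAsTable("tab")
    // No row has key = 99, so numDeletedRows should be 0.
    checkAnswer(sql("DELETE FROM tab WHERE key = 99"), Row(0L) :: Nil)
  }
}

test("DELETE of a whole partition reports -1 deleted rows") {
  withTable("tab") {
    Seq((1, 1), (0, 3)).toDF("key", "value")
      .write.format("delta").partitionBy("key").saveAsTable("tab")
    // Metadata-only delete at a partition boundary: the row count is unknown.
    checkAnswer(sql("DELETE FROM tab WHERE key = 0"), Row(-1L) :: Nil)
  }
}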
val knownDeletedRows = deletionResultFromMetrics match {
  case RowInFilesDeletion(deletedRows) => deletedRows
  case FilePruningAtPartitionBoundary => -1
}
I think using these traits and case classes is more complex than necessary (we should be able to do this in a few lines of code).
I don't see why we can't just check whether numDeletedRows = 0 and numRemovedFiles > 0, and update our result to -1 if so. A comment here explaining that this is because it's a delete over partitions and we can't know the number of deleted rows would suffice.
Hey, I saw your Slack comment. I personally feel something like this is much easier to read, especially compared with the nested if statements you have above. I agree with the principles you have in mind regarding self-documenting code, but I think this specific scenario is so simple logic-wise that it's unnecessary and actually more complex for the reader.

var deletedRows = metrics("numDeletedRows").value
if (metrics("numRemovedFiles").value > 0 && deletedRows == 0) {
  // This is a delete over a partition boundary, which is a metadata-only
  // operation, so we can't know how many rows were deleted.
  deletedRows = -1
}
If you feel really strongly about this, we can look into making your current implementation more readable (it personally took me a few seconds to deduce what was going on)
You are more familiar with the codebase and the context than I am, so if you think the scenario is simple enough that separating the interpretation of the metrics from the result is not required, I will just remove it.
The reason it was surprising to me is that -1 is a sort of magic value; I would have expected some metadata to be available that tells how many rows are deleted in each file.
if (deletedRows > 0) {
  RowInFilesDeletion(deletedRows)
} else {
  val deletedFiles = this.metrics.get("numRemovedFiles").map(_.value).getOrElse(0L)
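(For context, a self-contained sketch of the ADT approach under discussion; the trait and constructor names come from the diff above, while the else branch and wiring are assumptions:)

sealed trait DeletionResult
case class RowInFilesDeletion(deletedRows: Long) extends DeletionResult
case object FilePruningAtPartitionBoundary extends DeletionResult

// Assumed wiring: prefer the real row count; fall back to the
// partition-boundary case when only whole files were removed.
def deletionResultFromMetrics(deletedRows: Long, deletedFiles: Long): DeletionResult =
  if (deletedRows > 0) RowInFilesDeletion(deletedRows)
  else if (deletedFiles > 0) FilePruningAtPartitionBoundary
  else RowInFilesDeletion(0L)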
We can also get the metric values using just metrics("numRemovedFiles").value and metrics("numDeletedRows").value. This is how we do it throughout the code, and we see above that both keys are defined in createMetrics. And as @scottsand-db mentioned, the default value is 0 anyway.
Don't you think that accessing the map directly, relying on the fact that some values are always present, depends on implicit knowledge about the content of the map that is not encoded in the type system and which can change, leading to failures?
I think we would want to know if the content of our metrics map changed to not contain these values that we explicitly define.
@allisonport-db are there unit tests that verify that certain metrics are set to 0 by default?
I'm not sure if there's a test for this, as SQLMetric is also a Spark class. We can see in the class header that the default value is 0.
We can test this to some degree, however, by adding a test for when numDeletedRows = 0, as I've mentioned.
Hey @edmondo1984, we'd love to resolve #1222. Are you still interested in working on this PR?
@@ -56,7 +56,7 @@ class DeleteSQLSuite extends DeleteSuiteBase with DeltaSQLCommandTest {
    withTempView("v") {
      Seq((1, 1), (0, 3), (1, 5)).toDF("key", "value").write.format("delta").saveAsTable("tab")
Can you make sure to add tests for the following scenarios:
- DELETE on partitioned tables
- full-table DELETE, etc.
I've submitted a PR here: #1328. @edmondo1984 want to take a look?
Description
Resolves #1222
How was this patch tested?
The SQL test suite was extended.
Does this PR introduce any user-facing changes?
The DataFrame returned from a DELETE is no longer empty: it contains a single row reporting the number of deleted rows (or -1 when the count is unknown for a metadata-only delete); see the sketch below.
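(Illustratively, assuming the numDeletedRows output column added in this PR; the table name and count are made up:)

// A DELETE now returns a one-row DataFrame instead of an empty one.
val result = spark.sql("DELETE FROM tab WHERE key = 1")
result.show()
// +--------------+
// |numDeletedRows|
// +--------------+
// |             2|
// +--------------+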