Spark: Implement copy-on-write DELETE #3661
Conversation
val table = RowLevelOperationTable(tbl, operation)
val rewritePlan = operation match {
  case _: SupportsDelta =>
    throw new AnalysisException("Delta operations are currently not supported")
Merge-on-read will come in following PRs.
/**
 * A rule similar to ReplaceNullWithFalseInPredicate in Spark but applies to Iceberg row-level commands.
 */
object ExtendedReplaceNullWithFalseInPredicate extends Rule[LogicalPlan] {
Ideally, we would just cover deletes in the original rule but we cannot do that. Our tests fail without this.
Needed for metadata deletes.
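To illustrate the idea, here is a minimal sketch of the kind of rewrite such a rule performs. It is written against Spark's built-in DeleteFromTable node for brevity (the PR's rule targets the Iceberg-specific command instead) and only handles a top-level null condition; the real rule also recurses into AND/OR/IF/CASE WHEN branches.

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.BooleanType

object ReplaceNullConditionWithFalseSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // DELETE ... WHERE <null> matches no rows, so the condition behaves exactly like false;
    // rewriting it lets the metadata-delete rule translate the condition to an Iceberg expression
    case d @ DeleteFromTable(_, Some(Literal(null, BooleanType))) =>
      d.copy(condition = Some(Literal(false, BooleanType)))
  }
}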
/**
 * A rule similar to SimplifyConditionalsInPredicate in Spark but applies to Iceberg row-level commands.
 */
object ExtendedSimplifyConditionalsInPredicate extends Rule[LogicalPlan] {
Same story like with ExtendedReplaceNullWithFalseInPredicate.
Is it possible to let the original rules simplify the parsed DeleteFromTable and only convert to DeleteFromIcebergTable after they have run? Something like this:
parsedPlan match {
  case DeleteFromTable(UnresolvedIcebergTable(aliasedTable), Some(condition)) if isSimplified(condition) =>
    DeleteFromIcebergTable(aliasedTable, condition)
  case _ =>
    parsedPlan
}
...
private def isSimplified(cond: Expression): Boolean = {
  !cond.containsAnyPattern(CASE_WHEN, IF, NULL_LITERAL, TRUE_OR_FALSE_LITERAL, INSET)
}
Ah, nevermind. I see now that the conversion to Iceberg plan actually comes immediately after parsing.
We could probably construct a fake plan with a local relation and a filter and run it through the optimizer but I am not sure it is cleaner.
delegate.parsePlan(sqlText)
val parsedPlan = delegate.parsePlan(sqlText)
parsedPlan match {
  case DeleteFromTable(UnresolvedIcebergTable(aliasedTable), condition) =>
This is a trick I had to do as we now rewrite in the analyzer. Moreover, existing analyzer rules in Spark break our logic (e.g. column resolution for updates). That's why we parse using Spark and then replace immediately with a custom node.
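For context, a minimal sketch of that substitution (DeleteFromIcebergTable and the UnresolvedIcebergTable extractor are the custom pieces added in this PR; everything else is standard Spark):

import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}

class IcebergExtensionsParserSketch(delegate: ParserInterface) {
  def parsePlan(sqlText: String): LogicalPlan = {
    // let Spark parse the statement first
    val parsedPlan = delegate.parsePlan(sqlText)
    parsedPlan match {
      // swap Spark's node for the Iceberg-specific one right away, before built-in
      // analyzer rules (e.g. UPDATE column resolution) can interfere with the rewrite
      case DeleteFromTable(UnresolvedIcebergTable(aliasedTable), condition) =>
        DeleteFromIcebergTable(aliasedTable, condition)
      case _ =>
        parsedPlan
    }
  }
}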
 * DataSourceV2ScanRelation depending on whether the planning has already happened;
 * - the current rewrite plan.
 */
object RewrittenRowLevelCommand {
I am not a big fan of this part so ideas are welcome.
// a node similar to V2WriteCommand in Spark but does not extend Command
// as ReplaceData and WriteDelta that extend this trait are nested within other commands
trait V2WriteCommandLike extends UnaryNode {
Ideas/thoughts are welcome.
val rowSchema = StructType.fromAttributes(rd.dataInput)
val writeBuilder = newWriteBuilder(r.table, rowSchema, Map.empty)
val write = writeBuilder.build()
// TODO: detect when query contains a shuffle and insert a round-robin repartitioning
This is probably something we will have to address before it is merged.
Do you mean contains a sort?
I mean we should probably check if the query contains a shuffle if the write asks for range partitioning. Recomputing the query may be expensive during the skew estimation step.
I've spent some time thinking and it does not look like there is a guarantee an extra round-robin repartition would improve the performance. In some cases, it may even make things worse. Here are my thoughts.
- With round-robin, we won’t have to evaluate a condition or join predicate twice. In MERGE, we won’t have to evaluate the join condition and won’t have to merge the rows. (GOOD)
- With round-robin, we won't read the data twice from the storage. If needed, we fetch shuffle data. (GOOD)
- With round-robin, we will have an extra shuffle (both write and read). Shuffles are expensive and tend to behave poorly at scale. (BAD)
- With round-robin, we have extra complexity that may not be always beneficial for the query performance. In some cases, this may degrade the performance or use more resources. (BAD)
At this point, I am inclined to skip inserting an extra round-robin repartition, as even merging the rows should not be that bad considering that we operate on InternalRow. I've spent so much time tuning shuffles recently that I am a bit skeptical it will help here.
I'm inclined to leave it out. If we need it, we can add it later. It is also something that Spark users are already familiar with adding manually because there are many situations that require one. Just writing a clustered data set to a table by adding a global sort commonly requires manually adding a repartition.
Okay, then we both agree to leave it out for now and reconsider later. I'll remove the TODO note.
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.types.StructType

object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
Scan planning changed so drastically before the 3.2 release that it was easier to put this into a separate rule.
while (barrier.get() < numOperations * 2) {
  sleep(10);
}
sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
I had to modify this test as these became metadata deletes.
assertEquals("Should have expected rows",
    ImmutableList.of(row(2, "hardware"), row(null, "hr")),
    sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", tableName));
// TODO: Spark does not support AQE and DPP with aggregates at the moment
That's an issue I don't have a solution for.
It will be pretty tough (if even possible) to overcome (without modifying Spark).
What happens when adaptive execution is not disabled?
It leads to a runtime exception.
class org.apache.spark.sql.catalyst.plans.logical.Aggregate cannot be cast to class org.apache.spark.sql.execution.SparkPlan
I did debug it at some point. If I remember correctly, it was related to the fact that dynamic subqueries in Spark cannot have other subqueries or aggregates.
    originalTable: NamedRelation,
    write: Option[Write] = None) extends V2WriteCommandLike {

  override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved
This and outputResolved look like they are from the original V2WriteCommand. Why did you add them here instead of to V2WriteCommandLike?
I can move the resolved definition to the parent trait but the implementation of outputResolved will be different in each command and won't be the same as in V2WriteCommand.
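One possible shape for that split, as a sketch (assumed, not the exact code in this PR): the shared trait owns resolved, and each command supplies its own outputResolved.

import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

trait V2WriteCommandLike extends UnaryNode {
  def table: NamedRelation
  def query: LogicalPlan
  // each command (ReplaceData, WriteDelta) defines what "aligned output" means for it
  def outputResolved: Boolean

  override def child: LogicalPlan = query
  override def output: Seq[Attribute] = Nil
  override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved
}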
override lazy val stringArgs: Iterator[Any] = Iterator(table, query, write)

// the incoming query may include metadata columns
lazy val dataInput: Seq[Attribute] = {
Can you give me a little more context on this?
This node has custom resolution as the input query may have more columns than the table. For example, below the query contains id#249, dep#250, _file#254, _pos#255L.
+- ReplaceData RelationV2[id#249, dep#250] testhive.default.table
   +- Filter NOT ((id#249 IN (list#248 []) AND (dep#250 = hr)) <=> true)
      :  +- Project [(value#19 + 2) AS (value + 2)#251]
      :     +- SubqueryAlias deleted_id
      :        +- View (`deleted_id`, [value#19])
      :           +- LocalRelation [value#19]
      +- RelationV2[id#249, dep#250, _file#254, _pos#255L] testhive.default.table
I thought I was projecting away the metadata columns in the writer but I am no longer sure. I'll have to check.
Yeah, I do have a projection in ExtendedV2Writes.
If there is a projection before the replace data, then we don't need to modify this?
The projection is inserted only after the write is built as the extra columns can be part of the distribution and ordering requested by the write. That happens in the optimizer. Before that, we need to mark this as fully resolved.
In other words, we can discard the metadata columns only after the write has been built in the optimizer. That's why I added a projection in ExtendedV2Writes. Since the rewrite happens in the analyzer, ReplaceData must be considered resolved as long as the data columns are aligned. That's why this node has custom resolution.
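Roughly, the custom resolution sketched below (field names are assumptions, not the PR's exact code) keeps only the attributes matching the table's data columns and treats the command as resolved once those line up, even though the query still carries _file and _pos for the write's distribution and ordering:

// inside ReplaceData, where `table` is the target relation and `query` is the incoming plan
lazy val dataInput: Seq[Attribute] = {
  val dataColumnNames = table.output.map(_.name).toSet
  query.output.filter(attr => dataColumnNames.contains(attr.name))
}

lazy val outputResolved: Boolean = {
  dataInput.size == table.output.size &&
    dataInput.zip(table.output).forall { case (in, out) =>
      in.name == out.name && out.dataType.sameType(in.dataType)
    }
}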
o.copy(write = Some(write), query = newQuery)

case rd @ ReplaceData(r: DataSourceV2Relation, query, _, None) =>
  val rowSchema = StructType.fromAttributes(rd.dataInput)
Is the purpose of overriding dataInput just to keep the original row schema?
Correct. The idea is to report a row schema without metadata columns.
Why not use r.output, the write relation's schema? That's what we do for normal writes, isn't it? Should we just apply the ResolveOutputRelation rule?
We can't apply the regular resolution as described here. Spark uses query.schema for regular writes. It feels more natural to use the incoming query types to construct a row schema. I think it also gives us more precise results (e.g. actual nullability).
// optimizer extensions
extensions.injectOptimizerRule { _ => ExtendedSimplifyConditionalsInPredicate }
extensions.injectOptimizerRule { _ => ExtendedReplaceNullWithFalseInPredicate }
extensions.injectPreCBORule { _ => OptimizeMetadataOnlyDeleteFromTable }
I think that this works because the pre-CBO rules are executed once, so order matters. Adding the conversion to metadata delete before replacing the plan with the ReplaceData is important. Could you add a comment that notes this?
+1 for adding a comment about why certain rules are added and their "preconditions" and associated "co-conditions" (even if it's Spark functionality we're relying on).
It's very helpful to others who aren't as aware of Spark plan resolution and allows more people to effectively review and contribute in this area of the codebase.
Particularly here where we're relying on order of operations for something that could possibly silently fail / misbehave if accidentally changed.
Yeah, the order matters. I'll add a comment.
It is a bit of a hack and we are also lucky dynamic rules are applied after pre-CBO.
Added a comment.
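For reference, a sketch of the wiring with the kind of comment discussed above (rule names are the ones from this PR; SparkSessionExtensions applies pre-CBO rules once, in registration order):

// optimizer extensions: these run as part of the regular operator optimization batches
extensions.injectOptimizerRule { _ => ExtendedSimplifyConditionalsInPredicate }
extensions.injectOptimizerRule { _ => ExtendedReplaceNullWithFalseInPredicate }
// pre-CBO rules run exactly once and in the order they are registered; the metadata-delete
// conversion must see the (already simplified) DELETE condition before the command is
// replaced with its ReplaceData rewrite plan, and dynamic rules are applied only after pre-CBO
extensions.injectPreCBORule { _ => OptimizeMetadataOnlyDeleteFromTable }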
  None
}

private def findScanRelation(
This is called findScanRelation but may return either a DataSourceV2ScanRelation or a relation that is not a *ScanRelation. Should this be findRelationForTable instead?
I meant that the original relation in row-level commands is kind of split into "write" and "scan" relations for the same table. Then we read from the scan relation and write into the write relation.
I am still wondering what would be the best naming here. I want to refer to the relation we read from.
One option can be to call them "read" and "write" relations to avoid using "scan".
I like "read", but this is minor.
I've updated to use "read".
import ExtendedDataSourceV2Implicits._

override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
  case RewrittenRowLevelCommand(command, relation: DataSourceV2Relation, rewritePlan) =>
If I understand correctly, this works for DeleteFromTable because the filter that is being pushed here is not the filter from the rewrite plan (which is negated!) and is instead the filter from the command (command.condition).
I think that's really important to call out in a comment. That was one of the first things I was trying to understand for this review.
Yeah, you got it correctly. That's why the scan planning is different.
I'll add a comment.
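A simplified, self-contained sketch of that point (object and method names are assumptions): the filters offered to the copy-on-write scan come from the command's original condition, never from the negated filter inside the rewrite plan.

import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter

object CommandConditionPushdownSketch extends PredicateHelper {
  // commandCondition is the DELETE's original WHERE clause, not the rewrite plan's filter
  def pushableFilters(commandCondition: Option[Expression]): Seq[Filter] = {
    commandCondition.toSeq
      .flatMap(splitConjunctivePredicates)
      .flatMap(expr => DataSourceStrategy.translateFilter(expr, supportNestedPredicatePushdown = true))
  }
}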
rewritePlan match {
  case replaceData: ReplaceData =>
    replaceData.table match {
I think it would be more clear if ReplaceData.table were called relation instead.
I can rename. I called it table as it extended V2WriteCommand before. Looks like all v2 write commands call this a table.
if (!table.spec().isUnpartitioned() && filterExpressions.isEmpty()) {
  LOG.debug("using table metadata to estimate table statistics");
  long totalRecords = PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
  long totalRecords = PropertyUtil.propertyAsLong(snapshot.summary(),
It is related. I just forgot to invoke it from SparkCopyOnWriteScan.
Should have the missing part now.
assert(table.resolved && query.resolved,
  "`outputResolved` can only be called when `table` and `query` are both resolved.")

// take into account only incoming data columns and ignore metadata columns in the query
}

@Test
public void testDefaultCopyOnWriteDeletePartitionedUnsortedTable() {
Does everybody agree on this one?
Why not distribute by the partition columns?
I'd probably default to hash in this case rather than none.
return new SparkMergeScan(
    spark, table, readConf, ignoreResiduals,
    schemaWithMetadataColumns(), filterExpressions);
public Scan buildCopyOnWriteScan() {
I moved the construction of TableScan here. I'll do the same for regular scans in the next PR.
Merge-on-read will use the existing scan for batch queries (with runtime filtering and everything).
rewritePlan match {
  case rd: ReplaceData =>
    rd.table match {
      case DataSourceV2Relation(table, _, _, _, _) =>
It looks weird to pull table out here, when this was matched from rd.table. I think the rename would help, but this is minor.
It is weird, I agree. I've restructured this place to match what Spark does in other places.
.config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
.config("spark.sql.shuffle.partitions", "4")
.config("spark.sql.hive.metastorePartitionPruningFallbackOnException", "true")
.config(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean()))
I don't think that having a random that can cause test failures is a great idea. I'd rather set it in a few tests that need it.
Technically, adaptive execution should have no impact. That's why it would be hard to come up with particular tests to set this. If this suite wasn't so expensive, I'd add two separate runs.
I think it'll be fine, it's just a bit weird. How long does this suite take to run? Is it something that needs to be run with all 3 catalog types, or can we get away with just running it with one or two?
We test with 3 catalogs and 3 different file formats so the overhead seems reasonable.
Distribution expectedDistribution = Distributions.unspecified();
SortOrder[] expectedOrdering = new SortOrder[]{};
checkCopyOnWriteDeleteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
Why doesn't this use hash distribution by file and order by file and pos? Shouldn't that be the default if there is no property specified? I think we should only use NONE if it is specifically requested. Otherwise we should try to match the original distribution and ordering.
I matched the existing logic we had in 3.0. I guess we previously looked at the write distribution: if the table had no sort order, was unpartitioned, and the write distribution wasn't set, we assumed there was no reasonable distribution and ordering in the table, so there was no reason to try to keep it.
There are some benefits to using hash (assuming the user distributed and ordered the data manually), so I can change the default value.
Yeah, let's change it. I think that's better than not returning anything.
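A sketch of the default being discussed, using the Spark 3.2 connector API already shown in these tests (this is an assumption about the follow-up behavior, not code from this PR): cluster copy-on-write deletes by _file and order by _file, _pos so rewritten records keep their original file grouping and row order.

import org.apache.iceberg.MetadataColumns
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortDirection, SortOrder}

object CopyOnWriteDeleteDefaultsSketch {
  // cluster rewritten records by the file they came from
  def clusterByFile(): Distribution = {
    val file: Expression = Expressions.column(MetadataColumns.FILE_PATH.name())
    Distributions.clustered(Array(file))
  }

  // keep the original row order within each rewritten file
  def orderByFileAndPos(): Array[SortOrder] = Array(
    Expressions.sort(Expressions.column(MetadataColumns.FILE_PATH.name()), SortDirection.ASCENDING),
    Expressions.sort(Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING)
  )
}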
if (deleteModeName != null) {
  DistributionMode deleteMode = DistributionMode.fromName(deleteModeName);
  if (deleteMode == RANGE && table.spec().isUnpartitioned() && table.sortOrder().isUnsorted()) {
If the delete mode was specifically set to RANGE, I think we should probably pass it through. I see the logic of not doing that if we're going to use no custom sort order, but I think we should sort by _file and _pos if there is no order to keep rows in the same files and original order.
If we're using _file and _pos, then RANGE would mean to rebalance records across files. That makes some sense if records were already mostly in sorted order. HASH would maintain existing file boundaries. The question for me is whether we should assume that RANGE indicates that it's okay to change file boundaries... If we had a total ordering before and deleted large chunks of records, this would make sense. But if the files/records weren't already a total ordering then it could mess up column stats. Whether this assumption holds is not clear, so I would say let's defer to the user's explicit request, since this is the delete mode.
One reason I did not consider RANGE by _file and _pos is that distributing by _file on S3 can break the existing distribution and ordering and create more files, as we potentially break partition boundaries.
Yeah, that's true. But still, it's what the user is asking for... I'd probably go ahead and honor that request.
Either way, I think that if we use range or hash we should add _file and _pos as the sort.
This is okay as it is. Let's not block on it. There are good arguments for using HASH.
    Expressions.sort(Expressions.column(MetadataColumns.ROW_POSITION.name()), SortDirection.ASCENDING)
};

checkCopyOnWriteDeleteDistributionAndOrdering(table, expectedDistribution, expectedOrdering);
This is okay. I noted above that I'd probably lean toward respecting RANGE. But hash with _file and _pos seems reasonable.
}

@Test
public void testRangeCopyOnWriteDeleteUnpartitionedSortedTable() {
}

@Test
public void testDefaultCopyOnWriteDeletePartitionedSortedTable() {
This seems like the reasonable thing to do for nearly all cases where we might use no distribution.
rdblue left a comment:
At this point, my only objections are to the default behaviors depending on the settings for table distribution and delete distribution, along with write order. I think we should fix those in a follow-up because there's no need to hold up this commit (and the subsequent UPDATE and MERGE ones) just to hash out some details. Plus, this is big and I'd like to get the non-controversial stuff in.
I'm going to merge this. Thanks for this, @aokolnychyi! It's a huge improvement.
This PR adds support for copy-on-write DELETE commands using the new connector APIs.