Spark: Implement merge-on-read DELETE #3763
Conversation
override protected lazy val stringArgs: Iterator[Any] = Iterator(table, query, write)

// TODO: validate the row ID and metadata schema
This seems minor to me. Since we are on a tight schedule, I'd skip it for now.
I'm debating this. It seems like it will always be correct, but resolution is a nice way to sanity check and fail rather than doing the wrong thing at runtime.
I'd say that this is probably worth doing and isn't going to be a lot of work compared with the rest of this PR.
Added some validation.
// a trait similar to V2ExistingTableWriteExec but supports custom write tasks
trait ExtendedV2ExistingTableWriteExec extends V2ExistingTableWriteExec {
Mostly copied from Spark.
trait WritingSparkTask extends Logging with Serializable {
Same here. Mostly from Spark.
case class DeltaWithMetadataWritingSparkTask(
This is custom and needs review.
private final Integer splitLookback;
private final Long splitOpenFileCost;
private final TableScan scan;
private final Context ctx;
I am not a big fan of this class here, but it is needed for equals and hashCode. Another option I considered was to implement equals and hashCode in all TableScan implementations. Unfortunately, we have a lot of such classes, and "equal" scans in Spark are a slightly weaker concept (i.e. not every detail must be the same to consider two scans identical).
Alternatives are welcome.
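For illustration, a minimal Scala sketch of the idea behind such a context object (the field names here are hypothetical, not the actual SparkBatchQueryScan fields): a small value class captures only the scan-defining inputs and derives equality from them, instead of adding equals and hashCode to every TableScan implementation.

```scala
// Hypothetical sketch: a value object holding only the inputs that define a scan.
// A case class derives structural equals/hashCode, so two Spark scans built from the
// same inputs compare equal even though the wrapped TableScan defines no equality.
case class ScanContext(
    snapshotId: Option[Long],
    filterExpressions: Seq[String],  // simplified stand-in for Iceberg filter expressions
    selectedColumns: Seq[String],
    caseSensitive: Boolean)

object ScanContextExample extends App {
  val a = ScanContext(Some(42L), Seq("id > 10"), Seq("id", "data"), caseSensitive = false)
  val b = ScanContext(Some(42L), Seq("id > 10"), Seq("id", "data"), caseSensitive = false)
  assert(a == b)                    // structural equality
  assert(a.hashCode == b.hashCode)  // consistent hashCode, usable for scan equality checks
}
```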
super(spark, table, readConf, expectedSchema, filters);

this.snapshotId = readConf.snapshotId();
Moved to SparkScanBuilder.
@Override
public void commit(WriterCommitMessage[] messages) {
Requires extra attention!
private static class Context implements Serializable {
A helper class to avoid passing a huge list of params to methods.
rdblue left a comment:
I have no major issues with this, although I made a few comments about minor things.
private static void validateSchema(Schema expectedSchema, Schema actualSchema, Boolean checkNullability,
I did this refactoring to slightly reduce code duplication. Now that I look at it, I am not sure it was worth it.
Yeah, seems like just adding the context part is all you need to do.
override protected lazy val stringArgs: Iterator[Any] = Iterator(table, query, write)

private def operationResolved: Boolean = {
@rdblue, what do you think of the validation here?
I added some comments in the implementation.
This needs another round or two. I am switching to copy-on-write MERGE for now.
private def isCompatible(projectionField: StructField, outAttr: NamedExpression): Boolean = {
Looks good.
private def rowIdAttrsResolved: Boolean = {
  projections.rowIdProjection.schema.forall { field =>
    originalTable.resolve(Seq(field.name), conf.resolver) match {
Why does this use originalTable? I thought these fields should be coming from query?
Well, it is a little bit tricky. The actual type is defined by the projection. For example, consider MERGE operations. The incoming plan will have the wrong nullability for metadata and row ID columns (they will always be nullable because those columns are null for records to insert). However, we never pass row ID or metadata columns with inserts. We only pass them with updates and deletes, where those columns have correct values. In other words, the projection has more precise types. The existing logic validates that whatever the projections produce satisfies the target output attributes.
That being said, you are also right that we probably need some validation that we can actually project those columns from query...
What do you think, @rdblue?
The incoming fields are probably fine because they're coming from query via rowIdProjection. For the output fields, I think it makes sense to go back to what the table requested. Since the output relation, table, is probably a V2Relation that wraps the RowLevelOperationTable, we should actually be able to recover the requested fields without using originalTable.
I think that makes the most sense: we want to validate that the incoming fields (query or rowIdProjection) satisfy the requirements from the operation. The original table doesn't really need to be used.
Yeah, I agree. I’ll try to implement.
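To make the compatibility check concrete, here is a minimal, self-contained Scala sketch with simplified stand-in types (not the actual Catalyst classes or the final form of the rule): each field produced by the row ID projection is matched against the requested attributes by name, type, and nullability.

```scala
// Hypothetical, simplified stand-in for Catalyst's StructField / attribute types.
case class Field(name: String, dataType: String, nullable: Boolean)

object RowIdResolutionSketch extends App {
  // A projected field satisfies a requested attribute if the names resolve
  // (case-insensitively here), the types match, and the projection is not
  // more nullable than the attribute it must satisfy.
  def isCompatible(projected: Field, requested: Field): Boolean =
    projected.name.equalsIgnoreCase(requested.name) &&
      projected.dataType == requested.dataType &&
      (!projected.nullable || requested.nullable)

  def rowIdAttrsResolved(rowIdProjection: Seq[Field], rowIdAttrs: Seq[Field]): Boolean =
    rowIdProjection.forall(field => rowIdAttrs.exists(attr => isCompatible(field, attr)))

  // For position deletes, the row ID projection typically carries _file and _pos.
  val projection = Seq(Field("_file", "string", nullable = false), Field("_pos", "long", nullable = false))
  val requested  = Seq(Field("_file", "string", nullable = false), Field("_pos", "long", nullable = false))
  assert(rowIdAttrsResolved(projection, requested))
}
```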
projections.metadataProjection match {
  case Some(projection) =>
    projection.schema.forall { field =>
      originalTable.metadataOutput.exists(metadataAttr => isCompatible(field, metadataAttr))
Same here. Shouldn't these be looked up in query since that's what produces the row that the metadata projection wraps?
private def rowIdAttrsResolved: Boolean = {
@rdblue, I changed the validation a bit. As discussed before, the intention is to validate that whatever comes out of the projection satisfies the reported row ID attributes. I couldn't avoid using originalTable, as the operation only gives me attribute names and I have to resolve them against something.
That being said, it may not be the final iteration. Feedback would be appreciated.
Why not resolve the attrs against the child query: Projection? That's where the data is coming from. So you'd be finding the row ID fields that are coming from the incoming data that will be extracted by projections.rowIdProjection.
Yeah, I don't quite get why you can't use query instead of originalTable to look up the row ID attrs and then you'd no longer need originalTable. Same with metadata attrs.
I think we cannot use query for MERGE commands. The actual nullability is defined by the projection and may differ from the nullability of the attributes in query. Consider a MERGE plan with records to update and insert. The metadata and row ID columns will always be nullable because those columns are null for records to insert. However, we never pass row ID or metadata columns with inserts. We only pass them with updates and deletes, where those columns have correct values. In other words, the projection has more precise types. The existing logic checks that whatever the projection produces satisfies the original row ID and metadata attrs.
Apart from that, we still need originalTable to refresh the cache later.
That makes sense, but it sounds to me like we could use query and ignore nullability in some cases. I'm more concerned about type widening that is unexpected because we're validating based on what the table produced and not what the query produced.
For cache refreshing, shouldn't we use a callback that captures the table in a closure like we do for other plans?
private def metadataAttrsResolved: Boolean = {
Same here, @rdblue.
private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);

private final TableScan scan;
private final Long snapshotId;
@rdblue, I reverted the earlier change that used Context here.
@Override
public Scan build() {
@rdblue, I am not entirely happy with this place but it is probably better than using a context.
public static void validateSchema(String context, Schema expectedSchema, Schema providedSchema,
                                  boolean checkNullability, boolean checkOrdering) {
  String errMsg = String.format("Provided %s schema is incompatible with expected %s schema:", context, context);
Do we need context twice? I think "expected schema" is nearly equivalent to "expected row ID schema" if "row ID" was already used the first time, as in "Provided row ID schema ...".
Nah, just one is probably enough. Updated.
.append("\n")
.append(providedSchema)
.append("\n")
.append("problems:");
Nit: I prefer capitalizing these like they were before. It looks weird to not use sentence case.
We capitalized only one before, so it looked inconsistent. Both now start with capital letters.
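For reference, a hedged Scala sketch of the message layout under discussion (the exact labels are hypothetical, not the actual implementation): the context qualifier appears once, in the leading sentence, and the section labels use sentence case.

```scala
// Hypothetical sketch of the error message layout: the context qualifier ("row ID",
// "metadata", ...) is used once, and the section labels are capitalized consistently.
object SchemaErrorMessageSketch extends App {
  def schemaErrorMessage(context: String, expected: String, provided: String, problems: Seq[String]): String =
    s"""Provided $context schema is incompatible with expected schema:
       |Expected schema:
       |$expected
       |Provided schema:
       |$provided
       |Problems:
       |${problems.mkString("\n")}""".stripMargin

  println(schemaErrorMessage(
    "row ID",
    "struct<_file: string, _pos: long>",
    "struct<_file: string>",
    Seq("Cannot find field '_pos' in the provided schema")))
}
```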
private lazy val rowProjection = projs.rowProjection.orNull
private lazy val rowIdProjection = projs.rowIdProjection
private lazy val metadataProjection = projs.metadataProjection.orNull
Looks like this may throw an NPE when a projection is null but the operation causes it to be accessed? Is there a better way to fail? Maybe check which ones are null and add cases like `case UPDATE_OPERATION if !hasUpdateProjections => throw ...`
This may not be a good idea if we think we can guarantee that the required projections will be there.
Maybe all we need instead is to catch NPE and wrap it with the projection context and operation.
I thought about this too, but this is such a sensitive area that gets invoked for every row, so I've tried to avoid any extra work. While try/catch does not cost much unless an exception is thrown, the JVM may not rewrite and apply advanced optimizations to the code inside the block. Having an extra if would potentially be even worse.
I checked the code that produces these projections and it seems unlikely we can get an NPE given our tests.
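For context, a rough, self-contained Scala sketch of the per-row dispatch being discussed (simplified types and hypothetical names, not the actual DeltaWithMetadataWritingSparkTask): the projections are prepared once per task, and the hot per-row path applies them directly, so a missing projection would surface as an NPE rather than paying for an extra branch or try/catch on every row.

```scala
// Hypothetical, heavily simplified sketch. "Projection" stands in for Catalyst's
// projecting rows; operations are encoded as ints the way a delta writer might
// read them from the operation column.
object DeltaDispatchSketch extends App {
  type Row = Map[String, Any]
  type Projection = Row => Row

  val DeleteOp = 1
  val InsertOp = 2

  def writeRow(
      operation: Int,
      row: Row,
      rowProjection: Projection,      // may be null for plans that never insert or update
      rowIdProjection: Projection,
      metadataProjection: Projection,
      delete: (Row, Row) => Unit,
      insert: Row => Unit): Unit =
    operation match {
      case DeleteOp => delete(metadataProjection(row), rowIdProjection(row))
      case InsertOp => insert(rowProjection(row))
      case other    => throw new IllegalArgumentException(s"Unexpected operation: $other")
    }

  // A DELETE row only needs the row ID and metadata projections; rowProjection stays null.
  val row: Row = Map("_file" -> "data/file-1.parquet", "_pos" -> 7L)
  writeRow(DeleteOp, row,
    rowProjection = null,
    rowIdProjection = r => Map("_file" -> r("_file"), "_pos" -> r("_pos")),
    metadataProjection = _ => Map.empty,
    delete = (meta, rowId) => println(s"delete $rowId (metadata: $meta)"),
    insert = _ => ())
}
```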
public DistributionMode positionDeleteDistributionMode() {
Seems like some reasonable defaults to me.
Still looks good to me. I had a few minor comments, but overall +1.

Thanks for reviewing, @rdblue! I've merged this one as the remaining open points are relatively minor and can be further discussed separately.
This PR implements merge-on-read DELETE in Spark.
Resolves #3629.