Spark 3.4: Split update into delete and insert for position deltas #7646

Merged
merged 1 commit into from
May 19, 2023

Conversation

aokolnychyi (Contributor)

This PR contains a subset of changes from #7637 and is required for #7633.

@github-actions github-actions bot added the spark label May 18, 2023
@@ -218,7 +218,7 @@ private static Distribution buildPositionDeleteUpdateDistribution(
}

public static SortOrder[] buildPositionDeltaOrdering(Table table, Command command) {
if (command == DELETE || command == UPDATE) {
Contributor Author

This is purely to avoid test failures for now. I will rework distribution and ordering in a follow-up.

@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.SingleShotTime)
public class UpdateProjectionBenchmark {
Contributor Author


The new approach is a bit slower (because we use an iterator instead of a projection), but I don't think that outweighs the benefits we can achieve with better clustering. We can also optimize the new approach further by adding codegen support for the new expression and getting rid of the position delete sort. I just wanted to confirm there is no severe degradation.

      Benchmark                                             Mode  Cnt   Score   Error  Units
[OLD] UpdateProjectionBenchmark.copyOnWriteUpdate10Percent    ss    5  15.721 ± 0.409   s/op
[NEW] UpdateProjectionBenchmark.copyOnWriteUpdate10Percent    ss    5  15.728 ± 0.162   s/op

[OLD] UpdateProjectionBenchmark.copyOnWriteUpdate30Percent    ss    5  15.165 ± 0.084   s/op
[NEW] UpdateProjectionBenchmark.copyOnWriteUpdate30Percent    ss    5  15.071 ± 0.104   s/op

[OLD] UpdateProjectionBenchmark.copyOnWriteUpdate75Percent    ss    5  15.581 ± 0.198   s/op
[NEW] UpdateProjectionBenchmark.copyOnWriteUpdate75Percent    ss    5  15.437 ± 0.118   s/op

[OLD] UpdateProjectionBenchmark.mergeOnRead10Percent          ss    5   4.682 ± 0.173   s/op
[NEW] UpdateProjectionBenchmark.mergeOnRead10Percent          ss    5   4.923 ± 0.082   s/op

[OLD] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent    ss    5   9.475 ± 0.587   s/op
[NEW] UpdateProjectionBenchmark.mergeOnReadUpdate30Percent    ss    5  10.251 ± 0.968   s/op

[OLD] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent    ss    5  23.025 ± 0.135   s/op
[NEW] UpdateProjectionBenchmark.mergeOnReadUpdate75Percent    ss    5  26.260 ± 0.733   s/op

Contributor Author


Here are results for the existing benchmark for merging rows.

      Benchmark                                                                        Mode  Cnt   Score   Error  Units
[OLD] MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates    ss    5  11.287 ± 0.978   s/op
[NEW] MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates    ss    5  11.100 ± 0.465   s/op

[OLD] MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates    ss    5  11.344 ± 0.272   s/op
[NEW] MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates    ss    5  11.417 ± 1.082   s/op

[OLD] MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates    ss    5  11.835 ± 0.322   s/op
[NEW] MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates    ss    5  11.887 ± 3.269   s/op

[OLD] MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates    ss    5   7.817 ± 0.245   s/op
[NEW] MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates    ss    5   7.106 ± 0.240   s/op

[OLD] MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates    ss    5  12.440 ± 0.339   s/op
[NEW] MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates    ss    5  11.662 ± 0.258   s/op

[OLD] MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates    ss    5  26.052 ± 0.865   s/op
[NEW] MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates    ss    5  23.681 ± 1.110   s/op

The new approach performs a tad better for MoR MERGE. This could be related to the projection logic in the writer, but it does not really matter as long as it is not worse.

}
}

private def buildMergeRowsOutput(
-    matchedOutputs: Seq[Seq[Expression]],
+    matchedOutputs: Seq[Seq[Seq[Expression]]],
Contributor Author


Matched actions (the only kind of action that can contain UPDATE) may now produce a sequence of outputs per action (delete + insert for deltas). Unmatched actions produce only one output.
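As a rough illustration of this shape (names and types are hypothetical, not the PR's actual API): in delta mode, a matched UPDATE expands into two output rows per input row, while DELETE and INSERT actions keep a single output each.

```java
import java.util.List;

// Hypothetical sketch: a matched UPDATE in delta mode emits two outputs
// (a delete of the old row image, then an insert of the new row image),
// while DELETE and INSERT actions emit a single output each.
public class MatchedActionOutputs {
    enum Kind { DELETE, UPDATE, INSERT }

    // Returns the per-action outputs as a list of rows; only UPDATE yields two.
    static List<List<String>> expand(Kind kind, List<String> oldRow, List<String> newRow) {
        switch (kind) {
            case UPDATE:
                return List.of(oldRow, newRow); // delete + insert
            case DELETE:
                return List.of(oldRow);         // delete only
            default:
                return List.of(newRow);         // insert only
        }
    }
}
```

This is why the matched outputs become a `Seq[Seq[Seq[Expression]]]`: one extra level of nesting for the rows each action emits.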

mergeRows: MergeRows,
rowAttrs: Seq[Attribute],
rowIdAttrs: Seq[Attribute],
metadataAttrs: Seq[Attribute]): WriteDeltaProjections = {

val outputAttrs = mergeRows.output
Contributor Author


Moved into parent to reuse.

@@ -67,6 +72,97 @@ trait RewriteRowLevelIcebergCommand extends RewriteRowLevelCommand {
ProjectingInternalRow(schema, projectedOrdinals)
}

protected def buildDeltaProjections(
Contributor Author


From the rule that rewrites MERGE.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.util.truncatedString

case class UpdateRows(
Contributor Author


Similar to MergeRows but simpler.

private def applyProjection(
actions: Seq[(BasePredicate, Option[UnsafeProjection])],
inputRow: InternalRow): InternalRow = {
// This method is responsible for processing an input row to emit the resultant row with an
Contributor Author


This comment is copied from below.

null
val projectTargetCols = createProjection(targetOutput)

val cardinalityCheck = if (performCardinalityCheck) {
Contributor Author


I considered using Option, but I was a bit concerned about how it would look in bytecode. I'd need to use foreach on it, which has a nested if. I hope the JVM would be smart enough to detect the empty method.
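A minimal sketch of the tradeoff being described (class and field names are hypothetical): a nullable field checked with a plain `if` keeps the per-row hot path to a single reference comparison, whereas `Option.foreach` adds an allocation and an extra call.

```java
import java.util.function.Consumer;

// Hypothetical sketch of the per-row hot path: the check is either a concrete
// consumer or null, so the row loop compiles to a flat null check rather than
// an Option allocation plus a virtual foreach call.
public class CardinalityCheckSketch {
    private final Consumer<long[]> cardinalityCheck; // null when the check is disabled

    CardinalityCheckSketch(boolean performCardinalityCheck) {
        this.cardinalityCheck = performCardinalityCheck
            ? row -> {
                  if (row[0] < 0) {
                      throw new IllegalStateException("cardinality violation");
                  }
              }
            : null;
    }

    long processRow(long[] row) {
        if (cardinalityCheck != null) { // flat branch, no per-row object
            cardinalityCheck.accept(row);
        }
        return row[0];
    }
}
```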

UnsafeProjection.create(exprs, child.output)
}

class UpdateAsDeleteAndInsertRowIterator(
Member


I would put a doc somewhere in here

/**
 * Splits an iterator of updated rows into delete and insert rows. Each input row becomes two
 * output rows: first the delete, then the insert.
 */

Or something like that

Contributor Author


Will do.
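For reference, the behavior that doc comment describes could be sketched roughly like this (simplified and hypothetical; the real class operates on Spark `InternalRow`s via projections, not generic pairs):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Simplified sketch: splits each updated row into two output rows,
// first the delete (old row image), then the insert (new row image).
public class UpdateAsDeleteAndInsertIterator<T> implements Iterator<T> {
    private final Iterator<T[]> input; // each element: [deleteRow, insertRow]
    private T pendingInsert;           // insert buffered until after its delete

    public UpdateAsDeleteAndInsertIterator(Iterator<T[]> input) {
        this.input = input;
    }

    @Override
    public boolean hasNext() {
        return pendingInsert != null || input.hasNext();
    }

    @Override
    public T next() {
        if (pendingInsert != null) {  // emit the buffered insert second
            T insert = pendingInsert;
            pendingInsert = null;
            return insert;
        }
        if (!input.hasNext()) {
            throw new NoSuchElementException();
        }
        T[] pair = input.next();
        pendingInsert = pair[1];      // buffer the insert
        return pair[0];               // emit the delete first
    }
}
```

Each input update thus doubles into a delete followed by an insert, which is why the output row count grows relative to the old projection-based approach.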

} else {
processRow
private def applyMatchedActions(row: InternalRow): InternalRow = {
for (action <- matchedActions) {
Member


Feel free to ignore this if it's too Scala-ish, but I would do something like

matchedActions.find(action => action.cond.eval(row)) match {
  case Some(split: Split) =>
    cachedExtraRow = split.projectExtraRow(row)
    split.projectRow(row)
  case Some(project: Project) =>
    project.apply(row)
  case None =>
    null
}

For these find first situations, I would also probably just keep the option and return None but that's not required.

Contributor Author


This is something that is invoked per record, so I wanted it to produce as few objects and as simple bytecode as possible, hoping the JIT would then make smart choices.

I do like None and find but I am paranoid it would add more calls.
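The two styles under discussion can be sketched side by side (hypothetical names; the real code evaluates Catalyst predicates and projections): an imperative first-match loop that allocates nothing per row, versus the stream/Optional equivalent that builds stream machinery and an Optional on every call.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the two first-match styles discussed above.
public class FirstMatchSketch {
    record Action(Predicate<int[]> cond, UnaryOperator<int[]> project) {}

    // Imperative style: flat loop, no per-row allocation; null when nothing matches.
    static int[] applyImperative(List<Action> actions, int[] row) {
        for (Action action : actions) {
            if (action.cond().test(row)) {
                return action.project().apply(row);
            }
        }
        return null;
    }

    // Stream style: same result, but allocates a stream and an Optional per call.
    static int[] applyStream(List<Action> actions, int[] row) {
        return actions.stream()
            .filter(a -> a.cond().test(row))
            .findFirst()
            .map(a -> a.project().apply(row))
            .orElse(null);
    }
}
```

Both return the first matching action's projection; the difference is only in per-call allocation and call depth on the hot path.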

Member


I think that's a good paranoia :) I just think it's a little less readable. I have no problem keeping it this way but you do already have benchmarks set up so you could test it... ;)

Contributor Author


Let me make a note of this and test it in parallel to working on distribution and ordering.

@RussellSpitzer (Member) left a comment


Looks good to me. Excited to read over the distribution work next.

@aokolnychyi aokolnychyi merged commit 2f61a08 into apache:master May 19, 2023
31 checks passed
@aokolnychyi (Contributor Author)

Thanks, @RussellSpitzer!
