[SPARK-46653][SQL] Code-gen for full outer sort merge join output row by row#44660

Closed
zml1206 wants to merge 2 commits into apache:master from zml1206:SPARK-46653

@zml1206 zml1206 commented Jan 10, 2024

What changes were proposed in this pull request?

To be consistent with the behavior when code-gen is disabled, update the code-gen for full outer sort merge join so that it outputs row by row.
For example:

val a = Seq((1, 2), (2, 3)).toDF("a", "b")
val b = Seq((2, 5), (3, 4)).toDF("a", "c")
a.join(b, Seq("a"), "fullouter")

Generated code before this PR: https://gist.github.com/zml1206/aff18fc313a7164d6f65096a97d233eb
Generated code after this PR: https://gist.github.com/zml1206/a27350b8849951e6efac0fb6088e527f

Why are the changes needed?

Avoid OOM. When code-gen for full outer sort merge join is enabled and the parent of SortMergeJoin cannot be code-generated, full outer sort merge join has to append all output rows for the same key to BufferedRowIterator.currentRows, which is a LinkedList. If there are a large number of duplicate keys, this is likely to cause an executor OOM.
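The difference between buffering a whole key group and emitting row by row can be illustrated with a small standalone sketch. This is not Spark code: JoinedRow, bufferAll and rowByRow are hypothetical names, and the LinkedList only stands in for currentRows as described above. With m left rows and n right rows for one key, the eager variant holds m * n rows before the caller sees any of them, while the lazy variant produces one row per request.

```scala
import java.util.LinkedList

// Hypothetical stand-in for a joined output row (not a Spark class).
final case class JoinedRow(key: String, left: String, right: String)

object BufferingSketch {
  // Eager: every pair for the key is appended to the list before anything is returned,
  // so memory use grows with left.size * right.size.
  def bufferAll(key: String, left: Seq[String], right: Seq[String]): LinkedList[JoinedRow] = {
    val currentRows = new LinkedList[JoinedRow]()
    for (l <- left; r <- right) currentRows.add(JoinedRow(key, l, r))
    currentRows
  }

  // Lazy: pairs are built only when the consumer asks for the next row,
  // so at most one joined row is live at a time.
  def rowByRow(key: String, left: Seq[String], right: Seq[String]): Iterator[JoinedRow] =
    for (l <- left.iterator; r <- right.iterator) yield JoinedRow(key, l, r)
}
```

Both variants yield the same rows in the same order; only the point at which the rows are materialized differs.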

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs and a local test:

val df1 = spark.range(10000).map(_ => ("testkey", "testvalue1")).toDF("key", "value")
val df2 = spark.range(10000).map(_ => ("testkey", "testvalue2")).toDF("key", "value")
df1.join(df2, Seq("key"), "fullouter").show()

Run in local mode with 1G of driver memory.
Before this PR, the query fails with OOM:

java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.LinkedList.linkLast(LinkedList.java:146)
	at java.base/java.util.LinkedList.add(LinkedList.java:342)
	at org.apache.spark.sql.execution.BufferedRowIterator.append(BufferedRowIterator.java:73)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.smj_consumeFullOuterJoinRow_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)

After this PR, the query runs successfully.
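To compare against the non-code-gen path for this join, one option is to toggle the join's code-gen flag and inspect the plan. The following is a sketch for a spark-shell session, not part of this PR. It assumes that the config key spark.sql.codegen.join.fullOuterSortMergeJoin.enabled is available in the Spark version in use, so please verify it before relying on it.

```scala
// Assumes a spark-shell session where `spark` is in scope, started for example with:
//   spark-shell --master local --driver-memory 1g
import spark.implicits._

// Assumed config key for full outer sort merge join code-gen; check it against your Spark version.
spark.conf.set("spark.sql.codegen.join.fullOuterSortMergeJoin.enabled", "false")

val df1 = spark.range(10000).map(_ => ("testkey", "testvalue1")).toDF("key", "value")
val df2 = spark.range(10000).map(_ => ("testkey", "testvalue2")).toDF("key", "value")
val joined = df1.join(df2, Seq("key"), "fullouter")

// Print the physical plan together with any generated code, then run the query.
joined.explain("codegen")
joined.show()
```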

Was this patch authored or co-authored using generative AI tooling?

No.

zml1206 commented Jan 12, 2024

@cloud-fan Can you help take a look if you have time? Thanks.

Contributor

Why do you need to replace the for loop with a while loop?

Contributor Author

It makes the control flow easier to manage, because the index is incremented (+1) in different places.

@zml1206 zml1206 changed the title from "[SPARK-46653][SQL] Code-gen for full outer sort merge join output line by line" to "[SPARK-46653][SQL] Code-gen for full outer sort merge join output row by row" Jan 16, 2024
zml1206 commented Jan 23, 2024

cc @cloud-fan @wankunde @ulysses-you do you have any thoughts on this? Thanks.

Comment on lines 1010 to 1016
Contributor

Could we wrap "$matchRowsInBuffer" in a function? There is too much duplicated code.

@zml1206 zml1206 Jan 24, 2024

Done. The generated code in the PR description has also been updated.

zml1206 commented Jan 25, 2024

Whenever processNext is called, the buffer will be consumed first, and then findNextJoinRows will be called to reset the index and buffer and match the new row written into the buffer. @wankunde
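The control flow described in this comment can be modeled with a small standalone sketch. It is a simplified illustration, not the generated Spark code: RowByRowJoinSketch is a hypothetical class, rows are plain strings, and each element of groups stands in for the matched rows of one join key. Only the names processNext and findNextJoinRows are taken from the comment above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of "consume the buffer first, then refill it".
final class RowByRowJoinSketch(groups: Iterator[Seq[String]]) {
  private val buffer = ArrayBuffer.empty[String] // rows matched for the current key
  private var index = 0                          // position of the next row to emit

  // Stand-in for findNextJoinRows: reset the index and the buffer,
  // then load the rows for the next key. Returns false when input is exhausted.
  private def findNextJoinRows(): Boolean = {
    index = 0
    buffer.clear()
    if (groups.hasNext) { buffer ++= groups.next(); true } else false
  }

  // Stand-in for processNext: emit at most one row per call.
  def processNext(): Option[String] = {
    // Refill only once the buffer has been fully consumed; skip empty groups.
    while (index >= buffer.length) {
      if (!findNextJoinRows()) return None
    }
    val row = buffer(index)
    index += 1
    Some(row)
  }
}
```

Each call returns a single row, so the caller never holds more than the current key's buffer plus one output row.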

@github-actions
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 28, 2024
@github-actions github-actions bot closed this Jun 29, 2024