[SPARK-38809][SS] Implement option to skip null values in symmetric hash implementation of stream-stream joins #36090
…ash impl of stream-stream joins
In the symmetric hash join state manager, we can receive entries with null values for a key, which causes the `removeByValue` and get iterators to fail with a NullPointerException. This is possible if the recovered state was written by an old Spark version or is corrupted on disk. Since we don't have a utility to query this state, we would like to provide a conf option to skip nulls for the symmetric hash implementation of stream-stream joins.
@HeartSaVioR - Please take a look. Thx
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
...ain/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
only nits
...cala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
cc. @viirya @xuanyuanking in case you have a chance to review
+1
I'll wait a day to see whether there are further review comments, and merge if none remain unresolved.
…ash implementation of stream-stream joins

### What changes were proposed in this pull request?

In the symmetric hash join state manager, we can receive entries with null values for a key, which can cause the `removeByValue` and get iterators to fail with a NullPointerException. This is possible if the recovered state was written by an old Spark version, is corrupted on disk, or was affected by issues with the iterators. Since we don't have a utility to query this state, we would like to provide a conf option to skip nulls for the symmetric hash implementation of stream-stream joins.

### Why are the changes needed?

Without these changes, if we encounter null values for stream-stream joins, the executor task will repeatedly fail with NullPointerException and will terminate the stage and eventually the query as well. This change allows the user to set a config option to continue iterating by skipping null values for the symmetric hash based implementation of stream-stream joins.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests for the new functionality that insert nulls in between values and force the iteration/get calls with nulls in the mix, and tested the behavior with the config disabled as well as enabled.

Sample output:

```
[info] SymmetricHashJoinStateManagerSuite:
15:07:50.627 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[info] - StreamingJoinStateManager V1 - all operations (588 milliseconds)
[info] - StreamingJoinStateManager V2 - all operations (251 milliseconds)
15:07:52.669 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=4.
15:07:52.671 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=3.
15:07:52.672 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=1 and endIndex=3.
15:07:52.672 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=1 and endIndex=1.
[info] - StreamingJoinStateManager V1 - all operations with nulls (252 milliseconds)
15:07:52.896 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=4.
15:07:52.897 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=3.
15:07:52.898 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=1 and endIndex=3.
15:07:52.898 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=1 and endIndex=1.
[info] - StreamingJoinStateManager V2 - all operations with nulls (221 milliseconds)
15:07:53.114 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=5 and endIndex=6.
15:07:53.116 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=6.
15:07:53.331 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=5 and endIndex=6.
15:07:53.331 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=3.
[info] - StreamingJoinStateManager V1 - all operations with nulls in middle (435 milliseconds)
15:07:53.549 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=5 and endIndex=6.
15:07:53.551 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=6.
15:07:53.785 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=5 and endIndex=6.
15:07:53.785 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: `keyWithIndexToValue` returns a null value for indices with range from startIndex=3 and endIndex=3.
[info] - StreamingJoinStateManager V2 - all operations with nulls in middle (456 milliseconds)
[info] - SPARK-35689: StreamingJoinStateManager V1 - printable key of keyWithIndexToValue (390 milliseconds)
[info] - SPARK-35689: StreamingJoinStateManager V2 - printable key of keyWithIndexToValue (216 milliseconds)
15:07:54.640 WARN org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
[info] Run completed in 5 seconds, 714 milliseconds.
[info] Total number of tests run: 8
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #36090 from anishshri-db/bfix/SPARK-38809.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 61c489e)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Can one of the admins verify this patch?
…ash implementation of stream-stream joins

Closes apache#36090 from anishshri-db/bfix/SPARK-38809.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 61c489e)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 99e6adb)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
What changes were proposed in this pull request?
In the symmetric hash join state manager, we can receive entries with null values for a key, which can cause the `removeByValue` and get iterators to fail with a NullPointerException. This is possible if the recovered state was written by an old Spark version, is corrupted on disk, or was affected by issues with the iterators. Since we don't have a utility to query this state, we would like to provide a conf option to skip nulls for the symmetric hash implementation of stream-stream joins.
Why are the changes needed?
Without these changes, if we encounter null values for stream-stream joins, the executor task will repeatedly fail with NullPointerException and will terminate the stage and eventually the query as well. This change allows the user to set a config option to continue iterating by skipping null values for symmetric hash based implementation of stream-stream joins.
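The skip-on-null behavior described above can be illustrated with a small, language-neutral sketch. This is not the Spark implementation: `iterate_values`, the `skip_nulls` flag, and the `ValueError` (standing in for the NullPointerException) are hypothetical names chosen for illustration, and the real change lives in the Scala `SymmetricHashJoinStateManager`.

```python
import logging
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger("join_state_sketch")  # hypothetical logger name


def iterate_values(values: Iterable[Optional[T]], skip_nulls: bool) -> Iterator[T]:
    """Yield stored values for a key, optionally skipping null entries.

    With skip_nulls=False a null entry aborts iteration, mirroring the
    failure mode described in the PR. With skip_nulls=True the entry is
    logged and skipped so iteration can continue.
    """
    for index, value in enumerate(values):
        if value is None:
            if skip_nulls:
                logger.warning("null value at index=%d, skipping", index)
                continue
            # Stand-in for the NullPointerException seen in the real code path.
            raise ValueError(f"null value at index={index}")
        yield value


# Usage: the same stored values, read with the flag enabled.
recovered = ["a", None, "b"]
print(list(iterate_values(recovered, skip_nulls=True)))
```

With the flag disabled, the same call raises on the first null entry, which corresponds to the repeated task failures the PR describes.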
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests to test the new functionality by adding nulls in between and forcing the iteration/get calls with nulls in the mix and tested the behavior with the config disabled as well as enabled.
Sample output: