Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-38787] [SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins #36073

Closed
wants to merge 4 commits into from

Conversation

anishshri-db
Copy link
Contributor

@anishshri-db anishshri-db commented Apr 5, 2022

What changes were proposed in this pull request?

In stream-stream joins, for removing old state (watermark by value), we call the removeByValue function with a removal condition. Within the iterator returned, if we find null at the end for matched value at non-last index, we are currently not removing and swapping the matched value. With this change, we will find the first non-null value from end and swap current index with that value and remove all elements from index + 1 to the end and then drop the last element as before.

Why are the changes needed?

This change fixes a bug where we were not replacing found/matching values for removeByValue when encountering nulls in the symmetric hash join code.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test for this change with nulls added. Here is a sample output:

Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2
-----------------------------------------------------------------------------
2022-04-01 21:11:59,641 INFO  CodeGenerator - Code generated in 225.884757 ms
2022-04-01 21:11:59,662 INFO  CodeGenerator - Code generated in 10.870786 ms
Run starting. Expected test count is: 4
…
===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====

2022-04-01 21:12:03,487 INFO  StateStore - State Store maintenance task started
2022-04-01 21:12:03,508 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp
2022-04-01 21:12:03,524 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema
2022-04-01 21:12:03,525 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@374ccb9
2022-04-01 21:12:03,525 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,525 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update
2022-04-01 21:12:03,525 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,541 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp
2022-04-01 21:12:03,556 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema
2022-04-01 21:12:03,558 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@1ea930eb
2022-04-01 21:12:03,559 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,559 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update
2022-04-01 21:12:03,559 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,564 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp
2022-04-01 21:12:03,568 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp
2022-04-01 21:12:03,572 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0].
2022-04-01 21:12:03,574 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0].
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues]
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue]
2022-04-01 21:12:03,580 INFO  StateStore - StateStore stopped
2022-04-01 21:12:03,580 INFO  SymmetricHashJoinStateManagerSuite -

===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====
…
2022-04-01 21:12:04,205 INFO  StateStore - StateStore stopped
Run completed in 5 seconds, 908 milliseconds.
Total number of tests run: 4
Suites: completed 1, aborted 0
Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Shutdown hook called
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c
2022-04-01 21:12:04,608 INFO  ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516

…emaining list for key and remove remaining null elements from values in keyWithIndexToValue store

If we find null at end for found value at non-last index, we are currently not removing and swapping the found value. With this change, we will find the first non-null value from end and swap current index with that value and remove all elements from null index + 1 to the end and then drop the last element as before.
@anishshri-db
Copy link
Contributor Author

@HeartSaVioR - Could you please review ? Thx

Copy link
Contributor

@alex-balikov alex-balikov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably clarify in the PR comment that this relates to stream-stream joins.

@anishshri-db anishshri-db changed the title [SPARK-38787] [SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store [SPARK-38787] [SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins Apr 5, 2022
@anishshri-db
Copy link
Contributor Author

Probably clarify in the PR comment that this relates to stream-stream joins.

Done - Updated the PR title as well as description noting that the change relates to stream-stream joins

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 except a nit. Once the nit is addressed I'm going to merge this.

@HeartSaVioR
Copy link
Contributor

GA was green in e7331c1 and the new change e9eec52 only touches the code comment.

Thanks! Merging to master/3.3/3.2.

HeartSaVioR pushed a commit that referenced this pull request Apr 6, 2022
…maining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins

### What changes were proposed in this pull request?

In stream-stream joins, for removing old state (watermark by value), we call the `removeByValue` function with a removal condition. Within the iterator returned, if we find null at the end for matched value at non-last index, we are currently not removing and swapping the matched value. With this change, we will find the first non-null value from end and swap current index with that value and remove all elements from index + 1 to the end and then drop the last element as before.

### Why are the changes needed?

This change fixes a bug where we were not replacing found/matching values for `removeByValue` when encountering nulls in the symmetric hash join code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test for this change with nulls added. Here is a sample output:
```
Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2
-----------------------------------------------------------------------------
2022-04-01 21:11:59,641 INFO  CodeGenerator - Code generated in 225.884757 ms
2022-04-01 21:11:59,662 INFO  CodeGenerator - Code generated in 10.870786 ms
Run starting. Expected test count is: 4
…
===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====

2022-04-01 21:12:03,487 INFO  StateStore - State Store maintenance task started
2022-04-01 21:12:03,508 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp
2022-04-01 21:12:03,524 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema
2022-04-01 21:12:03,525 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9
2022-04-01 21:12:03,525 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,525 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update
2022-04-01 21:12:03,525 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,541 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp
2022-04-01 21:12:03,556 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema
2022-04-01 21:12:03,558 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb
2022-04-01 21:12:03,559 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,559 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update
2022-04-01 21:12:03,559 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,564 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp
2022-04-01 21:12:03,568 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp
2022-04-01 21:12:03,572 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0].
2022-04-01 21:12:03,574 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0].
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues]
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue]
2022-04-01 21:12:03,580 INFO  StateStore - StateStore stopped
2022-04-01 21:12:03,580 INFO  SymmetricHashJoinStateManagerSuite -

===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====
…
2022-04-01 21:12:04,205 INFO  StateStore - StateStore stopped
Run completed in 5 seconds, 908 milliseconds.
Total number of tests run: 4
Suites: completed 1, aborted 0
Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Shutdown hook called
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c
2022-04-01 21:12:04,608 INFO  ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516
```

Closes #36073 from anishshri-db/bfix/SPARK-38787.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 6d9bfb6)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR pushed a commit that referenced this pull request Apr 6, 2022
…maining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins

### What changes were proposed in this pull request?

In stream-stream joins, for removing old state (watermark by value), we call the `removeByValue` function with a removal condition. Within the iterator returned, if we find null at the end for matched value at non-last index, we are currently not removing and swapping the matched value. With this change, we will find the first non-null value from end and swap current index with that value and remove all elements from index + 1 to the end and then drop the last element as before.

### Why are the changes needed?

This change fixes a bug where we were not replacing found/matching values for `removeByValue` when encountering nulls in the symmetric hash join code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test for this change with nulls added. Here is a sample output:
```
Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2
-----------------------------------------------------------------------------
2022-04-01 21:11:59,641 INFO  CodeGenerator - Code generated in 225.884757 ms
2022-04-01 21:11:59,662 INFO  CodeGenerator - Code generated in 10.870786 ms
Run starting. Expected test count is: 4
…
===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====

2022-04-01 21:12:03,487 INFO  StateStore - State Store maintenance task started
2022-04-01 21:12:03,508 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp
2022-04-01 21:12:03,524 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema
2022-04-01 21:12:03,525 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9
2022-04-01 21:12:03,525 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,525 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update
2022-04-01 21:12:03,525 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,541 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp
2022-04-01 21:12:03,556 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema
2022-04-01 21:12:03,558 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb
2022-04-01 21:12:03,559 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,559 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update
2022-04-01 21:12:03,559 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,564 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp
2022-04-01 21:12:03,568 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp
2022-04-01 21:12:03,572 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0].
2022-04-01 21:12:03,574 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0].
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues]
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue]
2022-04-01 21:12:03,580 INFO  StateStore - StateStore stopped
2022-04-01 21:12:03,580 INFO  SymmetricHashJoinStateManagerSuite -

===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====
…
2022-04-01 21:12:04,205 INFO  StateStore - StateStore stopped
Run completed in 5 seconds, 908 milliseconds.
Total number of tests run: 4
Suites: completed 1, aborted 0
Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Shutdown hook called
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c
2022-04-01 21:12:04,608 INFO  ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516
```

Closes #36073 from anishshri-db/bfix/SPARK-38787.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 6d9bfb6)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@HeartSaVioR
Copy link
Contributor

cc. @MaxGekk This got into Spark 3.3, since it's a part of correctness bug fix. Sorry I missed to mention the release manager.

kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
…maining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins

### What changes were proposed in this pull request?

In stream-stream joins, for removing old state (watermark by value), we call the `removeByValue` function with a removal condition. Within the iterator returned, if we find null at the end for matched value at non-last index, we are currently not removing and swapping the matched value. With this change, we will find the first non-null value from end and swap current index with that value and remove all elements from index + 1 to the end and then drop the last element as before.

### Why are the changes needed?

This change fixes a bug where we were not replacing found/matching values for `removeByValue` when encountering nulls in the symmetric hash join code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test for this change with nulls added. Here is a sample output:
```
Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2
-----------------------------------------------------------------------------
2022-04-01 21:11:59,641 INFO  CodeGenerator - Code generated in 225.884757 ms
2022-04-01 21:11:59,662 INFO  CodeGenerator - Code generated in 10.870786 ms
Run starting. Expected test count is: 4
…
===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====

2022-04-01 21:12:03,487 INFO  StateStore - State Store maintenance task started
2022-04-01 21:12:03,508 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp
2022-04-01 21:12:03,524 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema
2022-04-01 21:12:03,525 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9
2022-04-01 21:12:03,525 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,525 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update
2022-04-01 21:12:03,525 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,541 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp
2022-04-01 21:12:03,556 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema
2022-04-01 21:12:03,558 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb
2022-04-01 21:12:03,559 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,559 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update
2022-04-01 21:12:03,559 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,564 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp
2022-04-01 21:12:03,568 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp
2022-04-01 21:12:03,572 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0].
2022-04-01 21:12:03,574 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0].
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues]
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue]
2022-04-01 21:12:03,580 INFO  StateStore - StateStore stopped
2022-04-01 21:12:03,580 INFO  SymmetricHashJoinStateManagerSuite -

===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====
…
2022-04-01 21:12:04,205 INFO  StateStore - StateStore stopped
Run completed in 5 seconds, 908 milliseconds.
Total number of tests run: 4
Suites: completed 1, aborted 0
Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Shutdown hook called
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c
2022-04-01 21:12:04,608 INFO  ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516
```

Closes apache#36073 from anishshri-db/bfix/SPARK-38787.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 6d9bfb6)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 6f6eb3f)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants