[SPARK-43221][CORE][3.5] Host local block fetching should use a block status of a block stored on disk#50260
Closed
attilapiros wants to merge 2 commits intoapache:branch-3.5from
Closed
[SPARK-43221][CORE][3.5] Host local block fetching should use a block status of a block stored on disk#50260attilapiros wants to merge 2 commits intoapache:branch-3.5from
attilapiros wants to merge 2 commits intoapache:branch-3.5from
Conversation
…us of a block stored on disk Thanks for yorksity who reported this error and even provided a PR for it. This solution very different from apache#40883 as `BlockManagerMasterEndpoint#getLocationsAndStatus()` needed some refactoring. ### What changes were proposed in this pull request? This PR fixes an error which can be manifested in the following exception: ``` 25/02/20 09:58:31 ERROR util.Utils: [Executor task launch worker for task 61.0 in stage 67.0 (TID 9391)]: Exception encountered java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:185) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2] at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) ~[scala-library-2.12.15.jar:?] at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?] at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:171) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2] ``` The PR is changing `BlockManagerMasterEndpoint#getLocationsAndStatus()`. The `BlockManagerMasterEndpoint#getLocationsAndStatus()` function is giving back an optional `BlockLocationsAndStatus` which consist of 3 parts: - `locations`: all the locations where the block can be found (as a sequence of block manager IDs) - `status`: one block status - `localDirs`: optional directory paths which can be used to read block if the block is found in the disk of an executor running on the same host The block (either RDD blocks, shuffle blocks or torrent blocks) can be stored in many executors with different storage levels: disk or memory. This PR changing how the block status and the block manager ID for the `localDirs` is found to guarantee they belong together. ### Why are the changes needed? Before this PR the `BlockManagerMasterEndpoint#getLocationsAndStatus()` was searching for the block status (`status`) and the `localDirs` separately. The block status actually was computed as the very first one where the block can be found. This way it can easily happen this block status was representing an in-memory block (where the disk size is 0 as it is stored in the memory) but the `localDirs` was filled out based on a host local block instance which was stored on disk. This situation can be very frequent but only causing problems (exceptions as above) when encryption is on (spark.io.encryption.enabled=true) as for a not encrypted block the whole file containing the block is read, see https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1244 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Host local block fetching was already covered by some existing unit tests but a new unit test is provided for this exact case: "SPARK-43221: Host local block fetching should use a block status with disk size". The number of block mangers and the order of the blocks was chosen after some experimentation as the block status order is depends on a `HashSet`, see: ``` private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] ``` This test was executed with the old code too to validate the issue is reproduced: ``` BlockManagerSuite: OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended - SPARK-43221: Host local block fetching should use a block status with disk size *** FAILED *** 0 was not greater than 0 The block size must be greater than 0 for a nonempty block! (BlockManagerSuite.scala:491) Run completed in 6 seconds, 705 milliseconds. Total number of tests run: 1 Suites: completed 1, aborted 0 Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0 *** 1 TEST FAILED *** ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#50122 from attilapiros/SPARK-43221. Authored-by: attilapiros <piros.attila.zsolt@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 997e599)
Member
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Could you fix the compilation failure, @attilapiros ?
[error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:865:10: value orElse is not a member of Equals
[error] possible cause: maybe a semicolon is missing before `value orElse'?
[error] .orElse {
[error] ^
[error] one error found
Contributor
Author
|
Fixed. It was a difference between Scala 2.12 vs 2.13. Although the documentation does not says differently: https://www.scala-lang.org/api/2.12.8/scala/Option.html#zip[B](that:scala.collection.GenIterable[B]):Option[(A,B)] |
dongjoon-hyun
approved these changes
Mar 13, 2025
Member
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM (Pending CIs). Thank you, @attilapiros .
dongjoon-hyun
pushed a commit
that referenced
this pull request
Mar 13, 2025
… status of a block stored on disk **This is a backport to branch-3.5 from master.** Thanks for yorksity who reported this error and even provided a PR for it. This solution very different from #40883 as `BlockManagerMasterEndpoint#getLocationsAndStatus()` needed some refactoring. ### What changes were proposed in this pull request? This PR fixes an error which can be manifested in the following exception: ``` 25/02/20 09:58:31 ERROR util.Utils: [Executor task launch worker for task 61.0 in stage 67.0 (TID 9391)]: Exception encountered java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:185) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2] at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) ~[scala-library-2.12.15.jar:?] at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?] at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:171) ~[spark-core_2.12-3.3.2.3.3.7190.5-2.jar:3.3.2.3.3.7190.5-2] ``` The PR is changing `BlockManagerMasterEndpoint#getLocationsAndStatus()`. The `BlockManagerMasterEndpoint#getLocationsAndStatus()` function is giving back an optional `BlockLocationsAndStatus` which consist of 3 parts: - `locations`: all the locations where the block can be found (as a sequence of block manager IDs) - `status`: one block status - `localDirs`: optional directory paths which can be used to read block if the block is found in the disk of an executor running on the same host The block (either RDD blocks, shuffle blocks or torrent blocks) can be stored in many executors with different storage levels: disk or memory. This PR changing how the block status and the block manager ID for the `localDirs` is found to guarantee they belong together. ### Why are the changes needed? Before this PR the `BlockManagerMasterEndpoint#getLocationsAndStatus()` was searching for the block status (`status`) and the `localDirs` separately. The block status actually was computed as the very first one where the block can be found. This way it can easily happen this block status was representing an in-memory block (where the disk size is 0 as it is stored in the memory) but the `localDirs` was filled out based on a host local block instance which was stored on disk. This situation can be very frequent but only causing problems (exceptions as above) when encryption is on (spark.io.encryption.enabled=true) as for a not encrypted block the whole file containing the block is read, see https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1244 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Host local block fetching was already covered by some existing unit tests but a new unit test is provided for this exact case: "SPARK-43221: Host local block fetching should use a block status with disk size". The number of block mangers and the order of the blocks was chosen after some experimentation as the block status order is depends on a `HashSet`, see: ``` private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] ``` This test was executed with the old code too to validate the issue is reproduced: ``` BlockManagerSuite: OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended - SPARK-43221: Host local block fetching should use a block status with disk size *** FAILED *** 0 was not greater than 0 The block size must be greater than 0 for a nonempty block! (BlockManagerSuite.scala:491) Run completed in 6 seconds, 705 milliseconds. Total number of tests run: 1 Suites: completed 1, aborted 0 Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0 *** 1 TEST FAILED *** ``` ### Was this patch authored or co-authored using generative AI tooling? No. (cherry picked from commit 997e599) Closes #50260 from attilapiros/SPARK-43221_branch-3.5. Authored-by: attilapiros <piros.attila.zsolt@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Member
|
Merged to branch-3.5. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This is a backport to branch-3.5 from master.
Thanks for @yorksity who reported this error and even provided a PR for it.
This solution very different from #40883 as
BlockManagerMasterEndpoint#getLocationsAndStatus()needed some refactoring.What changes were proposed in this pull request?
This PR fixes an error which can be manifested in the following exception:
The PR is changing
BlockManagerMasterEndpoint#getLocationsAndStatus().The
BlockManagerMasterEndpoint#getLocationsAndStatus()function is giving back an optionalBlockLocationsAndStatuswhich consist of 3 parts:locations: all the locations where the block can be found (as a sequence of block manager IDs)status: one block statuslocalDirs: optional directory paths which can be used to read block if the block is found in the disk of an executor running on the same hostThe block (either RDD blocks, shuffle blocks or torrent blocks) can be stored in many executors with different storage levels: disk or memory.
This PR changing how the block status and the block manager ID for the
localDirsis found to guarantee they belong together.Why are the changes needed?
Before this PR the
BlockManagerMasterEndpoint#getLocationsAndStatus()was searching for the block status (status) and thelocalDirsseparately. The block status actually was computed as the very first one where the block can be found. This way it can easily happen this block status was representing an in-memory block (where the disk size is 0 as it is stored in the memory) but thelocalDirswas filled out based on a host local block instance which was stored on disk.This situation can be very frequent but only causing problems (exceptions as above) when encryption is on (spark.io.encryption.enabled=true) as for a not encrypted block the whole file containing the block is read, see
https://github.com/apache/spark/blob/branch-3.5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1244
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Host local block fetching was already covered by some existing unit tests but a new unit test is provided for this exact case: "SPARK-43221: Host local block fetching should use a block status with disk size".
The number of block mangers and the order of the blocks was chosen after some experimentation as the block status order is depends on a
HashSet, see:This test was executed with the old code too to validate the issue is reproduced:
Was this patch authored or co-authored using generative AI tooling?
No.
(cherry picked from commit 997e599)