Skip to content

[SPARK-47702][CORE] Remove Shuffle service endpoint from the locations list when RDD block is removed form a node.#45836

Closed
maheshk114 wants to merge 2 commits intoapache:masterfrom
maheshk114:SPARK-47702
Closed

[SPARK-47702][CORE] Remove Shuffle service endpoint from the locations list when RDD block is removed form a node.#45836
maheshk114 wants to merge 2 commits intoapache:masterfrom
maheshk114:SPARK-47702

Conversation

@maheshk114
Copy link

@maheshk114 maheshk114 commented Apr 3, 2024

What changes were proposed in this pull request?

This PR fix a bug where the shuffle service end point is not removed when a RDD block is removed from a node. This is done by changing the call to reportBlockStatus in removeBlockInternal method. Currently StorageLevel.NONE is passed as storage level. This causes the driver to ignore the RDD in method updateBlockInfo as storageLevel.useDisk is set as false in StorageLevel.NONE.

Why are the changes needed?

If the block location is not updated properly, then tasks fails with fetch failed exception. The tasks will try to read the RDD blocks from a node using external shuffle service. The read will fail, if the node is already decommissioned.

WARN BlockManager [Executor task launch worker for task 25.0 in stage 6.0 (TID 1567)]: Failed to fetch remote block rdd_5_25 from BlockManagerId(4, vm-92303839, 7337, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1099)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1099)
	at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1045)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1264)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1326)

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new UT.

Was this patch authored or co-authored using generative AI tooling?

No

…s list when RDD block is removed form a node
@github-actions github-actions bot added the CORE label Apr 3, 2024
@maheshk114
Copy link
Author

@attilapiros Can you please take a look at this PR.

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jul 15, 2024
@github-actions github-actions bot closed this Jul 16, 2024
@attilapiros
Copy link
Contributor

@maheshk114 I am sorry I missed this PR earlier. Let me try to reopen it.

@attilapiros attilapiros reopened this Jul 31, 2024
@attilapiros
Copy link
Contributor

retest

@github-actions github-actions bot closed this Aug 1, 2024
Comment on lines +2115 to +2116
// memory size and disk size are being kept for calculating delta. Reset the replica
// count 0 in storage level to notify that its a remove operation.
Copy link
Contributor

@attilapiros attilapiros Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// memory size and disk size are being kept for calculating delta. Reset the replica
// count 0 in storage level to notify that its a remove operation.
// memory size and disk size are being kept for calculating delta but reset the replication
// factor to 0 to indicate its a remove operation.

@attilapiros
Copy link
Contributor

@maheshk114 could you please open a new PR with the same content?

@mridulm mridulm removed the Stale label Aug 1, 2024
@mridulm mridulm reopened this Aug 1, 2024
@mridulm
Copy link
Contributor

mridulm commented Aug 1, 2024

I think it was the stale tag which caused it to get closed again @attilapiros. Removed it, let us see if it stays open :-)

Comment on lines +312 to +313
val store2 = makeBlockManager(10000, "host-2")
assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need this.

val blockId = RDDBlockId(1, 2)
val message = new Array[Byte](1000)

// if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port should be present.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// if SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled, then shuffle port should be present.
// As SHUFFLE_SERVICE_FETCH_RDD_ENABLED is enabled the external shuffle service should be listed

assert(master.getLocations(blockId).contains(
BlockManagerId("host-1", "localhost", shuffleServicePort, None)))

// after block is removed, shuffle port should be removed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// after block is removed, shuffle port should be removed.
// After block is removed the external shuffle service should be removed too from the locations

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants