
[SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. #45228

Open: wants to merge 5 commits into base: master from the migrateToExternalStorage branch

Conversation

@maheshk114 commented Feb 23, 2024

What changes were proposed in this pull request?

This PR exposes a config that allows users to migrate shuffle data directly to external storage. The data is migrated using multiple threads to reduce migration time, and shuffle data is likewise read back from external storage using multiple threads. A configuration parameter is added to control the number of threads used for reading shuffle data.
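For illustration, a setup using these configs might look roughly like the following sketch (the enable-flag key is a placeholder since its exact name is not shown here, and the read-thread key is the initial name from this PR's diff, which was renamed during review):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  // Existing Spark 3.2+ setting: where fallback shuffle data is written (path must end with "/").
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
  // Placeholder key name: migrate shuffle data straight to the fallback storage.
  .set("spark.storage.decommission.shuffleBlocks.migrateToFallbackStorage", "true")
  // Initial key name from this PR's diff: number of threads for reading shuffle data back.
  .set("spark.storage.fallbackStorage.num.threads.for.shuffle.read", "8")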

Why are the changes needed?

Currently, Spark supports migrating shuffle data to peer nodes during node decommissioning; if peer nodes are not accessible, Spark falls back to external storage at a user-provided storage location path. There are scenarios where a user may want to migrate to external storage instead of peer nodes, for example because the nodes are unstable or because an aggressive scale-down is needed. Users should therefore be able to configure Spark to migrate shuffle data directly to external storage when the use case permits.

Does this PR introduce any user-facing change?

Yes. Users can now enable migrating shuffle data directly to external storage and control the number of threads used for reading shuffle data back from external storage.

How was this patch tested?

New unit tests are added.
Verified that existing tests pass.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Feb 23, 2024
@mridulm (Contributor) commented Feb 24, 2024

+CC @dongjoon-hyun , @Ngone51

@maheshk114 force-pushed the migrateToExternalStorage branch 2 times, most recently from 722b08c to 75d7be1, on February 27, 2024 03:15
@abhishekd0907 (Contributor) commented:

Hi @mridulm @attilapiros , Can you please help in reviewing this PR?

@mridulm (Contributor) commented Mar 18, 2024

+CC @dongjoon-hyun, @Ngone51

@dongjoon-hyun (Member) commented Mar 18, 2024

Sorry for missing pings here, @mridulm. Let me take a look during this week.

FYI, I was thinking that Apache Hadoop 3.4 and the S3 Express One Zone feature were the minimum requirements for this kind of effort, because historically previous efforts of this kind didn't deliver desirable benefits due to the inevitable slowness.

I'll try it quickly when I have some time and share my opinion here.

@dongjoon-hyun dongjoon-hyun self-assigned this Mar 18, 2024
@dongjoon-hyun (Member) commented:

I assigned myself not to forget.

@mridulm (Contributor) commented Mar 18, 2024

Thanks @dongjoon-hyun :-)
(And I forgot I had pinged you before, sorry about that !)

@dongjoon-hyun (Member) left a review comment:

To @maheshk114, I have a big concern about the as-is PR title, because it misleads users by ignoring the existing Apache Spark 3.2 features.

[SPARK-47141] [Core]: Support shuffle migration to external storage.

At least, as the author of the following Apache Spark 3.2 patches, I believe Apache Spark already supports shuffle migration to external storage. WDYT, @maheshk114? Could you revise the PR title to narrow it down to this PR's exact contribution?

@dongjoon-hyun (Member) commented:

To @maheshk114, could you rebase this PR onto the master branch once more to test on top of Apache Hadoop 3.4.0? The following has been merged to the Apache Spark master branch.

@maheshk114 maheshk114 changed the title [SPARK-47141] [Core]: Support shuffle migration to external storage. [SPARK-47141] [Core]: Support enabling migration of shuffle data directly to external storage using config parameter. Mar 21, 2024
@maheshk114 (Author) commented:

To @maheshk114, I have a big concern about the as-is PR title, because it misleads users by ignoring the existing Apache Spark 3.2 features.

[SPARK-47141] [Core]: Support shuffle migration to external storage.

At least, as the author of the following Apache Spark 3.2 patches, I believe Apache Spark already supports shuffle migration to external storage. WDYT, @maheshk114? Could you revise the PR title to narrow it down to this PR's exact contribution?

Thanks a lot @dongjoon-hyun for looking into the PR. Yes, it makes sense. I have updated the PR title to reflect the exact changes made in this PR.

@maheshk114 (Author) commented:

To @maheshk114, could you rebase this PR onto the master branch once more to test on top of Apache Hadoop 3.4.0? The following has been merged to the Apache Spark master branch.

done

@LuciferYang LuciferYang changed the title [SPARK-47141] [Core]: Support enabling migration of shuffle data directly to external storage using config parameter. [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. Mar 21, 2024
@dongjoon-hyun (Member) commented:

Thank you for rebasing, @maheshk114.

}

test("fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads " +
"+ 2 read from external storage") {
Member:

This PR seems to introduce a test failure (or flakiness) at this test case. Could you take a look at the CI failure?

[info] ShuffleBlockFetcherIteratorSuite:
...
[info] - fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads + 2 read from external storage *** FAILED *** (107 milliseconds)
[info]   org.mockito.exceptions.verification.TooFewActualInvocations: blockManager.getLocalBlockData(<any>);
[info] Wanted 2 times:
[info] -> at org.apache.spark.storage.BlockManager.getLocalBlockData(BlockManager.scala:721)
[info] But was 1 time:
[info] -> at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:592)

@@ -551,6 +551,22 @@ package object config {
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

private[spark] val STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ =
ConfigBuilder("spark.storage.fallbackStorage.num.threads.for.shuffle.read")
Member:

Every "." means a new namespace in the Apache Spark configuration namespace scheme. For example, this line introduces 4 new namespaces. Please avoid using "." this way.

  • spark.storage.fallbackStorage.num.*
  • spark.storage.fallbackStorage.num.threads.*
  • spark.storage.fallbackStorage.num.threads.for.*
  • spark.storage.fallbackStorage.num.threads.for.shuffle.*
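
For illustration, a flatter key could be declared roughly as follows in core's config package object (a sketch only; the name, doc text, version, and default are placeholders rather than the values finally adopted in the PR):

private[spark] val STORAGE_FALLBACK_STORAGE_SHUFFLE_READ_THREADS =
  ConfigBuilder("spark.storage.fallbackStorage.shuffleReadThreads")
    .doc("Number of threads used to read shuffle data back from the fallback storage. " +
      "0 disables the read thread pool.")
    .version("4.0.0")
    .intConf
    .checkValue(_ >= 0, "The number of read threads must be non-negative.")
    .createWithDefault(0)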

Author:

done

@@ -354,7 +355,8 @@ private[spark] class CoarseGrainedExecutorBackend(
// We can only trust allBlocksMigrated boolean value if there were no tasks running
// since the start of computing it.
if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
logInfo("No running tasks, all blocks migrated, stopping.")
val timeTakenMs = (System.nanoTime() - startTime) / (1000 * 1000)
logInfo(s"No running tasks, all blocks migrated in $timeTakenMs ms, stopping.")
Member:

Please revert the log-related change of this file because it's irrelevant to the functionality. We can handle this separately if needed.

Author:

This log line can be used to compare the time taken to migrate to external storage versus a remote node. It may help users check the increase in migration time.

}

private def fetchShuffleBlocks(
block: FetchBlockInfo,
Member:

indentation?

Author:

done

def stopThreadPool(conf: SparkConf): Unit = {
logInfo(s" Stopping thread pool")
if (getFallbackStorage(conf).isDefined &&
getFallbackStorage(conf).get.fetchThreadPool.isDefined) {
Member:

indentation.

Author:

done

f.seek(offset)
f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
}
Member:

Could you spin off this Utils.tryWithResource related change to a new JIRA? We can merge that first.
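For context, this change appears to wrap the read in Utils.tryWithResource so the stream is always closed; a minimal sketch under that assumption (fallbackFileSystem and dataFile are illustrative names, while offset, array, and startTimeNs come from the surrounding code):

// Ensure the input stream is closed even if the read fails.
Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
  f.seek(offset)
  f.readFully(array)
  logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
}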


@@ -1558,6 +1587,7 @@ object ShuffleBlockFetcherIterator {
mapIndex: Int,
address: BlockManagerId,
size: Long,
timeTaken: Long,
Member:

May I ask why we add a new field here? If this is only for pure logging, please revert it.

Author:

This is not just for logging; it captures the time taken by read operations. It was helpful for comparing the time difference between reads from external storage and from a remote node.

@@ -172,61 +172,64 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}

test("migrate shuffle data to fallback storage") {
val conf = new SparkConf(false)
Member:

Please preserve the existing test coverage as much as possible. We had better add new feature test coverage separately while keeping the existing test coverage unchanged in order to prevent any regression.

Author:

The test is not removed. I have just added a new conf to avoid duplicating the same test case.

@dongjoon-hyun (Member) left a review comment:

I finished the first round of review, @maheshk114.

BTW, could you share some performance results of this contribution on your system? Please add them to the PR description.

@dongjoon-hyun dongjoon-hyun removed their assignment Mar 21, 2024
@dongjoon-hyun (Member) left a review comment:

#45663 is merged. Please rebase this PR to the master branch once more, @maheshk114.

@maheshk114 (Author) commented:

I finished the first round of review, @maheshk114.

BTW, could you share some performance results of this contribution on your system? Please add them to the PR description.

I have an internal setup with 3.4; I will share the numbers from that.

val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
// Fallback to original implementation, if thread pool is not enabled.
val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {
Contributor:

@maheshk114 can you please explain why this change is needed?

Author:

@attilapiros Thanks for taking a look at this PR. In the current code, shuffle data read from local disk and from external storage is handled the same way: it first tries to read from local disk and, if that fails, it tries to read from external storage (BlockManager.getLocalBlockData). In this PR I have modified this behavior. Shuffle data written to external storage is treated as a remote block, and if a remote block id points to external storage, the read is done in a separate thread. This code change makes sure that, if the fallback storage read thread pool is configured, fallback storage (external storage) blocks are processed as remote blocks; otherwise they are processed as local blocks.
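A simplified sketch of that routing, based on the diff shown at the top of this thread (the surrounding fetch-iterator code is omitted):

val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
val localExecIds =
  if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {
    // A read thread pool is configured: exclude the fallback executor id so that
    // fallback-storage blocks are classified as remote and fetched on the pool.
    Set(blockManager.blockManagerId.executorId)
  } else {
    // No pool configured: keep the original behavior and treat fallback-storage
    // blocks as local reads.
    Set(blockManager.blockManagerId.executorId, fallback)
  }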

Contributor:

I see that, but what I do not get is why we have two separate behaviours on the read side.

Author:

The original code reads local shuffle data in the same thread to avoid any thread-creation overhead. So I wanted to keep the same behavior for local reads and use a thread pool for reads from external storage.

Contributor:

I found where it was first introduced:
https://github.com/apache/spark/pull/30492/files#r532833817

@dongjoon-hyun can an externally stored block ever be found locally?

If not, I would prefer to see a separate case for the external blocks and count them separately too.
So we would have the following block types:

  • local
  • host-local
  • push-merged-local
  • remote
  • external

@maheshk114, @dongjoon-hyun WDYT?

Author:

Yes that would be a proper fix.

Author:

@dongjoon-hyun Do you think it can be modified this way?

// There should be only one fallback storage thread pool per executor.
var fallbackStorage: Option[FallbackStorage] = None
def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized {
if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
Contributor:

Why the null checks for conf?

Author:

In some unit test scenarios, the conf is not initialized.

Contributor:

I would prefer to have the fix in the unit test and not in the production code. Otherwise, the next developer touching this code will be uncertain why there are null checks for the config here and there, and might keep adding null checks to new code. Moreover, if the config were optional it should be an Option; this mixed solution is a bit misleading.

Author:

Makes sense. Will modify accordingly.
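For reference, the test-side fix could look roughly like this (a sketch; it assumes the affected tests previously passed a null conf, and the app id and path values are placeholders):

// Build a minimal SparkConf in the test instead of passing null, so production
// code needs no null guards. spark.app.id is set because FallbackStorage derives
// its target directory from the application id.
import org.apache.spark.SparkConf

val conf = new SparkConf(false)
  .set("spark.app.id", "test-app-id")
  .set("spark.storage.decommission.fallbackStorage.path", "/tmp/spark-fallback/")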

} else {
None
}
}

def getNumReadThreads(conf: SparkConf): Int = {
val numShuffleThreads =
if (conf == null) None else conf.get(STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ)
Contributor:

Same here:
Why the null checks for conf?

Author:

to fix UT failures.


private def fetchShuffleBlocks(
block: FetchBlockInfo,
blockManager: BlockManager,
Contributor:

Why is blockManager passed if it is never used?

Author:

In my first commit, the logic was to read from local disk first and try external storage only in case of IO exception. I removed that logic but forgot to remove the blockManager. Will remove it.

@attilapiros (Contributor) commented:

I am also concerned about the performance.

I think the best would be if the migration of shuffle data to external storage only kicked in when the scale-down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) to the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for that ratio.

@maheshk114 WDYT?
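A rough sketch of the decision such a threshold could drive (the helper and its parameters are hypothetical, not part of the PR):

// Decide where to migrate shuffle blocks based on how many healthy peers remain
// relative to decommissioning executors.
def shouldMigrateToFallback(
    availablePeers: Int,
    decommissioningExecutors: Int,
    ratioThreshold: Double): Boolean = {
  if (availablePeers == 0) return true            // nowhere else to go
  if (decommissioningExecutors == 0) return false // nothing is decommissioning yet
  val ratio = availablePeers.toDouble / decommissioningExecutors
  // Few peers per decommissioning executor means an aggressive scale-down,
  // so send shuffle data straight to the fallback storage.
  ratio < ratioThreshold
}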

@maheshk114 (Author) commented:

I am also concerned about the performance.

I think the best would be if the migration of shuffle data to external storage only kicked in when the scale-down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) to the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for that ratio.

@maheshk114 WDYT?

It's not only about performance; it is also useful when the nodes are not very reliable. So I think we should also have a boolean flag to allow the user to choose to migrate the shuffle data directly to external storage.

@attilapiros (Contributor) commented:

I am also concerned about the performance.
I think the best would be if the migration of shuffle data to external storage only kicked in when the scale-down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) to the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for that ratio.
@maheshk114 WDYT?

It's not only about performance; it is also useful when the nodes are not very reliable. So I think we should also have a boolean flag to allow the user to choose to migrate the shuffle data directly to external storage.

With a threshold you can force the migration to external storage to happen every time (or never), so the flag won't be needed.

@maheshk114 (Author) commented:

I am also concerned about the performance.

I think the best would be if the migration of shuffle data to external storage only kicked in when the scale-down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) to the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for that ratio.

Yes. Will modify accordingly.

@maheshk114 (Author) commented:

I am also concerned about the performance.
I think the best would be if the migration of shuffle data to external storage only kicked in when the scale-down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) to the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for that ratio.
@maheshk114 WDYT?

It's not only about performance; it is also useful when the nodes are not very reliable. So I think we should also have a boolean flag to allow the user to choose to migrate the shuffle data directly to external storage.

With a threshold you can force the migration to external storage to happen every time (or never), so the flag won't be needed.

@attilapiros This will work for our use case, as we will be setting it to either 0% or 100%. In normal scenarios it would be set to more than 50%, meaning that when more than 50% of executors are decommissioning, we can enable migration to external storage. But as of now, Spark decommissions an executor when it has been idle for longer than the idle timeout, or when the cluster manager decides to decommission the node for some reason. So there is no way to know in advance how many executors in total are going to be decommissioned. With this logic, we would start migrating to peers and then, once the number reaches the threshold, start migrating to external storage. This would result in extra movement of shuffle data, which is what we want to avoid in the first place.
