
[#615] improvement: Reduce task binary by removing 'partitionToServers' from RssShuffleHandle #637

Merged
merged 11 commits into apache:master on Mar 14, 2023

Conversation

jiafuzha
Contributor

@jiafuzha jiafuzha commented Feb 21, 2023

What changes were proposed in this pull request?

Move the partition -> shuffle servers mapping from a direct field of RssShuffleHandle to a broadcast variable, reducing the task binary size.

Why are the changes needed?

To reduce task delay and task serialization/deserialization time by reducing the task binary size.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. Tested with a 10,000-partition shuffle. Task binary size dropped from more than 670 KB to less than 6 KB.
  2. Tested with multiple shuffle stages in the same job to verify the ShuffleHandleInfo cache logic.
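The size claim above can be illustrated with a plain-JDK sketch (not Uniffle code; the map contents, 10,000 partitions with two hypothetical servers each, are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

public class TaskBinarySizeSketch {
    // Serialize an object with plain JDK serialization and return its byte count.
    static int serializedSize(Serializable obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Build a stand-in partition -> servers map and measure its serialized size.
    static int measure(int partitions) {
        HashMap<Integer, ArrayList<String>> partitionToServers = new HashMap<>();
        for (int p = 0; p < partitions; p++) {
            partitionToServers.put(
                p, new ArrayList<>(Arrays.asList("server-1:19999", "server-2:19999")));
        }
        return serializedSize(partitionToServers);
    }

    public static void main(String[] args) {
        // Embedded in RssShuffleHandle, this payload rides along in every serialized
        // task binary; moved into a broadcast variable, it is shipped to each
        // executor only once.
        System.out.println("10000-partition map serializes to " + measure(10000) + " bytes");
    }
}
```

Even with JDK serialization deduplicating the repeated server strings, the map alone serializes to hundreds of kilobytes in this sketch, which is the per-task cost the PR moves into a one-time broadcast.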

…m RssShuffleHandle

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
@codecov-commenter

codecov-commenter commented Feb 21, 2023

Codecov Report

Merging #637 (45e62a5) into master (5882e10) will increase coverage by 2.14%.
The diff coverage is 12.50%.

@@             Coverage Diff              @@
##             master     #637      +/-   ##
============================================
+ Coverage     60.63%   62.78%   +2.14%     
  Complexity     1845     1845              
============================================
  Files           228      215      -13     
  Lines         12702    10753    -1949     
  Branches       1062     1062              
============================================
- Hits           7702     6751     -951     
+ Misses         4592     3655     -937     
+ Partials        408      347      -61     
Impacted Files Coverage Δ
...ava/org/apache/spark/shuffle/RssShuffleHandle.java 0.00% <0.00%> (ø)
...va/org/apache/spark/shuffle/ShuffleHandleInfo.java 0.00% <0.00%> (ø)
...org/apache/spark/shuffle/RssSparkShuffleUtils.java 68.26% <50.00%> (-1.12%) ⬇️

... and 15 files with indirect coverage changes


…m RssShuffleHandle

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
@advancedxy
Contributor

PTAL @xianjingfeng first. I will review it later tonight.

* Class for holding partition ID -> shuffle servers mapping.
* It's to be broadcast to executors and referenced by shuffle tasks.
*/
public class PartitionShuffleServerMap {
Member

This class may store other information, such as RemoteStorageInfo. I think we should use a better name. ShuffleHandleInfo? I hope you have a better idea.

Contributor Author

Sounds good.

public int getShuffleId() {
return shuffleId();
}

public Set<ShuffleServerInfo> getShuffleServersForData() {
Member

ditto

@xianjingfeng
Member

The basic logic looks OK to me. You can deal with code style and CI first. @zjf2012

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.reflect.ClassTag;
Contributor

The import group order doesn't seem right.

Let's try the Spark way:

import java

import scala

import third_party

import uniffle
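Applied to the snippet above, that grouping would look roughly like this (a sketch; only the slf4j and `scala.reflect.ClassTag` lines appear in the quoted diff, and the other package paths shown are assumptions):

```java
// java
import java.util.Map;

// scala
import scala.reflect.ClassTag;

// third_party
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// uniffle
import org.apache.uniffle.common.ShuffleServerInfo;
```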

Contributor Author

Let me adjust it accordingly.

* @param partitionToServers
* @return Broadcast variable registered for auto cleanup
*/
public static Broadcast<PartitionShuffleServerMap> createPartShuffleServerMap(
Contributor

I'd prefer to pass SparkContext as a parameter here. It's much clearer, and we should pass SparkContext anyway if we are going to add a unit test for this method.

Contributor Author

I agree.

@advancedxy
Contributor

The code LGTM overall. Some minor questions:

How was this patch tested?
tested with 10000 partitions shuffle

Do you have some data about the serialized task size before and after? Is there any slowdown when using broadcast, and how many shuffle servers are in your environment?

And just to be safe, how much space does the broadcast occupy in your 10,000-partition test? Just to make sure it doesn't bring too much memory pressure to the driver.

@jiafuzha
Contributor Author

The code LGTM overall. Some minor questions:

How was this patch tested?
tested with 10000 partitions shuffle

Do you have some data about the serialized task size before and after? Is there any slowdown when using broadcast, and how many shuffle servers are in your environment?

And just to be safe, how much space does the broadcast occupy in your 10,000-partition test? Just to make sure it doesn't bring too much memory pressure to the driver.

For now, I don't have more servers, so I only used two shuffle servers. Before my optimization, both map tasks and reduce tasks had a binary size of more than 670 KB. After the optimization, it drops to less than 6 KB. It's a dramatic reduction.

Broadcast uses a BitTorrent-like mechanism to distribute the variable to each executor once: executors can fetch chunks of the broadcast variable from other executors instead of fetching everything from the driver. Task serialization/deserialization time also drops a lot. So, in theory, it should not slow the job down.

The size of the broadcast should be less than 670 KB, deduced from the statement above. I'll try to capture it today.

@jerqi
Contributor

jerqi commented Feb 22, 2023

Will the broadcast be cleaned when the shuffle is removed?

@jiafuzha
Contributor Author

Will the broadcast be cleaned when the shuffle is removed?

Yes. It's registered for cleanup when SparkContext creates a new broadcast variable. However, the cleaner may not clean it immediately, similar to the unregisterShuffle method below: the cleaner only gets a chance to collect these objects (shuffles, broadcasts, ...) after a GC has happened. This behavior is fine for this broadcast variable since it's small anyway. But will it cause a problem in the code snippet below? The shuffle servers may not get cleaned up quickly if there is no timely GC in the driver.

```java
@Override
public boolean unregisterShuffle(int shuffleId) {
  try {
    if (SparkEnv.get().executorId().equals("driver")) {
      shuffleWriteClient.unregisterShuffle(appId, shuffleId);
    }
  } catch (Exception e) {
    LOG.warn("Errors on unregister to remote shuffle-servers", e);
  }
  return true;
}
```
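The GC-dependent timing described here (Spark's cleaner only notices dead references after the driver actually runs a GC) can be sketched with plain JDK weak references (a conceptual sketch, not Spark's ContextCleaner):

```java
import java.lang.ref.WeakReference;

public class GcDrivenCleanupSketch {
    // Returns true once the weakly referenced object has been collected,
    // i.e. once a GC has actually run; before that, no cleanup can happen.
    static boolean waitForCollection() {
        Object broadcastHandle = new Object(); // stands in for a Broadcast variable
        WeakReference<Object> ref = new WeakReference<>(broadcastHandle);

        broadcastHandle = null; // last strong reference dropped, like an unreferenced handle
        try {
            for (int i = 0; i < 50 && ref.get() != null; i++) {
                System.gc();      // advisory: cleanup waits until a GC really happens
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return ref.get() == null;
    }

    public static void main(String[] args) {
        System.out.println("collected after GC: " + waitForCollection());
    }
}
```

This is why unregisterShuffle in the driver may lag: the reference can be dead long before any GC runs to surface it to the cleaner.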

@jerqi
Contributor

jerqi commented Feb 22, 2023

Will the broadcast be cleaned when the shuffle is removed?

Yes. It's registered for cleanup when SparkContext creates a new broadcast variable. However, the cleaner may not clean it immediately, similar to the unregisterShuffle method below: the cleaner only gets a chance to collect these objects (shuffles, broadcasts, ...) after a GC has happened. This behavior is fine for this broadcast variable since it's small anyway. But will it cause a problem in the code snippet below? The shuffle servers may not get cleaned up quickly if there is no timely GC in the driver.

```java
@Override
public boolean unregisterShuffle(int shuffleId) {
  try {
    if (SparkEnv.get().executorId().equals("driver")) {
      shuffleWriteClient.unregisterShuffle(appId, shuffleId);
    }
  } catch (Exception e) {
    LOG.warn("Errors on unregister to remote shuffle-servers", e);
  }
  return true;
}
```

Ok

@jiafuzha
Contributor Author

Identified two problems:

  • The size of the newly created broadcast variable is larger than the original task binary.
  • The broadcast ShuffleHandleInfo is not registered with the Kryo serializer, which means all jobs will fail if someone sets "spark.kryo.registrationRequired" to true.

I need to serialize ShuffleHandleInfo to binary with Spark's closure serializer before Kryo-serializing and broadcasting it. Working on it.
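The plan described here (pre-serialize the object so that only a byte[], which Kryo always accepts, crosses the broadcast) can be sketched with JDK serialization standing in for Spark's closure serializer; the class and strings below are illustrative only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class BroadcastBytesSketch {
    // Driver side: pre-serialize the object so only a byte[] needs broadcasting.
    static byte[] toBytes(Serializable obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Executor side: deserialize the broadcast bytes back into the object once, then cache it.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A String stands in for ShuffleHandleInfo; the broadcast layer only ever
        // sees byte[], which is safe even under spark.kryo.registrationRequired=true.
        String handleInfo = "stand-in for ShuffleHandleInfo";
        byte[] wire = toBytes(handleInfo);
        System.out.println(fromBytes(wire).equals(handleInfo));
    }
}
```

The trade-off debated later in this thread is exactly this extra layer: the byte[] indirection avoids the Kryo registration failure but adds deserialize-and-cache logic on the executor side.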

…m RssShuffleHandle

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
…m RssShuffleHandle

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
@jiafuzha
Contributor Author

And just to be safe, how much space does the broadcast occupy in your 10,000-partition test? Just to make sure it doesn't bring too much memory pressure to the driver.

Before my patch, the "task + partition -> shuffle servers" size in binary is about 83.6 KiB. The deserialized size is about 659.8 KiB.

2023-02-22 13:36:13,581 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 83.6 KiB, free 23.4 GiB)
2023-02-22 13:36:13,582 INFO broadcast.TorrentBroadcast: Reading broadcast variable 2 took 4 ms
2023-02-22 13:36:13,583 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 659.8 KiB, free 23.4 GiB)

With my patch, the broadcast variable's size in binary is about 80.6 KiB (< 83.6 KiB). The deserialized size is about 655.0 KiB (< 659.8 KiB). Both are as expected.

2023-02-23 12:43:20,015 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 80.6 KiB, free 23.4 GiB)
2023-02-23 12:43:20,015 INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 4 ms
2023-02-23 12:43:20,017 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 655.0 KiB, free 23.4 GiB)

@jiafuzha jiafuzha requested review from advancedxy and xianjingfeng and removed request for advancedxy and xianjingfeng February 23, 2023 08:23
@jiafuzha
Contributor Author

Please see how the task deserialization time vanishes with the patch in https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.

@jerqi
Contributor

jerqi commented Feb 24, 2023

…m RssShuffleHandle

What changes were proposed in this pull request?

Move the partition -> shuffle servers mapping from a direct field of RssShuffleHandle to a broadcast variable, reducing the task binary size.

Why are the changes needed?

To reduce task delay and task serialization/deserialization time by reducing the task binary size.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. Tested with a 10,000-partition shuffle. Task binary size dropped from more than 670 KB to less than 6 KB.
  2. Tested with multiple shuffle stages in the same job to verify the ShuffleHandleInfo cache logic.

Could you modify the title and description? It seems that the title is too long; some words of the title were appended to the description.

…m RssShuffleHandle

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
@jerqi
Contributor

jerqi commented Mar 4, 2023

@advancedxy @xianjingfeng Do you have another suggestion?

xianjingfeng
xianjingfeng previously approved these changes Mar 5, 2023
Member

@xianjingfeng xianjingfeng left a comment

LGTM

@advancedxy
Contributor

@advancedxy @xianjingfeng Do you have another suggestion?

I'm not sure; this PR introduces some quite complex logic to broadcast the shuffle handle info.

If I were implementing this feature, I would just use Kryo by default, and have RssShuffleManager instruct users to either turn off spark.kryo.registrationRequired (when it is explicitly set by the user) or manually register RssShuffleHandle.
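For reference, the two Spark settings involved in this alternative are ordinary configs; the fully qualified class name below is inferred from the coverage report above and should be treated as an assumption:

```
spark.kryo.registrationRequired=true
spark.kryo.classesToRegister=org.apache.spark.shuffle.RssShuffleHandle
```

With the class registered, the handle object could be broadcast directly even under strict registration, at the cost of shipping the registration requirement to users.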

@jiafuzha
Contributor Author

jiafuzha commented Mar 6, 2023

@advancedxy @xianjingfeng Do you have another suggestion?

I'm not sure; this PR introduces some quite complex logic to broadcast the shuffle handle info.

If I were implementing this feature, I would just use Kryo by default, and have RssShuffleManager instruct users to either turn off spark.kryo.registrationRequired (when it is explicitly set by the user) or manually register RssShuffleHandle.

Just to make sure we are on the same page: only registering RssShuffleHandle with the Kryo serializer doesn't resolve this issue. Each task will still carry more than 670 KB of binary for a 10,000-partition job, and each task will spend quite noticeable time repeatedly deserializing the partition -> shuffle server mappings, as shown in https://docs.google.com/document/d/1TZ-3Mgwj9j7n1mMyCrS3sskFv_uOtUtXl1oF9Y_oMOw/edit?usp=sharing.

RssShuffleHandle is a field of ShuffleDependency, which is serialized into the task binary by the code below in DAGScheduler. Without broadcasting ShuffleHandleInfo, it's hard to pull it out and avoid repeating ShuffleHandleInfo.

```scala
RDDCheckpointData.synchronized {
  taskBinaryBytes = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }

  partitions = stage.rdd.partitions
}
```

I think broadcast itself is quite simple and efficient. I don't see any better alternative.

@advancedxy
Contributor

Just to make sure we are on the same page: only registering RssShuffleHandle with the Kryo serializer doesn't resolve this issue. [...]

I think broadcast itself is quite simple and efficient. I don't see any better alternative.

Sorry, I wasn't clear enough. I agree broadcast is the best way to go. I'm just not sure whether we should broadcast the serialized binary rather than simply broadcast the shuffle handle info object directly. The only problem with broadcasting the shuffle handle info directly is that once spark.kryo.registrationRequired (which is false by default) is set to true, the job will fail. To avoid that problem, we introduce quite a lot of complex logic: deserialization, caching, weak references, etc. I'm wondering whether that's worth it.

@jiafuzha
Contributor Author

jiafuzha commented Mar 6, 2023

Sorry, I wasn't clear enough. I agree broadcast is the best way to go. I'm just not sure whether we should broadcast the serialized binary rather than simply broadcast the shuffle handle info object directly. The only problem with broadcasting the shuffle handle info directly is that once spark.kryo.registrationRequired (which is false by default) is set to true, the job will fail. To avoid that problem, we introduce quite a lot of complex logic: deserialization, caching, weak references, etc. I'm wondering whether that's worth it.

No worries. You guys can decide which option is better. I can revert it if you like.

@advancedxy
Contributor

No worries. You guys can decide which option is better. I can revert it if you like.

Thanks for your understanding. Let's take a quick vote here. cc @zuston @jerqi @xianjingfeng @kaijchen

  1. If you are going to vote for the current implementation, please react with 🚀.
  2. If you are going to vote for a simpler implementation, a.k.a. broadcasting the shuffle handle info directly, please react with 🎉.

// shuffle ID to ShuffleIdRef
// ShuffleIdRef acts as strong reference to prevent cached ShuffleHandleInfo being GCed during shuffle
// ShuffleIdRef will be removed when unregisterShuffle()
private static Map<Integer, ShuffleIdRef> _globalShuffleIdRefMap = new ConcurrentHashMap<>();
Contributor

It seems that the leading underscore does not conform to Java naming conventions. Right?

Contributor Author

Yes, it's just a convention to denote special variables. If it violates the Uniffle style, I can change it.

Contributor Author

done

// ShuffleIdRef will be removed when unregisterShuffle()
private static Map<Integer, ShuffleIdRef> _globalShuffleIdRefMap = new ConcurrentHashMap<>();
// each shuffle has unique ID even for multiple concurrent running shuffles and jobs per application
private static ThreadLocal<HandleInfoLocalCache> _localHandleInfoCache =
Contributor

ditto.

Contributor Author

done

…m RssShuffleHandle

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
@advancedxy
Contributor

Hi @zjf2012, it seems that the majority of us think it's better to simply broadcast the RssShuffleHandle object here, which requires spark.kryo.registrationRequired=false. Would you mind changing it back?

@jiafuzha
Contributor Author

jiafuzha commented Mar 9, 2023

Hi @zjf2012, it seems that the majority of us think it's better to simply broadcast the RssShuffleHandle object here, which requires spark.kryo.registrationRequired=false. Would you mind changing it back?

Sure, I'll change it back tomorrow or next week.

…m RssShuffleHandle

revert to ShuffleHandleInfo without ser/deser

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
Contributor

@smallzhongfeng smallzhongfeng left a comment

LGTM! Thanks @zjf2012

Contributor

@advancedxy advancedxy left a comment

LGTM, except one minor comment.

@advancedxy
Contributor

Thanks @zjf2012. I noticed there are no added UTs here. Do you think it's possible to add new UTs, or are the existing UTs sufficient?

@jiafuzha
Contributor Author

Thanks @zjf2012. I noticed there are no added UTs here. Do you think it's possible to add new UTs, or are the existing UTs sufficient?

You are welcome. I think the current UTs are good enough since my changes only wrap up some existing pieces; no complex logic is involved. If I added UTs, they would mostly be mockups for the broadcast, with little actual business logic.

Member

@xianjingfeng xianjingfeng left a comment

LGTM

@xianjingfeng xianjingfeng merged commit 0a2cec9 into apache:master Mar 14, 2023
@xianjingfeng
Member

Thanks @zjf2012

advancedxy pushed a commit to advancedxy/incubator-uniffle that referenced this pull request Mar 21, 2023
…Servers' from RssShuffleHandle (apache#637)

### What changes were proposed in this pull request?
Move the partition -> shuffle servers mapping from a direct field of RssShuffleHandle to a broadcast variable, reducing the task binary size.

### Why are the changes needed?
To reduce task delay and task serialization/deserialization time by reducing the task binary size.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Tested with a 10,000-partition shuffle. Task binary size dropped from more than 670 KB to less than 6 KB.
Tested with multiple shuffle stages in the same job to verify the ShuffleHandleInfo cache logic.
xianjingfeng pushed a commit to xianjingfeng/incubator-uniffle that referenced this pull request Apr 5, 2023
…Servers' from RssShuffleHandle (apache#637)

@zuston
Member

zuston commented Aug 15, 2023

Will the broadcast be cleaned when the shuffle is removed?

Yes. It's registered for cleanup when SparkContext creates a new broadcast variable. However, the cleaner may not clean it immediately, similar to the unregisterShuffle method below: the cleaner only gets a chance to collect these objects (shuffles, broadcasts, ...) after a GC has happened. This behavior is fine for this broadcast variable since it's small anyway. But will it cause a problem in the code snippet below? The shuffle servers may not get cleaned up quickly if there is no timely GC in the driver.

```java
@Override
public boolean unregisterShuffle(int shuffleId) {
  try {
    if (SparkEnv.get().executorId().equals("driver")) {
      shuffleWriteClient.unregisterShuffle(appId, shuffleId);
    }
  } catch (Exception e) {
    LOG.warn("Errors on unregister to remote shuffle-servers", e);
  }
  return true;
}
```

I don't see any logic that destroys the broadcast variable explicitly. Do we need to destroy it in the unregisterShuffle method?


Successfully merging this pull request may close these issues.

[Improvement] Reduce task binary by removing 'partitionToServers' from RssShuffleHandle
7 participants