Skip to content

Commit

Permalink
[#1064] improvement(tez): Make shuffle data send thread pool configur…
Browse files Browse the repository at this point in the history
…able in WriteBufferManager. (#1065)

### What changes were proposed in this pull request?
Currently the number of threads in the send data thread pool in WriteBufferManager is fixed at 1, making it configurable.

### Why are the changes needed?
Fix: #[1064](#1064)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests can cover it.

Co-authored-by: jay.zhu <jay.zhu@huolala.cn>
  • Loading branch information
zhuyaogai and jay.zhu committed Aug 2, 2023
1 parent 749f270 commit d149d57
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public class RssTezConfig {
public static final String RSS_CLIENT_MEMORY_THRESHOLD =
TEZ_RSS_CONFIG_PREFIX + "rss.client.memory.threshold";
public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8f;
public static final String RSS_CLIENT_SEND_THREAD_NUM =
TEZ_RSS_CONFIG_PREFIX + "rss.client.send.thread.num";
public static final int RSS_CLIENT_DEFAULT_THREAD_NUM =
RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM;
public static final String RSS_CLIENT_SEND_THRESHOLD =
TEZ_RSS_CONFIG_PREFIX + "rss.client.send.threshold";
public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2f;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public WriteBufferManager(
Serializer<V> valSerializer,
long maxBufferSize,
double memoryThreshold,
int sendThreadNum,
double sendThreshold,
int batch,
RssConf rssConf,
Expand Down Expand Up @@ -152,7 +153,7 @@ public WriteBufferManager(
this.isNeedSorted = isNeedSorted;
this.mapOutputByteCounter = mapOutputByteCounter;
this.sendExecutorService =
Executors.newFixedThreadPool(1, ThreadUtils.getThreadFactory("send-thread"));
Executors.newFixedThreadPool(sendThreadNum, ThreadUtils.getThreadFactory("send-thread"));
}

/** add record */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public RssSorter(
conf.getDouble(
RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
int sendThreadNum =
conf.getInt(
RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
double sendThreshold =
conf.getDouble(
RssTezConfig.RSS_CLIENT_SEND_THRESHOLD, RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
Expand Down Expand Up @@ -125,6 +128,7 @@ public RssSorter(
LOG.info("maxSegmentSize is {}", maxSegmentSize);
LOG.info("maxBufferSize is {}", maxBufferSize);
LOG.info("memoryThreshold is {}", memoryThreshold);
LOG.info("sendThreadNum is {}", sendThreadNum);
LOG.info("sendThreshold is {}", sendThreshold);
LOG.info("batch is {}", batch);
LOG.info("storageType is {}", storageType);
Expand All @@ -150,6 +154,7 @@ public RssSorter(
valSerializer,
maxBufferSize,
memoryThreshold,
sendThreadNum,
sendThreshold,
batch,
new RssConf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public RssUnSorter(
conf.getDouble(
RssTezConfig.RSS_CLIENT_MEMORY_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
int sendThreadNum =
conf.getInt(
RssTezConfig.RSS_CLIENT_SEND_THREAD_NUM, RssTezConfig.RSS_CLIENT_DEFAULT_THREAD_NUM);
double sendThreshold =
conf.getDouble(
RssTezConfig.RSS_CLIENT_SEND_THRESHOLD, RssTezConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
Expand Down Expand Up @@ -123,6 +126,7 @@ public RssUnSorter(
LOG.info("maxSegmentSize is {}", maxSegmentSize);
LOG.info("maxBufferSize is {}", maxBufferSize);
LOG.info("memoryThreshold is {}", memoryThreshold);
LOG.info("sendThreadNum is {}", sendThreadNum);
LOG.info("sendThreshold is {}", sendThreshold);
LOG.info("batch is {}", batch);
LOG.info("storageType is {}", storageType);
Expand All @@ -148,6 +152,7 @@ public RssUnSorter(
valSerializer,
maxBufferSize,
memoryThreshold,
sendThreadNum,
sendThreshold,
batch,
new RssConf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testWriteException(@TempDir File tmpDir) throws IOException, Interru
serializationFactory.getSerializer(BytesWritable.class);
long maxBufferSize = 14 * 1024 * 1024;
double memoryThreshold = 0.8f;
int sendThreadNum = 1;
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
Expand Down Expand Up @@ -132,6 +133,7 @@ public void testWriteException(@TempDir File tmpDir) throws IOException, Interru
valSerializer,
maxBufferSize,
memoryThreshold,
sendThreadNum,
sendThreshold,
batch,
rssConf,
Expand Down Expand Up @@ -185,6 +187,7 @@ public void testWriteNormal(@TempDir File tmpDir) throws IOException, Interrupte
serializationFactory.getSerializer(BytesWritable.class);
long maxBufferSize = 14 * 1024 * 1024;
double memoryThreshold = 0.8f;
int sendThreadNum = 1;
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
Expand Down Expand Up @@ -228,6 +231,7 @@ public void testWriteNormal(@TempDir File tmpDir) throws IOException, Interrupte
valSerializer,
maxBufferSize,
memoryThreshold,
sendThreadNum,
sendThreshold,
batch,
rssConf,
Expand Down Expand Up @@ -292,6 +296,7 @@ public void testCommitBlocksWhenMemoryShuffleDisabled(@TempDir File tmpDir)
serializationFactory.getSerializer(BytesWritable.class);
long maxBufferSize = 14 * 1024 * 1024;
double memoryThreshold = 0.8f;
int sendThreadNum = 1;
double sendThreshold = 0.2f;
int batch = 50;
int numMaps = 1;
Expand Down Expand Up @@ -334,6 +339,7 @@ public void testCommitBlocksWhenMemoryShuffleDisabled(@TempDir File tmpDir)
valSerializer,
maxBufferSize,
memoryThreshold,
sendThreadNum,
sendThreshold,
batch,
rssConf,
Expand Down

0 comments on commit d149d57

Please sign in to comment.