@@ -73,8 +73,10 @@ public class RssClientConfig {
public static String RSS_OZONE_FS_HDFS_IMPL_DEFAULT_VALUE = "org.apache.hadoop.odfs.HdfsOdfsFilesystem";
public static String RSS_OZONE_FS_ABSTRACT_FILE_SYSTEM_HDFS_IMPL = "spark.rss.ozone.fs.AbstractFileSystem.hdfs.impl";
public static String RSS_OZONE_FS_ABSTRACT_FILE_SYSTEM_HDFS_IMPL_DEFAULT_VALUE = "org.apache.hadoop.odfs.HdfsOdfs";
public static String RSS_CLIENT_BLOCK_NUM_PER_TASK_PARTITION = "spark.rss.block.per.task.partition";
public static int RSS_CLIENT_BLOCK_NUM_PER_TASK_PARTITION_DEFAULT_VALUE = 20;
public static String RSS_CLIENT_BLOCK_NUM_PER_BITMAP = "spark.rss.block.per.bitmap";
public static long RSS_CLIENT_BLOCK_NUM_PER_BITMAP_DEFAULT_VALUE = 100000000;
// used by the shuffle server to decide how many bitmaps should be used to store blockIds
// the goal is to improve the shuffle server's performance of report/get shuffle result
// currently, all blockIds are stored across multiple shuffle servers; increase this config if there
// is a huge number of blockIds, e.g. 10B.
public static String RSS_CLIENT_BITMAP_SPLIT_NUM = "spark.rss.client.bitmap.splitNum";
public static int RSS_CLIENT_BITMAP_SPLIT_NUM_DEFAULT_VALUE = 1;
}
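For context, a minimal usage sketch (not part of this diff; the class name is illustrative and the standard Spark Java API is assumed): the new key added above can be set directly on a SparkConf, and the default of 1 only needs raising for jobs that produce a huge number of blockIds.

    import org.apache.spark.SparkConf;

    public class BitmapSplitNumExample {
      public static void main(String[] args) {
        // "spark.rss.client.bitmap.splitNum" is the key added above; the default value is 1
        SparkConf conf = new SparkConf()
            .setAppName("bitmap-split-num-example")
            .set("spark.rss.client.bitmap.splitNum", "4");
        // the RSS shuffle writer then reports shuffle results with bitmapSplitNum = 4
      }
    }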
@@ -51,7 +51,6 @@
import scala.collection.Iterator;

import com.tencent.rss.client.api.ShuffleWriteClient;
import com.tencent.rss.client.util.ClientUtils;
import com.tencent.rss.common.ShuffleBlockInfo;
import com.tencent.rss.common.ShuffleServerInfo;
import com.tencent.rss.common.exception.RssException;
@@ -70,10 +69,8 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
private String appId;
private int numMaps;
private int numPartitions;
private int shuffleId;
private int blockNumPerTaskPartition;
private long blockNumPerBitmap;
private int bitmapSplitNum;
private String taskId;
private long taskAttemptId;
private ShuffleDependency<K, V, C> shuffleDependency;
@@ -107,7 +104,6 @@ public RssShuffleWriter(
this.shuffleDependency = rssHandle.getDependency();
this.shuffleWriteMetrics = shuffleWriteMetrics;
this.partitioner = shuffleDependency.partitioner();
this.numPartitions = partitioner.numPartitions();
this.shuffleManager = shuffleManager;
this.shouldPartition = partitioner.numPartitions() > 1;
this.sendCheckTimeout = sparkConf.getLong(RssClientConfig.RSS_WRITER_SEND_CHECK_TIMEOUT,
@@ -116,10 +112,8 @@
RssClientConfig.RSS_WRITER_SEND_CHECK_INTERVAL_DEFAULT_VALUE);
this.sendSizeLimit = sparkConf.getSizeAsBytes(RssClientConfig.RSS_CLIENT_SEND_SIZE_LIMIT,
RssClientConfig.RSS_CLIENT_SEND_SIZE_LIMIT_DEFAULT_VALUE);
this.blockNumPerTaskPartition = sparkConf.getInt(RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_TASK_PARTITION,
RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_TASK_PARTITION_DEFAULT_VALUE);
this.blockNumPerBitmap = sparkConf.getLong(RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_BITMAP,
RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_BITMAP_DEFAULT_VALUE);
this.bitmapSplitNum = sparkConf.getInt(RssClientConfig.RSS_CLIENT_BITMAP_SPLIT_NUM,
RssClientConfig.RSS_CLIENT_BITMAP_SPLIT_NUM_DEFAULT_VALUE);
this.partitionToBlockIds = Maps.newConcurrentMap();
this.shuffleWriteClient = shuffleWriteClient;
this.shuffleServersForData = rssHandle.getShuffleServersForData();
@@ -305,11 +299,10 @@ public Option<MapStatus> stop(boolean success) {
ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
}
long start = System.currentTimeMillis();
int bitmapNum = ClientUtils.getBitmapNum(numMaps, numPartitions, blockNumPerTaskPartition, blockNumPerBitmap);
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId, taskAttemptId, ptb,
bitmapNum);
LOG.info("Report shuffle result for task[" + taskAttemptId + "] with bitmapNum[" + bitmapNum + "] cost "
+ (System.currentTimeMillis() - start) + " ms");
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
taskAttemptId, ptb, bitmapSplitNum);
LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId, partitionLengths);
return Option.apply(mapStatus);
} catch (Exception e) {
@@ -51,7 +51,6 @@
import scala.collection.Iterator;

import com.tencent.rss.client.api.ShuffleWriteClient;
import com.tencent.rss.client.util.ClientUtils;
import com.tencent.rss.common.ShuffleBlockInfo;
import com.tencent.rss.common.ShuffleServerInfo;
import com.tencent.rss.common.exception.RssException;
@@ -72,14 +71,12 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final ShuffleDependency<K, V, C> shuffleDependency;
private final ShuffleWriteMetrics shuffleWriteMetrics;
private final Partitioner partitioner;
private final int numPartitions;
private final RssShuffleManager shuffleManager;
private final boolean shouldPartition;
private final long sendCheckTimeout;
private final long sendCheckInterval;
private final long sendSizeLimit;
private final int blockNumPerTaskPartition;
private final long blockNumPerBitmap;
private final int bitmapSplitNum;
private final Map<Integer, Set<Long>> partitionToBlockIds;
private final ShuffleWriteClient shuffleWriteClient;
private final Map<Integer, List<ShuffleServerInfo>> partitionToServers;
@@ -109,18 +106,15 @@ public RssShuffleWriter(
this.shuffleWriteMetrics = shuffleWriteMetrics;
this.shuffleDependency = rssHandle.getDependency();
this.partitioner = shuffleDependency.partitioner();
this.numPartitions = partitioner.numPartitions();
this.shouldPartition = partitioner.numPartitions() > 1;
this.sendCheckInterval = sparkConf.getLong(RssClientConfig.RSS_WRITER_SEND_CHECK_INTERVAL,
RssClientConfig.RSS_WRITER_SEND_CHECK_INTERVAL_DEFAULT_VALUE);
this.sendCheckTimeout = sparkConf.getLong(RssClientConfig.RSS_WRITER_SEND_CHECK_TIMEOUT,
RssClientConfig.RSS_WRITER_SEND_CHECK_TIMEOUT_DEFAULT_VALUE);
this.blockNumPerTaskPartition = sparkConf.getInt(RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_TASK_PARTITION,
RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_TASK_PARTITION_DEFAULT_VALUE);
this.sendSizeLimit = sparkConf.getSizeAsBytes(RssClientConfig.RSS_CLIENT_SEND_SIZE_LIMIT,
RssClientConfig.RSS_CLIENT_SEND_SIZE_LIMIT_DEFAULT_VALUE);
this.blockNumPerBitmap = sparkConf.getLong(RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_BITMAP,
RssClientConfig.RSS_CLIENT_BLOCK_NUM_PER_BITMAP_DEFAULT_VALUE);
this.bitmapSplitNum = sparkConf.getInt(RssClientConfig.RSS_CLIENT_BITMAP_SPLIT_NUM,
RssClientConfig.RSS_CLIENT_BITMAP_SPLIT_NUM_DEFAULT_VALUE);
this.partitionToBlockIds = Maps.newConcurrentMap();
this.shuffleWriteClient = shuffleWriteClient;
this.shuffleServersForData = rssHandle.getShuffleServersForData();
@@ -288,16 +282,14 @@ public Option<MapStatus> stop(boolean success) {
if (success) {
try {
Map<Integer, List<Long>> ptb = Maps.newHashMap();
int bitmapNum = ClientUtils.getBitmapNum(numMaps, numPartitions, blockNumPerTaskPartition, blockNumPerBitmap);
for (Map.Entry<Integer, Set<Long>> entry : partitionToBlockIds.entrySet()) {
ptb.put(entry.getKey(), Lists.newArrayList(entry.getValue()));
}
long start = System.currentTimeMillis();
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId, taskAttemptId, ptb,
bitmapNum);
LOG.info("Report application " + appId + "shuffle result for task[" + taskAttemptId
+ "] with bitmapNum[" + bitmapNum + "] with partitionToBlocksSize[" + ptb.size() + "] cost "
+ (System.currentTimeMillis() - start) + " ms");
shuffleWriteClient.reportShuffleResult(partitionToServers, appId, shuffleId,
taskAttemptId, ptb, bitmapSplitNum);
LOG.info("Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
taskAttemptId, bitmapSplitNum, (System.currentTimeMillis() - start));
// todo: we can replace the dummy host and port with the real shuffle server which we prefer to read
final BlockManagerId blockManagerId = BlockManagerId.apply(appId + "_" + taskId,
DUMMY_HOST,
37 changes: 0 additions & 37 deletions client/src/main/java/com/tencent/rss/client/util/ClientUtils.java
@@ -42,41 +42,4 @@ public static Long getBlockId(long partitionId, long taskAttemptId, long atomicI
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+ (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) + taskAttemptId;
}

// blockIds are stored in bitmaps on the shuffle server, and more bitmaps cost more memory
// to reduce memory cost, blockIds from different partitions are merged into one bitmap
public static int getBitmapNum(
int taskNum,
int partitionNum,
int blockNumPerTaskPerPartition,
long blockNumPerBitmap) {
// based on performance tests, spark.rss.block.per.bitmap should be greater than 20000000
if (blockNumPerBitmap < 20000000) {
throw new IllegalArgumentException("blockNumPerBitmap should be greater than 20000000");
}
// depending on the actual job, spark.rss.block.per.task.partition should be less than 1000000,
// which may generate about 1T of shuffle data per task per partition
if (blockNumPerTaskPerPartition < 0 || blockNumPerTaskPerPartition > 1000000) {
throw new IllegalArgumentException("blockNumPerTaskPerPartition should be less than 1000000");
}
// to avoid overflow in the calculation, scale down the values when possible
// it's ok if the result is not accurate
int processedTaskNum = taskNum;
int processedPartitionNum = partitionNum;
long processedBlockNumPerBitmap = blockNumPerBitmap;
if (taskNum > 1000) {
processedTaskNum = taskNum / 1000;
processedBlockNumPerBitmap = processedBlockNumPerBitmap / 1000;
}
if (partitionNum > 1000) {
processedPartitionNum = partitionNum / 1000;
processedBlockNumPerBitmap = processedBlockNumPerBitmap / 1000;
}
long bitmapNum = 1L * blockNumPerTaskPerPartition * processedTaskNum
* processedPartitionNum / processedBlockNumPerBitmap + 1;
if (bitmapNum > partitionNum || bitmapNum < 0) {
bitmapNum = partitionNum;
}
return (int) bitmapNum;
}
}
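With getBitmapNum removed, the bitmap count reported by the writer is no longer derived from task and partition counts; it is simply the configured spark.rss.client.bitmap.splitNum. As a rough, hypothetical sketch of what splitting blockIds across a fixed number of containers can look like (not the shuffle server's actual implementation), using a plain modulo distribution:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class BlockIdSplitSketch {
      // spread blockIds over bitmapSplitNum containers; the real shuffle server keeps
      // them in bitmaps, this sketch uses plain sets to stay self-contained
      public static List<Set<Long>> split(List<Long> blockIds, int bitmapSplitNum) {
        List<Set<Long>> containers = new ArrayList<>();
        for (int i = 0; i < bitmapSplitNum; i++) {
          containers.add(new HashSet<>());
        }
        for (long blockId : blockIds) {
          int index = (int) Math.floorMod(blockId, (long) bitmapSplitNum);
          containers.get(index).add(blockId);
        }
        return containers;
      }
    }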
32 changes: 4 additions & 28 deletions client/src/test/java/com/tencent/rss/client/ClientUtilsTest.java
@@ -18,13 +18,14 @@

package com.tencent.rss.client;

import org.junit.Test;

import com.tencent.rss.client.util.ClientUtils;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.tencent.rss.client.util.ClientUtils;
import org.junit.Test;

public class ClientUtilsTest {

private static String EXCEPTION_EXPECTED = "Exception expected";
@@ -59,29 +60,4 @@ public void getBlockIdTest() {
assertTrue(e.getMessage().contains("Can't support sequence[524288], the max value should be 524287"));
}
}

@Test
public void getBitmapNumTest() {
// max values of taskNum, partitionNum and blockNumPerTaskPerPartition; not expected in a real job
assertEquals(
2147483647, ClientUtils.getBitmapNum(Integer.MAX_VALUE, Integer.MAX_VALUE, 1000000, 100000000L));
// taskNum * partitionNum * blockNumPerTaskPerPartition / blockNumPerBitmap > 0
assertEquals(
5001, ClientUtils.getBitmapNum(100000, 100000, 50, 100000000L));
// taskNum * partitionNum * blockNumPerTaskPerPartition / blockNumPerBitmap = 0
assertEquals(
1, ClientUtils.getBitmapNum(1999, 1999, 50, 100000000L));
try {
ClientUtils.getBitmapNum(1, 1, 1, 19999999L);
fail(EXCEPTION_EXPECTED);
} catch (Exception e) {
assertTrue(e.getMessage().contains("blockNumPerBitmap should be greater than"));
}
try {
ClientUtils.getBitmapNum(1, 1, 1000001, 20000000L);
fail(EXCEPTION_EXPECTED);
} catch (Exception e) {
assertTrue(e.getMessage().contains("blockNumPerTaskPerPartition should be less than"));
}
}
}