diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java index 2426d82872..25d3c1fa14 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java @@ -19,6 +19,7 @@ import java.util.Set; +import org.apache.spark.internal.config.ConfigEntry; import scala.Tuple2; import scala.runtime.AbstractFunction1; @@ -31,262 +32,298 @@ import org.apache.uniffle.common.config.ConfigOption; import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; -import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.config.RssConf; public class RssSparkConfig { public static final ConfigOption RSS_CLIENT_SEND_SIZE_LIMITATION = - ConfigOptions.key("rss.client.send.size.limit") - .longType() - .defaultValue(1024 * 1024 * 16L) - .withDescription("The max data size sent to shuffle server"); + ConfigOptions.key("rss.client.send.size.limit") + .longType() + .defaultValue(1024 * 1024 * 16L) + .withDescription("The max data size sent to shuffle server"); public static final ConfigOption RSS_MEMORY_SPILL_TIMEOUT = - ConfigOptions.key("rss.client.memory.spill.timeout.sec") - .intType() - .defaultValue(1) - .withDescription( - "The timeout of spilling data to remote shuffle server, " - + "which will be triggered by Spark TaskMemoryManager. Unit is sec, default value is 1"); + ConfigOptions.key("rss.client.memory.spill.timeout.sec") + .intType() + .defaultValue(1) + .withDescription( + "The timeout of spilling data to remote shuffle server, " + + "which will be triggered by Spark TaskMemoryManager. 
Unit is sec, default value is 1"); public static final ConfigOption RSS_ROW_BASED = - ConfigOptions.key("rss.row.based") - .booleanType() - .defaultValue(true) - .withDescription("indicates row based shuffle, set false when use in columnar shuffle"); + ConfigOptions.key("rss.row.based") + .booleanType() + .defaultValue(true) + .withDescription("indicates row based shuffle, set false when use in columnar shuffle"); public static final ConfigOption RSS_MEMORY_SPILL_ENABLED = - ConfigOptions.key("rss.client.memory.spill.enabled") - .booleanType() - .defaultValue(false) - .withDescription( - "The memory spill switch triggered by Spark TaskMemoryManager, default value is false."); + ConfigOptions.key("rss.client.memory.spill.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "The memory spill switch triggered by Spark TaskMemoryManager, default value is false."); public static final String SPARK_RSS_CONFIG_PREFIX = "spark."; public static final ConfigOption RSS_PARTITION_NUM_PER_RANGE = - createIntegerBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE)) - .createWithDefault(RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.partitionNum.per.range") + .intType() + .defaultValue(1) + .withDescription("The partition number of one range."); public static final ConfigOption RSS_WRITER_BUFFER_SIZE = - createStringBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE) - .doc("Buffer size for single partition data")) - .createWithDefault("3m"); + ConfigOptions.key("spark.rss.writer.buffer.size") + .stringType() + .defaultValue("3m") + .withDescription("Buffer size for single partition data"); + public static final ConfigOption RSS_WRITER_SERIALIZER_BUFFER_SIZE = - createStringBuilder(new ConfigBuilder("spark.rss.writer.serializer.buffer.size")) - .createWithDefault("3k"); + 
ConfigOptions.key("spark.rss.writer.serializer.buffer.size") + .stringType() + .defaultValue("3k") + .withDescription(""); public static final ConfigOption RSS_WRITER_BUFFER_SEGMENT_SIZE = - createStringBuilder(new ConfigBuilder("spark.rss.writer.buffer.segment.size")) - .createWithDefault("3k"); + ConfigOptions.key("spark.rss.writer.buffer.segment.size") + .stringType() + .defaultValue("3k") + .withDescription(""); public static final ConfigOption RSS_WRITER_BUFFER_SPILL_SIZE = - createStringBuilder( - new ConfigBuilder("spark.rss.writer.buffer.spill.size") - .doc("Buffer size for total partition data")) - .createWithDefault("128m"); + ConfigOptions.key("spark.rss.writer.buffer.spill.size") + .stringType() + .defaultValue("128m") + .withDescription("Buffer size for total partition data"); + public static final ConfigOption RSS_WRITER_PRE_ALLOCATED_BUFFER_SIZE = - createStringBuilder(new ConfigBuilder("spark.rss.writer.pre.allocated.buffer.size")) - .createWithDefault("16m"); + ConfigOptions.key("spark.rss.writer.pre.allocated.buffer.size") + .stringType() + .defaultValue("16m") + .withDescription(""); - public static final ConfigOption RSS_WRITER_REQUIRE_MEMORY_RETRY_MAX = - createIntegerBuilder(new ConfigBuilder("spark.rss.writer.require.memory.retryMax")) - .createWithDefault(1200); + public static final ConfigEntry RSS_WRITER_REQUIRE_MEMORY_RETRY_MAX = + createIntegerBuilder(new ConfigBuilder("spark.rss.writer.require.memory.retryMax")) + .createWithDefault(1200); - public static final ConfigOption RSS_WRITER_REQUIRE_MEMORY_INTERVAL = - createLongBuilder(new ConfigBuilder("spark.rss.writer.require.memory.interval")) - .createWithDefault(1000L); + public static final ConfigEntry RSS_WRITER_REQUIRE_MEMORY_INTERVAL = + createLongBuilder(new ConfigBuilder("spark.rss.writer.require.memory.interval")) + .createWithDefault(1000L); + + public static final ConfigEntry RSS_CLIENT_SEND_CHECK_TIMEOUT_MS = + createLongBuilder( + new ConfigBuilder( + SPARK_RSS_CONFIG_PREFIX 
+ RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS)) + .createWithDefault(RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE); - public static final ConfigOption RSS_CLIENT_SEND_CHECK_TIMEOUT_MS = - createLongBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS)) - .createWithDefault(RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE); public static final ConfigOption RSS_CLIENT_SEND_CHECK_INTERVAL_MS = - createLongBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS)) - .createWithDefault(RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.client.send.check.interval.ms") + .longType() + .defaultValue(500L) + .withDescription(""); public static final ConfigOption RSS_TEST_FLAG = - createBooleanBuilder(new ConfigBuilder("spark.rss.test")).createWithDefault(false); + ConfigOptions.key("spark.rss.test") + .booleanType() + .defaultValue(false) + .withDescription(""); + + public static final ConfigEntry RSS_TEST_MODE_ENABLE = + createBooleanBuilder( + new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE) + .doc("Whether enable test mode for the Spark Client")) + .createWithDefault(false); - public static final ConfigOption RSS_TEST_MODE_ENABLE = - createBooleanBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE) - .doc("Whether enable test mode for the Spark Client")) - .createWithDefault(false); public static final ConfigOption RSS_REMOTE_STORAGE_PATH = - createStringBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH)) - .createWithDefault(""); + ConfigOptions.key("spark.rss.remote.storage.path") + .stringType() + .defaultValue("") + .withDescription(""); public static final ConfigOption RSS_INDEX_READ_LIMIT = - createIntegerBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_INDEX_READ_LIMIT)) - .createWithDefault(RssClientConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE); - - public static final ConfigOption RSS_CLIENT_TYPE = - createStringBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_TYPE)) - .createWithDefault(RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE); - - public static final ConfigOption RSS_STORAGE_TYPE = - createStringBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE) - .doc("Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS")) - .createWithDefault(""); - - public static final ConfigOption RSS_CLIENT_RETRY_MAX = - createIntegerBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX)) - .createWithDefault(RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE); - - public static final ConfigOption RSS_CLIENT_RETRY_INTERVAL_MAX = - createLongBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX)) - .createWithDefault(RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.index.read.limit") + .intType() + .defaultValue(500) + .withDescription(""); + + public static final ConfigEntry RSS_CLIENT_TYPE = + createStringBuilder( + new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_TYPE)) + .createWithDefault(RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE); + + public static final ConfigEntry RSS_STORAGE_TYPE = + createStringBuilder( + new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE) + .doc("Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS")) + .createWithDefault(""); + + public static final ConfigEntry RSS_CLIENT_RETRY_MAX = + createIntegerBuilder( + new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX)) + .createWithDefault(RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE); + + public static final ConfigEntry 
RSS_CLIENT_RETRY_INTERVAL_MAX = + createLongBuilder( + new ConfigBuilder( + SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX)) + .createWithDefault(RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE); public static final ConfigOption RSS_CLIENT_HEARTBEAT_THREAD_NUM = - createIntegerBuilder(new ConfigBuilder("spark.rss.client.heartBeat.threadNum")) - .createWithDefault(4); + ConfigOptions.key("spark.rss.client.heartBeat.threadNum") + .intType() + .defaultValue(4) + .withDescription(""); public static final ConfigOption RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE = - createIntegerBuilder(new ConfigBuilder("spark.rss.client.unregister.thread.pool.size")) - .createWithDefault(10); + ConfigOptions.key("spark.rss.client.unregister.thread.pool.size") + .intType() + .defaultValue(10) + .withDescription(""); public static final ConfigOption RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC = - createIntegerBuilder(new ConfigBuilder("spark.rss.client.unregister.request.timeout.sec")) - .createWithDefault(10); + ConfigOptions.key("spark.rss.client.unregister.request.timeout.sec") + .intType() + .defaultValue(10) + .withDescription(""); // When the size of read buffer reaches the half of JVM region (i.e., 32m), // it will incur humongous allocation, so we set it to 14m. 
public static final ConfigOption RSS_CLIENT_READ_BUFFER_SIZE = - createStringBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE) - .doc("The max data size read from storage")) - .createWithDefault(RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.client.read.buffer.size") + .stringType() + .defaultValue("14m") + .withDescription("The max data size read from storage"); + public static final ConfigOption RSS_HEARTBEAT_INTERVAL = - createLongBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL)) - .createWithDefault(RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.heartbeat.interval") + .longType() + .defaultValue(10 * 1000L) + .withDescription(""); public static final ConfigOption RSS_HEARTBEAT_TIMEOUT = - createLongBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT)) - .createWithDefault(5 * 1000L); + ConfigOptions.key("spark.rss.heartbeat.timeout") + .longType() + .defaultValue(5 * 1000L) + .withDescription(""); + public static final ConfigOption RSS_CLIENT_SEND_THREAD_POOL_SIZE = - createIntegerBuilder( - new ConfigBuilder("spark.rss.client.send.threadPool.size") - .doc("The thread size for send shuffle data to shuffle server")) - .createWithDefault(10); + ConfigOptions.key("spark.rss.client.send.threadPool.size") + .intType() + .defaultValue(10) + .withDescription("The thread size for send shuffle data to shuffle server"); public static final ConfigOption RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE = - createIntegerBuilder(new ConfigBuilder("spark.rss.client.send.threadPool.keepalive")) - .createWithDefault(60); + ConfigOptions.key("spark.rss.client.send.threadPool.keepalive") + .intType() + .defaultValue(60) + .withDescription(""); public static final ConfigOption RSS_DATA_REPLICA = - createIntegerBuilder( - new 
ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA) - .doc( - "The max server number that each block can be send by client in quorum protocol")) - .createWithDefault(RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.data.replica") + .intType() + .defaultValue(1) + .withDescription("The max server number that each block can be send by client in quorum protocol"); public static final ConfigOption RSS_DATA_REPLICA_WRITE = - createIntegerBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE) - .doc( - "The min server number that each block should be send by client successfully")) - .createWithDefault(RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.data.replica.write") + .intType() + .defaultValue(1) + .withDescription("The min server number that each block should be send by client successfully"); + + public static final ConfigOption RSS_DATA_REPLICA_READ = - createIntegerBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ) - .doc( - "The min server number that metadata should be fetched by client successfully")) - .createWithDefault(RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.data.replica.read") + .intType() + .defaultValue(1) + .withDescription("The min server number that metadata should be fetched by client successfully"); + public static final ConfigOption RSS_DATA_REPLICA_SKIP_ENABLED = - createBooleanBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED)) - .createWithDefault(RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.data.replica.skip.enabled") + .booleanType() + .defaultValue(true) + .withDescription(""); + public static final ConfigOption RSS_DATA_TRANSFER_POOL_SIZE = - createIntegerBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + 
RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE)) - .createWithDefault(RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.client.data.transfer.pool.size") + .intType() + .defaultValue(RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE) + .withDescription(""); public static final ConfigOption RSS_DATA_COMMIT_POOL_SIZE = - createIntegerBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE) - .doc("The thread size for sending commit to shuffle servers")) - .createWithDefault(RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.client.data.commit.pool.size") + .intType() + .defaultValue(-1) + .withDescription("The thread size for sending commit to shuffle servers"); + + + public static final ConfigEntry RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE = + createBooleanBuilder(new ConfigBuilder("spark.rss.ozone.dfs.namenode.odfs.enable")) + .createWithDefault(false); - public static final ConfigOption RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE = - createBooleanBuilder(new ConfigBuilder("spark.rss.ozone.dfs.namenode.odfs.enable")) - .createWithDefault(false); - public static final ConfigOption RSS_OZONE_FS_HDFS_IMPL = - createStringBuilder(new ConfigBuilder("spark.rss.ozone.fs.hdfs.impl")) - .createWithDefault("org.apache.hadoop.odfs.HdfsOdfsFilesystem"); + public static final ConfigEntry RSS_OZONE_FS_HDFS_IMPL = + createStringBuilder(new ConfigBuilder("spark.rss.ozone.fs.hdfs.impl")) + .createWithDefault("org.apache.hadoop.odfs.HdfsOdfsFilesystem"); - public static final ConfigOption RSS_OZONE_FS_ABSTRACT_FILE_SYSTEM_HDFS_IMPL = - createStringBuilder(new ConfigBuilder("spark.rss.ozone.fs.AbstractFileSystem.hdfs.impl")) - .createWithDefault("org.apache.hadoop.odfs.HdfsOdfs"); + public static final ConfigEntry RSS_OZONE_FS_ABSTRACT_FILE_SYSTEM_HDFS_IMPL = + createStringBuilder(new ConfigBuilder("spark.rss.ozone.fs.AbstractFileSystem.hdfs.impl")) + 
.createWithDefault("org.apache.hadoop.odfs.HdfsOdfs"); public static final ConfigOption RSS_CLIENT_BITMAP_SPLIT_NUM = - createIntegerBuilder(new ConfigBuilder("spark.rss.client.bitmap.splitNum")) - .createWithDefault(1); + ConfigOptions.key("spark.rss.client.bitmap.splitNum") + .intType() + .defaultValue(1) + .withDescription(""); public static final ConfigOption RSS_ACCESS_ID = - createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault(""); + ConfigOptions.key("spark.rss.access.id") + .stringType() + .defaultValue("") + .withDescription(""); + public static final ConfigOption RSS_ACCESS_TIMEOUT_MS = - createIntegerBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS)) - .createWithDefault(RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.access.timeout.ms") + .intType() + .defaultValue(10000) + .withDescription(""); public static final ConfigOption RSS_ENABLED = - createBooleanBuilder(new ConfigBuilder("spark.rss.enabled")).createWithDefault(false); + ConfigOptions.key("spark.rss.enabled") + .booleanType() + .defaultValue(false) + .withDescription(""); public static final ConfigOption RSS_DYNAMIC_CLIENT_CONF_ENABLED = - createBooleanBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED)) - .createWithDefault(RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.dynamicClientConf.enabled") + .booleanType() + .defaultValue(true) + .withDescription(""); public static final ConfigOption RSS_CLIENT_ASSIGNMENT_TAGS = - createStringBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS) - .doc( - "The comma-separated list of tags for deciding assignment shuffle servers. 
" + ConfigOptions.key("spark.rss.client.assignment.tags") + .stringType() + .defaultValue("") + .withDescription("The comma-separated list of tags for deciding assignment shuffle servers. " + "Notice that the SHUFFLE_SERVER_VERSION will always as the assignment tag " - + "whether this conf is set or not")) - .createWithDefault(""); + + "whether this conf is set or not"); + public static final ConfigOption RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE = - createBooleanBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConf.OFF_HEAP_MEMORY_ENABLE.key()) - .doc(RssClientConf.OFF_HEAP_MEMORY_ENABLE.description())) - .createWithDefault(RssClientConf.OFF_HEAP_MEMORY_ENABLE.defaultValue()); + ConfigOptions.key("spark.rss.client.off.heap.memory.enable") + .booleanType() + .defaultValue(false) + .withDescription("Client can use off heap memory"); - public static final ConfigOption RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = + public static final ConfigEntry RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = createIntegerBuilder( new ConfigBuilder( SPARK_RSS_CONFIG_PREFIX @@ -294,37 +331,39 @@ public class RssSparkConfig { .createWithDefault( RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE); + public static final ConfigOption RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL = - createLongBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL)) - .createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.client.assignment.retry.interval") + .longType() + .defaultValue(65000l) + .withDescription(""); + public static final ConfigOption RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = - createIntegerBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES)) - .createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE); + ConfigOptions.key("spark.rss.client.assignment.retry.times") + 
.intType() + .defaultValue(3) + .withDescription(""); public static final ConfigOption RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS = - createLongBuilder( - new ConfigBuilder("spark.rss.client.access.retry.interval.ms") - .doc("Interval between retries fallback to SortShuffleManager")) - .createWithDefault(20000L); + ConfigOptions.key("spark.rss.client.access.retry.interval.ms") + .longType() + .defaultValue(20000L) + .withDescription("Interval between retries fallback to SortShuffleManager"); public static final ConfigOption RSS_CLIENT_ACCESS_RETRY_TIMES = - createIntegerBuilder( - new ConfigBuilder("spark.rss.client.access.retry.times") - .doc("Number of retries fallback to SortShuffleManager")) - .createWithDefault(0); + ConfigOptions.key("spark.rss.client.access.retry.times") + .intType() + .defaultValue(0) + .withDescription("Number of retries fallback to SortShuffleManager"); - public static final ConfigOption RSS_COORDINATOR_QUORUM = + public static final ConfigEntry RSS_COORDINATOR_QUORUM = createStringBuilder( new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM) .doc("Coordinator quorum")) .createWithDefault(""); - public static final ConfigOption RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR = + public static final ConfigEntry RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR = createDoubleBuilder( new ConfigBuilder( SPARK_RSS_CONFIG_PREFIX @@ -336,7 +375,7 @@ public class RssSparkConfig { .createWithDefault( RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE); - public static final ConfigOption RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = + public static final ConfigEntry RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = createBooleanBuilder( new ConfigBuilder( SPARK_RSS_CONFIG_PREFIX @@ -346,7 +385,7 @@ public class RssSparkConfig { + " of concurrent tasks.")) .createWithDefault(RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE); - public static final ConfigOption RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = + 
public static final ConfigEntry RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = createIntegerBuilder( new ConfigBuilder( SPARK_RSS_CONFIG_PREFIX @@ -358,19 +397,16 @@ public class RssSparkConfig { RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE); public static final ConfigOption RSS_SHUFFLE_MANAGER_GRPC_PORT = - createIntegerBuilder( - new ConfigBuilder( - SPARK_RSS_CONFIG_PREFIX + RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.key()) - .internal() - .doc(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.description())) - .createWithDefault(-1); + ConfigOptions.key("spark.rss.shuffle.manager.grpc.port") + .intType() + .noDefaultValue() + .withDescription("internal configuration to indicate which port is actually bind for shuffle manager service."); public static final ConfigOption RSS_RESUBMIT_STAGE = - createBooleanBuilder( - new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_RESUBMIT_STAGE) - .internal() - .doc("Whether to enable the resubmit stage.")) - .createWithDefault(false); + ConfigOptions.key("spark.rss.resubmit.stage") + .booleanType() + .defaultValue(false) + .withDescription("Whether to enable the resubmit stage."); // spark2 doesn't have this key defined public static final String SPARK_SHUFFLE_COMPRESS_KEY = "spark.shuffle.compress"; @@ -442,4 +478,4 @@ public static RssConf toRssConf(SparkConf sparkConf) { } return rssConf; } -} +} \ No newline at end of file diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java index f09d55214c..b0b0d1a81d 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/BufferManagerOptions.java @@ -40,23 +40,23 @@ public BufferManagerOptions(SparkConf sparkConf) { bufferSize = sparkConf.getSizeAsBytes( RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), - 
RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get()); + RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue()); serializerBufferSize = sparkConf.getSizeAsBytes( RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(), - RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.defaultValue().get()); + RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.defaultValue()); bufferSegmentSize = sparkConf.getSizeAsBytes( RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), - RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.defaultValue().get()); + RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.defaultValue()); bufferSpillThreshold = sparkConf.getSizeAsBytes( RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), - RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.defaultValue().get()); + RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.defaultValue()); preAllocatedBufferSize = sparkConf.getSizeAsBytes( RssSparkConfig.RSS_WRITER_PRE_ALLOCATED_BUFFER_SIZE.key(), - RssSparkConfig.RSS_WRITER_PRE_ALLOCATED_BUFFER_SIZE.defaultValue().get()); + RssSparkConfig.RSS_WRITER_PRE_ALLOCATED_BUFFER_SIZE.defaultValue()); requireMemoryInterval = sparkConf.get(RssSparkConfig.RSS_WRITER_REQUIRE_MEMORY_INTERVAL); requireMemoryRetryMax = sparkConf.get(RssSparkConfig.RSS_WRITER_REQUIRE_MEMORY_RETRY_MAX); LOG.info(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key() + "=" + bufferSize);