[Improvement] Replace ConfigEntry with ConfigOption in RssSparkConfig a…
guixiaowen committed Nov 26, 2023
1 parent d5de299 commit c2df19e
Showing 3 changed files with 160 additions and 138 deletions.
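
For context, the commit replaces the Spark-style ConfigEntry/ConfigBuilder declarations in RssSparkConfig with Uniffle's ConfigOptions fluent builder. A minimal before/after sketch of the pattern, taken from the writer buffer size option in this diff (imports and the enclosing class are omitted):

// Before: ConfigEntry built through the ConfigBuilder helpers
public static final ConfigEntry<String> RSS_WRITER_BUFFER_SIZE =
    createStringBuilder(
            new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE)
                .doc("Buffer size for single partition data"))
        .createWithDefault("3m");

// After: ConfigOption declared with the ConfigOptions fluent API
public static final ConfigOption<String> RSS_WRITER_BUFFER_SIZE =
    ConfigOptions.key("spark.rss.writer.buffer.size")
        .stringType()
        .defaultValue("3m")
        .withDescription("Buffer size for single partition data");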
@@ -32,9 +32,10 @@
import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;

import org.apache.uniffle.common.config.RssClientConf;

public class RssSparkConfig {

public static final ConfigOption<Long> RSS_CLIENT_SEND_SIZE_LIMITATION =
@@ -67,34 +68,42 @@ public class RssSparkConfig {
public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";

public static final ConfigEntry<Integer> RSS_PARTITION_NUM_PER_RANGE =
createIntegerBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE))
.createWithDefault(RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);

public static final ConfigEntry<String> RSS_WRITER_BUFFER_SIZE =
createStringBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE)
.doc("Buffer size for single partition data"))
.createWithDefault("3m");

public static final ConfigEntry<String> RSS_WRITER_SERIALIZER_BUFFER_SIZE =
createStringBuilder(new ConfigBuilder("spark.rss.writer.serializer.buffer.size"))
.createWithDefault("3k");

public static final ConfigEntry<String> RSS_WRITER_BUFFER_SEGMENT_SIZE =
createStringBuilder(new ConfigBuilder("spark.rss.writer.buffer.segment.size"))
.createWithDefault("3k");

public static final ConfigEntry<String> RSS_WRITER_BUFFER_SPILL_SIZE =
createStringBuilder(
new ConfigBuilder("spark.rss.writer.buffer.spill.size")
.doc("Buffer size for total partition data"))
.createWithDefault("128m");

public static final ConfigEntry<String> RSS_WRITER_PRE_ALLOCATED_BUFFER_SIZE =
createStringBuilder(new ConfigBuilder("spark.rss.writer.pre.allocated.buffer.size"))
.createWithDefault("16m");
createIntegerBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE))
.createWithDefault(RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE);

public static final ConfigOption<String> RSS_WRITER_BUFFER_SIZE =
ConfigOptions.key("spark.rss.writer.buffer.size")
.stringType()
.defaultValue("3m")
.withDescription("Buffer size for single partition data");


public static final ConfigOption<String> RSS_WRITER_SERIALIZER_BUFFER_SIZE =
ConfigOptions.key("spark.rss.writer.serializer.buffer.size")
.stringType()
.defaultValue("3k")
.withDescription("");

public static final ConfigOption<String> RSS_WRITER_BUFFER_SEGMENT_SIZE =
ConfigOptions.key("spark.rss.writer.buffer.segment.size")
.stringType()
.defaultValue("3k")
.withDescription("");

public static final ConfigOption<String> RSS_WRITER_BUFFER_SPILL_SIZE =
ConfigOptions.key("spark.rss.writer.buffer.spill.size")
.stringType()
.defaultValue("128m")
.withDescription("Buffer size for total partition data");


public static final ConfigOption<String> RSS_WRITER_PRE_ALLOCATED_BUFFER_SIZE =
ConfigOptions.key("spark.rss.writer.pre.allocated.buffer.size")
.stringType()
.defaultValue("16m")
.withDescription("");

public static final ConfigEntry<Integer> RSS_WRITER_REQUIRE_MEMORY_RETRY_MAX =
createIntegerBuilder(new ConfigBuilder("spark.rss.writer.require.memory.retryMax"))
@@ -116,24 +125,30 @@ public class RssSparkConfig {
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS))
.createWithDefault(RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE);

public static final ConfigEntry<Boolean> RSS_TEST_FLAG =
createBooleanBuilder(new ConfigBuilder("spark.rss.test")).createWithDefault(false);
public static final ConfigOption<Boolean> RSS_TEST_FLAG =
ConfigOptions.key("spark.rss.test")
.booleanType()
.defaultValue(false)
.withDescription("");

public static final ConfigEntry<Boolean> RSS_TEST_MODE_ENABLE =
createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_TEST_MODE_ENABLE)
.doc("Whether enable test mode for the Spark Client"))
.createWithDefault(false);
.doc("Whether enable test mode for the Spark Client"))
.createWithDefault(false);

public static final ConfigEntry<String> RSS_REMOTE_STORAGE_PATH =
createStringBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH))
.createWithDefault("");

public static final ConfigEntry<Integer> RSS_INDEX_READ_LIMIT =
createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_INDEX_READ_LIMIT))
.createWithDefault(RssClientConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE);
public static final ConfigOption<String> RSS_REMOTE_STORAGE_PATH =
ConfigOptions.key("spark.rss.remote.storage.path")
.stringType()
.defaultValue("")
.withDescription("");

public static final ConfigOption<Integer> RSS_INDEX_READ_LIMIT =
ConfigOptions.key("spark.rss.index.read.limit")
.intType()
.defaultValue(500)
.withDescription("");

public static final ConfigEntry<String> RSS_CLIENT_TYPE =
createStringBuilder(
@@ -159,44 +174,46 @@ public class RssSparkConfig {

public static final ConfigEntry<Integer> RSS_CLIENT_HEARTBEAT_THREAD_NUM =
createIntegerBuilder(new ConfigBuilder("spark.rss.client.heartBeat.threadNum"))
.createWithDefault(4);
.createWithDefault(4);

public static final ConfigEntry<Integer> RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE =
createIntegerBuilder(new ConfigBuilder("spark.rss.client.unregister.thread.pool.size"))
.createWithDefault(10);
.createWithDefault(10);

public static final ConfigEntry<Integer> RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC =
createIntegerBuilder(new ConfigBuilder("spark.rss.client.unregister.request.timeout.sec"))
.createWithDefault(10);
.createWithDefault(10);

// When the size of read buffer reaches the half of JVM region (i.e., 32m),
// it will incur humongous allocation, so we set it to 14m.
public static final ConfigEntry<String> RSS_CLIENT_READ_BUFFER_SIZE =
createStringBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE)
.doc("The max data size read from storage"))
.createWithDefault(RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE);
public static final ConfigOption<String> RSS_CLIENT_READ_BUFFER_SIZE =
ConfigOptions.key("spark.rss.client.read.buffer.size")
.stringType()
.defaultValue("14m")
.withDescription("The max data size read from storage");


public static final ConfigEntry<Long> RSS_HEARTBEAT_INTERVAL =
createLongBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL))
.createWithDefault(RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);
createLongBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL))
.createWithDefault(RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE);

public static final ConfigOption<Long> RSS_HEARTBEAT_TIMEOUT =
ConfigOptions.key("spark.rss.heartbeat.timeout")
.longType()
.defaultValue(5 * 1000L)
.withDescription("");

public static final ConfigEntry<Long> RSS_HEARTBEAT_TIMEOUT =
createLongBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT))
.createWithDefault(5 * 1000L);

public static final ConfigEntry<Integer> RSS_CLIENT_SEND_THREAD_POOL_SIZE =
createIntegerBuilder(
new ConfigBuilder("spark.rss.client.send.threadPool.size")
.doc("The thread size for send shuffle data to shuffle server"))
.createWithDefault(10);
.doc("The thread size for send shuffle data to shuffle server"))
.createWithDefault(10);

public static final ConfigEntry<Integer> RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE =
createIntegerBuilder(new ConfigBuilder("spark.rss.client.send.threadPool.keepalive"))
.createWithDefault(60);
.createWithDefault(60);

public static final ConfigEntry<Integer> RSS_DATA_REPLICA =
createIntegerBuilder(
@@ -206,36 +223,38 @@ public class RssSparkConfig {
.createWithDefault(RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE);

public static final ConfigEntry<Integer> RSS_DATA_REPLICA_WRITE =
createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE)
.doc(
"The min server number that each block should be send by client successfully"))
.createWithDefault(RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);
createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE)
.doc(
"The min server number that each block should be send by client successfully"))
.createWithDefault(RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE);


public static final ConfigEntry<Integer> RSS_DATA_REPLICA_READ =
createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ)
.doc(
"The min server number that metadata should be fetched by client successfully"))
.createWithDefault(RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);
.doc(
"The min server number that metadata should be fetched by client successfully"))
.createWithDefault(RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE);

public static final ConfigEntry<Boolean> RSS_DATA_REPLICA_SKIP_ENABLED =
createBooleanBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED))
.createWithDefault(RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);
createBooleanBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED))
.createWithDefault(RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE);

public static final ConfigEntry<Integer> RSS_DATA_TRANSFER_POOL_SIZE =
createIntegerBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE))
.createWithDefault(RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE);
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE))
.createWithDefault(RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE);

public static final ConfigEntry<Integer> RSS_DATA_COMMIT_POOL_SIZE =
createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE)
.doc("The thread size for sending commit to shuffle servers"))
.createWithDefault(RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE)
.doc("The thread size for sending commit to shuffle servers"))
.createWithDefault(RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE);


public static final ConfigEntry<Boolean> RSS_OZONE_DFS_NAMENODE_ODFS_ENABLE =
createBooleanBuilder(new ConfigBuilder("spark.rss.ozone.dfs.namenode.odfs.enable"))
@@ -253,39 +272,41 @@ public class RssSparkConfig {
createIntegerBuilder(new ConfigBuilder("spark.rss.client.bitmap.splitNum"))
.createWithDefault(1);

public static final ConfigEntry<String> RSS_ACCESS_ID =
createStringBuilder(new ConfigBuilder("spark.rss.access.id")).createWithDefault("");
public static final ConfigOption<String> RSS_ACCESS_ID =
ConfigOptions.key("spark.rss.access.id")
.stringType()
.defaultValue("")
.withDescription("");


public static final ConfigEntry<Integer> RSS_ACCESS_TIMEOUT_MS =
createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS))
.createWithDefault(RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE);
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS))
.createWithDefault(RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE);

public static final ConfigEntry<Boolean> RSS_ENABLED =
createBooleanBuilder(new ConfigBuilder("spark.rss.enabled")).createWithDefault(false);
createBooleanBuilder(new ConfigBuilder("spark.rss.enabled")).createWithDefault(false);

public static final ConfigEntry<Boolean> RSS_DYNAMIC_CLIENT_CONF_ENABLED =
createBooleanBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED))
.createWithDefault(RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);

public static final ConfigEntry<String> RSS_CLIENT_ASSIGNMENT_TAGS =
createStringBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS)
.doc(
"The comma-separated list of tags for deciding assignment shuffle servers. "
+ "Notice that the SHUFFLE_SERVER_VERSION will always as the assignment tag "
+ "whether this conf is set or not"))
.createWithDefault("");

public static final ConfigEntry<Boolean> RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE =
createBooleanBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConf.OFF_HEAP_MEMORY_ENABLE.key())
.doc(RssClientConf.OFF_HEAP_MEMORY_ENABLE.description()))
.createWithDefault(RssClientConf.OFF_HEAP_MEMORY_ENABLE.defaultValue());
createBooleanBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED))
.createWithDefault(RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);

public static final ConfigOption<String> RSS_CLIENT_ASSIGNMENT_TAGS =
ConfigOptions.key("spark.rss.client.assignment.tags")
.stringType()
.defaultValue("")
.withDescription("The comma-separated list of tags for deciding assignment shuffle servers. "
+ "Notice that the SHUFFLE_SERVER_VERSION will always as the assignment tag "
+ "whether this conf is set or not");


public static final ConfigOption<Boolean> RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE =
ConfigOptions.key("spark.rss.client.off.heap.memory.enable")
.booleanType()
.defaultValue(false)
.withDescription("Client can use off heap memory");

public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER =
createIntegerBuilder(
@@ -295,29 +316,31 @@ public class RssSparkConfig {
.createWithDefault(
RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE);


public static final ConfigEntry<Long> RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
createLongBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
createLongBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);


public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_RETRY_TIMES =
createIntegerBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);
createIntegerBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);

public static final ConfigEntry<Long> RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS =
createLongBuilder(
new ConfigBuilder("spark.rss.client.access.retry.interval.ms")
.doc("Interval between retries fallback to SortShuffleManager"))
.createWithDefault(20000L);
createLongBuilder(
new ConfigBuilder("spark.rss.client.access.retry.interval.ms")
.doc("Interval between retries fallback to SortShuffleManager"))
.createWithDefault(20000L);

public static final ConfigEntry<Integer> RSS_CLIENT_ACCESS_RETRY_TIMES =
createIntegerBuilder(
new ConfigBuilder("spark.rss.client.access.retry.times")
.doc("Number of retries fallback to SortShuffleManager"))
.createWithDefault(0);
createIntegerBuilder(
new ConfigBuilder("spark.rss.client.access.retry.times")
.doc("Number of retries fallback to SortShuffleManager"))
.createWithDefault(0);

public static final ConfigEntry<String> RSS_COORDINATOR_QUORUM =
createStringBuilder(
@@ -359,19 +382,18 @@ public class RssSparkConfig {
RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE);

public static final ConfigEntry<Integer> RSS_SHUFFLE_MANAGER_GRPC_PORT =
createIntegerBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.key())
.internal()
.doc(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.description()))
.createWithDefault(-1);

public static final ConfigEntry<Boolean> RSS_RESUBMIT_STAGE =
createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_RESUBMIT_STAGE)
.internal()
.doc("Whether to enable the resubmit stage."))
.createWithDefault(false);
createIntegerBuilder(
new ConfigBuilder(
SPARK_RSS_CONFIG_PREFIX + RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.key())
.internal()
.doc(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.description()))
.createWithDefault(-1);

public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE =
ConfigOptions.key("spark.rss.resubmit.stage")
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable the resubmit stage.");

// spark2 doesn't have this key defined
public static final String SPARK_SHUFFLE_COMPRESS_KEY = "spark.shuffle.compress";
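
As a usage note, options declared this way are read through the RssConf-based configuration rather than Spark's ConfigEntry machinery. A hypothetical sketch, assuming the Spark conf is first converted to an RssConf (for example via a helper such as RssSparkConfig.toRssConf, which is not part of this diff) and that RssConf exposes a Flink-style get(ConfigOption<T>) accessor:

// Hypothetical usage; toRssConf(...) and get(ConfigOption<T>) are assumed, not shown in this diff.
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
String writerBufferSize = rssConf.get(RssSparkConfig.RSS_WRITER_BUFFER_SIZE); // "3m" unless overridden
Boolean testFlag = rssConf.get(RssSparkConfig.RSS_TEST_FLAG);                 // false by default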