diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java index 1066960c13..855760f239 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java @@ -72,8 +72,8 @@ public void init(Context context) throws IOException, ClassNotFoundException { double sortThreshold = RssMRUtils.getDouble( rssJobConf, - RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD, - RssMRConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD); + RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD.key(), + RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD.defaultValue()); if (sortThreshold <= 0 || Double.compare(sortThreshold, 1.0) > 0) { throw new IOException("Invalid sort memory use threshold : " + sortThreshold); } @@ -81,14 +81,14 @@ public void init(Context context) throws IOException, ClassNotFoundException { int batch = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM, - RssMRConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM); + RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM.key(), + RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM.defaultValue()); RawComparator comparator = mrJobConf.getOutputKeyComparator(); double memoryThreshold = RssMRUtils.getDouble( rssJobConf, - RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD, - RssMRConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD); + RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD.key(), + RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD.defaultValue()); ApplicationAttemptId applicationAttemptId = RssMRUtils.getApplicationAttemptId(); String appId = applicationAttemptId.toString(); long taskAttemptId = @@ -97,24 +97,24 @@ public void init(Context context) throws IOException, ClassNotFoundException { double sendThreshold = RssMRUtils.getDouble( rssJobConf, - RssMRConfig.RSS_CLIENT_SEND_THRESHOLD, - RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD); + RssMRConfig.RSS_CLIENT_SEND_THRESHOLD.key(), + RssMRConfig.RSS_CLIENT_SEND_THRESHOLD.defaultValue()); long sendCheckInterval = RssMRUtils.getLong( rssJobConf, - RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS, - RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), + RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.defaultValue()); long sendCheckTimeout = RssMRUtils.getLong( rssJobConf, - RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, - RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), + RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.defaultValue()); int bitmapSplitNum = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_CLIENT_BITMAP_NUM, - RssMRConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM); + RssMRConfig.RSS_CLIENT_BITMAP_NUM.key(), + RssMRConfig.RSS_CLIENT_BITMAP_NUM.defaultValue()); int numMaps = mrJobConf.getNumMapTasks(); String storageType = RssMRUtils.getString(rssJobConf, RssMRConfig.RSS_STORAGE_TYPE); if (StringUtils.isEmpty(storageType)) { @@ -127,18 +127,18 @@ public void init(Context context) throws IOException, ClassNotFoundException { long maxSegmentSize = RssMRUtils.getLong( rssJobConf, - RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE, - RssMRConfig.RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE); + RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE.key(), + RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE.defaultValue()); int sendThreadNum = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM, - RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM); + RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM.key(), + RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM.defaultValue()); long maxBufferSize = RssMRUtils.getLong( rssJobConf, - RssMRConfig.RSS_WRITER_BUFFER_SIZE, - RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE); + RssMRConfig.RSS_WRITER_BUFFER_SIZE.key(), + RssMRConfig.RSS_WRITER_BUFFER_SIZE.defaultValue()); shuffleClient = RssMRUtils.createShuffleClient(mrJobConf); bufferManager = new SortWriteBufferManager( diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java index 004aa4bc0d..b2d8031c2a 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.uniffle.client.util.RssClientConfig; +import org.apache.uniffle.common.config.ConfigOption; +import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.RssConf; public class RssMRConfig { @@ -32,160 +34,240 @@ public class RssMRConfig { public static final String MR_RSS_CONFIG_PREFIX = "mapreduce.rss."; - public static final String RSS_CLIENT_HEARTBEAT_THREAD_NUM = - MR_CONFIG_PREFIX + "rss.client.heartBeat.threadNum"; - public static final int RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE = 4; - public static final String RSS_CLIENT_TYPE = MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_TYPE; - public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE; - public static final String RSS_CLIENT_RETRY_MAX = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX; - public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE; - public static final String RSS_CLIENT_RETRY_INTERVAL_MAX = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX; - public static final long RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE; + public static final ConfigOption RSS_CLIENT_HEARTBEAT_THREAD_NUM = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.client.heartBeat.threadNum") + .intType() + .defaultValue(4) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_TYPE = + ConfigOptions.key("mapreduce.rss.client.type") + .stringType() + .defaultValue(RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_RETRY_MAX = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_MAX) + .intType() + .defaultValue(RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_RETRY_INTERVAL_MAX = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX) + .longType() + .defaultValue(RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE) + .withDescription(""); + public static final String RSS_COORDINATOR_QUORUM = MR_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM; - public static final String RSS_DATA_REPLICA = MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA; - public static final int RSS_DATA_REPLICA_DEFAULT_VALUE = - RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE; - public static final String RSS_DATA_REPLICA_WRITE = - MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE; - public static final int RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE = - RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE; - public static final String RSS_DATA_REPLICA_READ = - MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ; - public static final int RSS_DATA_REPLICA_READ_DEFAULT_VALUE = - RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE; - public static final String RSS_DATA_REPLICA_SKIP_ENABLED = - MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED; - public static final String RSS_DATA_TRANSFER_POOL_SIZE = - MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE; - public static final int RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE = - RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE; - public static final String RSS_DATA_COMMIT_POOL_SIZE = - MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE; - public static final int RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE = - RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE; - - public static final String RSS_CLIENT_SEND_THREAD_NUM = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM; - public static final int RSS_CLIENT_DEFAULT_SEND_THREAD_NUM = - RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM; - public static final String RSS_CLIENT_SEND_THRESHOLD = - MR_CONFIG_PREFIX + "rss.client.send.threshold"; - public static final double RSS_CLIENT_DEFAULT_SEND_THRESHOLD = 0.2f; - public static final boolean RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE = - RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE; - public static final String RSS_HEARTBEAT_INTERVAL = - MR_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL; - public static final long RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE = - RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE; + + public static final ConfigOption RSS_DATA_REPLICA = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA) + .intType() + .defaultValue(RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_DATA_REPLICA_WRITE = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_WRITE) + .intType() + .defaultValue(RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_DATA_REPLICA_READ = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_READ) + .intType() + .defaultValue(RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_DATA_REPLICA_SKIP_ENABLED = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED) + .booleanType() + .defaultValue(RssClientConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_DATA_TRANSFER_POOL_SIZE = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_TRANSFER_POOL_SIZE) + .intType() + .defaultValue(RssClientConfig.RSS_DATA_TRANFER_POOL_SIZE_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_DATA_COMMIT_POOL_SIZE = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE) + .intType() + .defaultValue(RssClientConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_SEND_THREAD_NUM = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_THREAD_NUM) + .intType() + .defaultValue(RssClientConfig.RSS_CLIENT_DEFAULT_SEND_NUM) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_SEND_THRESHOLD = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.client.send.threshold") + .doubleType() + .defaultValue(0.2f) + .withDescription(""); + + public static final ConfigOption RSS_HEARTBEAT_INTERVAL = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_INTERVAL) + .longType() + .defaultValue(RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE) + .withDescription(""); + public static final String RSS_HEARTBEAT_TIMEOUT = MR_CONFIG_PREFIX + RssClientConfig.RSS_HEARTBEAT_TIMEOUT; + public static final String RSS_ASSIGNMENT_PREFIX = MR_CONFIG_PREFIX + "rss.assignment.partition."; - public static final String RSS_CLIENT_BATCH_TRIGGER_NUM = - MR_CONFIG_PREFIX + "rss.client.batch.trigger.num"; - public static final int RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM = 50; - public static final String RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD = - MR_CONFIG_PREFIX + "rss.client.sort.memory.use.threshold"; - public static final String RSS_WRITER_BUFFER_SIZE = - MR_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE; - public static final long RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE = 1024 * 1024 * 14; - public static final double RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD = 0.9f; - public static final String RSS_CLIENT_MEMORY_THRESHOLD = - MR_CONFIG_PREFIX + "rss.client.memory.threshold"; - public static final double RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD = 0.8f; - public static final String RSS_CLIENT_SEND_CHECK_INTERVAL_MS = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS; - public static final long RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE; - public static final String RSS_CLIENT_SEND_CHECK_TIMEOUT_MS = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS; - public static final long RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE; - public static final String RSS_CLIENT_BITMAP_NUM = MR_CONFIG_PREFIX + "rss.client.bitmap.num"; - public static final int RSS_CLIENT_DEFAULT_BITMAP_NUM = 1; - public static final String RSS_CLIENT_MAX_SEGMENT_SIZE = - MR_CONFIG_PREFIX + "rss.client.max.buffer.size"; - public static final long RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE = 3 * 1024; + + public static final ConfigOption RSS_CLIENT_BATCH_TRIGGER_NUM = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.client.batch.trigger.num") + .intType() + .defaultValue(50) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.client.sort.memory.use.threshold") + .doubleType() + .defaultValue(0.9f) + .withDescription(""); + + public static final ConfigOption RSS_WRITER_BUFFER_SIZE = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_WRITER_BUFFER_SIZE) + .longType() + .defaultValue(1024 * 1024 * 14) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_MEMORY_THRESHOLD = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.client.memory.threshold") + .doubleType() + .defaultValue(0.8f) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_SEND_CHECK_INTERVAL_MS = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS) + .longType() + .defaultValue(RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_SEND_CHECK_TIMEOUT_MS = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS) + .longType() + .defaultValue(RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_BITMAP_NUM = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.client.bitmap.num") + .intType() + .defaultValue(1) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_MAX_SEGMENT_SIZE = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.client.max.buffer.size") + .longType() + .defaultValue(3 * 1024) + .withDescription(""); + public static final String RSS_STORAGE_TYPE = MR_CONFIG_PREFIX + RssClientConfig.RSS_STORAGE_TYPE; - public static final String RSS_REDUCE_REMOTE_SPILL_ENABLED = - MR_CONFIG_PREFIX + "rss.reduce.remote.spill.enable"; - public static final boolean RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT = false; - public static final String RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC = - MR_CONFIG_PREFIX + "rss.reduce.remote.spill.attempt.inc"; - public static final int RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT = 1; - public static final String RSS_REDUCE_REMOTE_SPILL_REPLICATION = - MR_CONFIG_PREFIX + "rss.reduce.remote.spill.replication"; - public static final int RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT = 1; - public static final String RSS_REDUCE_REMOTE_SPILL_RETRIES = - MR_CONFIG_PREFIX + "rss.reduce.remote.spill.retries"; - public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5; - - public static final String RSS_PARTITION_NUM_PER_RANGE = - MR_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE; - public static final int RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE = - RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE; + public static final ConfigOption RSS_REDUCE_REMOTE_SPILL_ENABLED = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.reduce.remote.spill.enable") + .booleanType() + .defaultValue(false) + .withDescription(""); + + public static final ConfigOption RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.reduce.remote.spill.attempt.inc") + .intType() + .defaultValue(1) + .withDescription(""); + + public static final ConfigOption RSS_REDUCE_REMOTE_SPILL_REPLICATION = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.reduce.remote.spill.replication") + .intType() + .defaultValue(1) + .withDescription(""); + + public static final ConfigOption RSS_REDUCE_REMOTE_SPILL_RETRIES = + ConfigOptions.key(MR_CONFIG_PREFIX + "rss.reduce.remote.spill.retries") + .intType() + .defaultValue(5) + .withDescription(""); + + public static final ConfigOption RSS_PARTITION_NUM_PER_RANGE = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_PARTITION_NUM_PER_RANGE) + .intType() + .defaultValue(RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE) + .withDescription(""); + public static final String RSS_REMOTE_STORAGE_PATH = MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_STORAGE_PATH; + public static final String RSS_REMOTE_STORAGE_CONF = MR_CONFIG_PREFIX + "rss.remote.storage.conf"; - public static final String RSS_INDEX_READ_LIMIT = - MR_CONFIG_PREFIX + RssClientConfig.RSS_INDEX_READ_LIMIT; - public static final int RSS_INDEX_READ_LIMIT_DEFAULT_VALUE = - RssClientConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE; - public static final String RSS_CLIENT_READ_BUFFER_SIZE = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE; - - // When the size of read buffer reaches the half of JVM region (i.e., 32m), - // it will incur humongous allocation, so we set it to 14m. - public static final String RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE; - - public static final String RSS_DYNAMIC_CLIENT_CONF_ENABLED = - MR_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED; - public static final boolean RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE = - RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE; - public static final String RSS_ACCESS_TIMEOUT_MS = - MR_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS; - public static final int RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE = - RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE; + + public static final ConfigOption RSS_INDEX_READ_LIMIT = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_INDEX_READ_LIMIT) + .intType() + .defaultValue(RssClientConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_READ_BUFFER_SIZE = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE) + .stringType() + .defaultValue(RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_DYNAMIC_CLIENT_CONF_ENABLED = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED) + .booleanType() + .defaultValue(RssClientConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_ACCESS_TIMEOUT_MS = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_ACCESS_TIMEOUT_MS) + .intType() + .defaultValue(RssClientConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE) + .withDescription(""); public static final String RSS_CLIENT_ASSIGNMENT_TAGS = MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_TAGS; - public static final String RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = - RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER; - public static final int RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE; - - public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL; - public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE; - public static final String RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = - MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES; - public static final int RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE = - RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE; - - public static final String RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = - MR_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED; - public static final boolean RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE = - RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE; - - public static final String RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR = - MR_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR; - - public static final double RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE = - RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE; - - public static final String RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = - MR_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER; - public static final int RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE = - RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE; + public static final ConfigOption RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER = + ConfigOptions.key(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER) + .intType() + .defaultValue(RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL) + .longType() + .defaultValue(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES) + .intType() + .defaultValue(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED) + .booleanType() + .defaultValue(RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR = + ConfigOptions.key( + MR_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR) + .doubleType() + .defaultValue(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE) + .withDescription(""); + + public static final ConfigOption RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER = + ConfigOptions.key(MR_CONFIG_PREFIX + RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER) + .intType() + .defaultValue(RssClientConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE) + .withDescription(""); public static final String RSS_CONF_FILE = "rss_conf.xml"; diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java index 5be31d3055..d1b03f47ea 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java @@ -83,37 +83,41 @@ public static TaskAttemptID createMRTaskAttemptId( public static ShuffleWriteClient createShuffleClient(JobConf jobConf) { int heartBeatThreadNum = jobConf.getInt( - RssMRConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM, - RssMRConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM.key(), + RssMRConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM.defaultValue()); int retryMax = jobConf.getInt( - RssMRConfig.RSS_CLIENT_RETRY_MAX, RssMRConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_RETRY_MAX.key(), + RssMRConfig.RSS_CLIENT_RETRY_MAX.defaultValue()); long retryIntervalMax = jobConf.getLong( - RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, - RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), + RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.defaultValue()); String clientType = - jobConf.get(RssMRConfig.RSS_CLIENT_TYPE, RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE); + jobConf.get(RssMRConfig.RSS_CLIENT_TYPE.key(), RssMRConfig.RSS_CLIENT_TYPE.defaultValue()); int replicaWrite = jobConf.getInt( - RssMRConfig.RSS_DATA_REPLICA_WRITE, RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE); + RssMRConfig.RSS_DATA_REPLICA_WRITE.key(), + RssMRConfig.RSS_DATA_REPLICA_WRITE.defaultValue()); int replicaRead = jobConf.getInt( - RssMRConfig.RSS_DATA_REPLICA_READ, RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE); + RssMRConfig.RSS_DATA_REPLICA_READ.key(), + RssMRConfig.RSS_DATA_REPLICA_READ.defaultValue()); int replica = - jobConf.getInt(RssMRConfig.RSS_DATA_REPLICA, RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE); + jobConf.getInt( + RssMRConfig.RSS_DATA_REPLICA.key(), RssMRConfig.RSS_DATA_REPLICA.defaultValue()); boolean replicaSkipEnabled = jobConf.getBoolean( - RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED, - RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED_DEFAULT_VALUE); + RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED.key(), + RssMRConfig.RSS_DATA_REPLICA_SKIP_ENABLED.defaultValue()); int dataTransferPoolSize = jobConf.getInt( - RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE, - RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE_DEFAULT_VALUE); + RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE.key(), + RssMRConfig.RSS_DATA_TRANSFER_POOL_SIZE.defaultValue()); int dataCommitPoolSize = jobConf.getInt( - RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE, - RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE_DEFAULT_VALUE); + RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE.key(), + RssMRConfig.RSS_DATA_COMMIT_POOL_SIZE.defaultValue()); ShuffleWriteClient client = ShuffleClientFactory.getInstance() .createShuffleWriteClient( @@ -279,8 +283,8 @@ public static long getTaskAttemptId(long blockId) { public static int estimateTaskConcurrency(JobConf jobConf) { double dynamicFactor = jobConf.getDouble( - RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR, - RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR_DEFAULT_VALUE); + RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR.key(), + RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_DYNAMIC_FACTOR.defaultValue()); double slowStart = jobConf.getDouble(Constants.MR_SLOW_START, Constants.MR_SLOW_START_DEFAULT_VALUE); int mapNum = jobConf.getNumMapTasks(); @@ -301,20 +305,20 @@ public static int estimateTaskConcurrency(JobConf jobConf) { public static int getRequiredShuffleServerNumber(JobConf jobConf) { int requiredShuffleServerNumber = jobConf.getInt( - RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, - RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER.key(), + RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER.defaultValue()); boolean enabledEstimateServer = jobConf.getBoolean( - RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, - RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE); + RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED.key(), + RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED.defaultValue()); if (!enabledEstimateServer || requiredShuffleServerNumber > 0) { return requiredShuffleServerNumber; } int taskConcurrency = estimateTaskConcurrency(jobConf); int taskConcurrencyPerServer = jobConf.getInt( - RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER, - RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER_DEFAULT_VALUE); + RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER.key(), + RssMRConfig.RSS_ESTIMATE_TASK_CONCURRENCY_PER_SERVER.defaultValue()); return (int) Math.ceil(taskConcurrency * 1.0 / taskConcurrencyPerServer); } @@ -322,27 +326,27 @@ public static void validateRssClientConf(Configuration rssJobConf) { int retryMax = getInt( rssJobConf, - RssMRConfig.RSS_CLIENT_RETRY_MAX, - RssMRConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_RETRY_MAX.key(), + RssMRConfig.RSS_CLIENT_RETRY_MAX.defaultValue()); long retryIntervalMax = getLong( rssJobConf, - RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, - RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), + RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.defaultValue()); long sendCheckTimeout = getLong( rssJobConf, - RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, - RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), + RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.defaultValue()); if (retryIntervalMax * retryMax > sendCheckTimeout) { throw new IllegalArgumentException( String.format( "%s(%s) * %s(%s) should not bigger than %s(%s)", - RssMRConfig.RSS_CLIENT_RETRY_MAX, + RssMRConfig.RSS_CLIENT_RETRY_MAX.key(), retryMax, - RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, + RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), retryIntervalMax, - RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, + RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), sendCheckTimeout)); } } diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java index 1f7aae942d..d2afbb80ad 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssShuffle.java @@ -102,23 +102,25 @@ public void init(ShuffleConsumerPlugin.Context context) { this.replicaWrite = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_DATA_REPLICA_WRITE, - RssMRConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE); + RssMRConfig.RSS_DATA_REPLICA_WRITE.key(), + RssMRConfig.RSS_DATA_REPLICA_WRITE.defaultValue()); this.replicaRead = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_DATA_REPLICA_READ, - RssMRConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE); + RssMRConfig.RSS_DATA_REPLICA_READ.key(), + RssMRConfig.RSS_DATA_REPLICA_READ.defaultValue()); this.replica = RssMRUtils.getInt( - rssJobConf, RssMRConfig.RSS_DATA_REPLICA, RssMRConfig.RSS_DATA_REPLICA_DEFAULT_VALUE); + rssJobConf, + RssMRConfig.RSS_DATA_REPLICA.key(), + RssMRConfig.RSS_DATA_REPLICA.defaultValue()); this.partitionNum = mrJobConf.getNumReduceTasks(); this.partitionNumPerRange = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_PARTITION_NUM_PER_RANGE, - RssMRConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE); + RssMRConfig.RSS_PARTITION_NUM_PER_RANGE.key(), + RssMRConfig.RSS_PARTITION_NUM_PER_RANGE.defaultValue()); this.basePath = RssMRUtils.getString(rssJobConf, RssMRConfig.RSS_REMOTE_STORAGE_PATH); String remoteStorageConf = RssMRUtils.getString(rssJobConf, RssMRConfig.RSS_REMOTE_STORAGE_CONF, ""); @@ -130,21 +132,21 @@ protected MergeManager createMergeManager(ShuffleConsumerPlugin.Context co boolean useRemoteSpill = RssMRUtils.getBoolean( rssJobConf, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT); + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED.key(), + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED.defaultValue()); if (useRemoteSpill) { // Use minimized replica, because spilled data can be recomputed by reduce task. // Instead, we use more retries on HDFS client. int replication = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION_DEFAULT); + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION.key(), + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_REPLICATION.defaultValue()); int retries = RssMRUtils.getInt( rssJobConf, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT); + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES.key(), + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_RETRIES.defaultValue()); return new RssRemoteMergeManagerImpl( appId, reduceId, diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java index a631696c71..c489408014 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java @@ -165,16 +165,16 @@ public Thread newThread(Runnable r) { // get remote storage from coordinator if necessary boolean dynamicConfEnabled = conf.getBoolean( - RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED, - RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE); + RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), + RssMRConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.defaultValue()); // fetch client conf and apply them if necessary if (dynamicConfEnabled) { Map clusterClientConf = client.fetchClientConf( conf.getInt( - RssMRConfig.RSS_ACCESS_TIMEOUT_MS, - RssMRConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE)); + RssMRConfig.RSS_ACCESS_TIMEOUT_MS.key(), + RssMRConfig.RSS_ACCESS_TIMEOUT_MS.defaultValue())); RssMRUtils.applyDynamicClientConf(extraConf, clusterClientConf); } @@ -187,7 +187,8 @@ public Thread newThread(Runnable r) { } assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION); String clientType = - extraConf.get(RssMRConfig.RSS_CLIENT_TYPE, RssMRConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE); + extraConf.get( + RssMRConfig.RSS_CLIENT_TYPE.key(), RssMRConfig.RSS_CLIENT_TYPE.defaultValue()); ClientUtils.validateClientType(clientType); assignmentTags.add(clientType); @@ -207,8 +208,8 @@ public Thread newThread(Runnable r) { RssMRUtils.validateRssClientConf(extraConf); // When containers have disk with very limited space, reduce is allowed to spill data to hdfs if (conf.getBoolean( - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED_DEFAULT)) { + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED.key(), + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ENABLED.defaultValue())) { if (remoteStorage.isEmpty()) { throw new IllegalArgumentException( @@ -223,11 +224,11 @@ public Thread newThread(Runnable r) { int originalAttempts = conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4); int inc = conf.getInt( - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC, - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC_DEFAULT); + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC.key(), + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC.defaultValue()); if (inc < 0) { throw new IllegalArgumentException( - RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC + " cannot be negative"); + RssMRConfig.RSS_REDUCE_REMOTE_SPILL_ATTEMPT_INC.key() + " cannot be negative"); } conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, originalAttempts + inc); } @@ -237,12 +238,12 @@ public Thread newThread(Runnable r) { // same result long retryInterval = conf.getLong( - RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL, - RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL.key(), + RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL.defaultValue()); int retryTimes = conf.getInt( - RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES, - RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE); + RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES.key(), + RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES.defaultValue()); ShuffleAssignmentsInfo response; try { response = @@ -296,7 +297,8 @@ public Thread newThread(Runnable r) { } long heartbeatInterval = conf.getLong( - RssMRConfig.RSS_HEARTBEAT_INTERVAL, RssMRConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE); + RssMRConfig.RSS_HEARTBEAT_INTERVAL.key(), + RssMRConfig.RSS_HEARTBEAT_INTERVAL.defaultValue()); long heartbeatTimeout = conf.getLong(RssMRConfig.RSS_HEARTBEAT_TIMEOUT, heartbeatInterval / 2); client.registerApplicationInfo(appId, heartbeatTimeout, "user"); diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java index cb5c2c6562..830ad3c361 100644 --- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java +++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/RssMRUtilsTest.java @@ -144,41 +144,41 @@ public void applyDynamicClientConfTest() { RssMRUtils.applyDynamicClientConf(conf, clientConf); assertEquals(remoteStoragePath, conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH)); assertEquals( - RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE, conf.get(RssMRConfig.RSS_CLIENT_TYPE)); + RssClientConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE, conf.get(RssMRConfig.RSS_CLIENT_TYPE.key())); assertEquals( Integer.toString(RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_CLIENT_RETRY_MAX)); + conf.get(RssMRConfig.RSS_CLIENT_RETRY_MAX.key())); assertEquals( Long.toString(RssClientConfig.RSS_CLIENT_RETRY_INTERVAL_MAX_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX)); + conf.get(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key())); assertEquals( Integer.toString(RssClientConfig.RSS_DATA_REPLICA_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_DATA_REPLICA)); + conf.get(RssMRConfig.RSS_DATA_REPLICA.key())); assertEquals( Integer.toString(RssClientConfig.RSS_DATA_REPLICA_WRITE_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_DATA_REPLICA_WRITE)); + conf.get(RssMRConfig.RSS_DATA_REPLICA_WRITE.key())); assertEquals( Integer.toString(RssClientConfig.RSS_DATA_REPLICA_READ_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_DATA_REPLICA_READ)); + conf.get(RssMRConfig.RSS_DATA_REPLICA_READ.key())); assertEquals( Long.toString(RssClientConfig.RSS_HEARTBEAT_INTERVAL_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_HEARTBEAT_INTERVAL)); + conf.get(RssMRConfig.RSS_HEARTBEAT_INTERVAL.key())); assertEquals(StorageType.MEMORY_LOCALFILE_HDFS.name(), conf.get(RssMRConfig.RSS_STORAGE_TYPE)); assertEquals( Long.toString(RssClientConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS)); + conf.get(RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key())); assertEquals( Long.toString(RssClientConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS)); + conf.get(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key())); assertEquals( Integer.toString(RssClientConfig.RSS_PARTITION_NUM_PER_RANGE_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_PARTITION_NUM_PER_RANGE)); + conf.get(RssMRConfig.RSS_PARTITION_NUM_PER_RANGE.key())); assertEquals( Integer.toString(RssClientConfig.RSS_INDEX_READ_LIMIT_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_INDEX_READ_LIMIT)); + conf.get(RssMRConfig.RSS_INDEX_READ_LIMIT.key())); assertEquals( RssClientConfig.RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE, - conf.get(RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE)); + conf.get(RssMRConfig.RSS_CLIENT_READ_BUFFER_SIZE.key())); assertEquals(mockValue, conf.get(mockKey)); String remoteStoragePath2 = "hdfs://path2"; @@ -195,7 +195,7 @@ public void applyDynamicClientConfTest() { assertEquals(mockValue, conf.get(mockKey)); assertEquals( Integer.toString(RssClientConfig.RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE), - conf.get(RssMRConfig.RSS_CLIENT_RETRY_MAX)); + conf.get(RssMRConfig.RSS_CLIENT_RETRY_MAX.key())); } @Test @@ -220,13 +220,13 @@ public void testGetRequiredShuffleServerNumber() { JobConf jobConf = new JobConf(); jobConf.setInt("mapreduce.job.maps", 500); jobConf.setInt("mapreduce.job.reduces", 20); - jobConf.setInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, 10); + jobConf.setInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER.key(), 10); assertEquals(10, RssMRUtils.getRequiredShuffleServerNumber(jobConf)); - jobConf.setBoolean(RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED, true); + jobConf.setBoolean(RssMRConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED.key(), true); assertEquals(10, RssMRUtils.getRequiredShuffleServerNumber(jobConf)); - jobConf.unset(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER); + jobConf.unset(RssMRConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER.key()); assertEquals(7, RssMRUtils.getRequiredShuffleServerNumber(jobConf)); jobConf.setDouble(Constants.MR_SLOW_START, 1.0); @@ -247,9 +247,9 @@ public void testValidateRssClientConf() { rssJobConf.setInt("mapreduce.job.maps", 500); rssJobConf.setInt("mapreduce.job.reduces", 20); RssMRUtils.validateRssClientConf(rssJobConf); - rssJobConf.setInt(RssMRConfig.RSS_CLIENT_RETRY_MAX, 5); - rssJobConf.setLong(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX, 1000L); - rssJobConf.setLong(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS, 4999L); + rssJobConf.setInt(RssMRConfig.RSS_CLIENT_RETRY_MAX.key(), 5); + rssJobConf.setLong(RssMRConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), 1000L); + rssJobConf.setLong(RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), 4999L); try { RssMRUtils.validateRssClientConf(rssJobConf); fail(EXPECTED_EXCEPTION_MESSAGE);