Skip to content

Commit

Permalink
[Improvement] Using configOption in RssMRConfig apache#1304
Browse files Browse the repository at this point in the history
  • Loading branch information
guixiaowen committed Nov 24, 2023
1 parent d5de299 commit 42d59f1
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,23 @@ public void init(Context context) throws IOException, ClassNotFoundException {
double sortThreshold =
RssMRUtils.getDouble(
rssJobConf,
RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
RssMRConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD.key(),
RssMRConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD.defaultValue());
if (sortThreshold <= 0 || Double.compare(sortThreshold, 1.0) > 0) {
throw new IOException("Invalid sort memory use threshold : " + sortThreshold);
}

int batch =
RssMRUtils.getInt(
rssJobConf,
RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM,
RssMRConfig.RSS_CLIENT_DEFAULT_BATCH_TRIGGER_NUM);
RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM.key(),
RssMRConfig.RSS_CLIENT_BATCH_TRIGGER_NUM.defaultValue());
RawComparator<K> comparator = mrJobConf.getOutputKeyComparator();
double memoryThreshold =
RssMRUtils.getDouble(
rssJobConf,
RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD,
RssMRConfig.RSS_CLIENT_DEFAULT_MEMORY_THRESHOLD);
RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD.key(),
RssMRConfig.RSS_CLIENT_MEMORY_THRESHOLD.defaultValue());
ApplicationAttemptId applicationAttemptId = RssMRUtils.getApplicationAttemptId();
String appId = applicationAttemptId.toString();
long taskAttemptId =
Expand All @@ -97,24 +97,24 @@ public void init(Context context) throws IOException, ClassNotFoundException {
double sendThreshold =
RssMRUtils.getDouble(
rssJobConf,
RssMRConfig.RSS_CLIENT_SEND_THRESHOLD,
RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THRESHOLD);
RssMRConfig.RSS_CLIENT_SEND_THRESHOLD.key(),
RssMRConfig.RSS_CLIENT_SEND_THRESHOLD.defaultValue());

long sendCheckInterval =
RssMRUtils.getLong(
rssJobConf,
RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS,
RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS_DEFAULT_VALUE);
RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(),
RssMRConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.defaultValue());
long sendCheckTimeout =
RssMRUtils.getLong(
rssJobConf,
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS,
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS_DEFAULT_VALUE);
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
RssMRConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.defaultValue());
int bitmapSplitNum =
RssMRUtils.getInt(
rssJobConf,
RssMRConfig.RSS_CLIENT_BITMAP_NUM,
RssMRConfig.RSS_CLIENT_DEFAULT_BITMAP_NUM);
RssMRConfig.RSS_CLIENT_BITMAP_NUM.key(),
RssMRConfig.RSS_CLIENT_BITMAP_NUM.defaultValue());
int numMaps = mrJobConf.getNumMapTasks();
String storageType = RssMRUtils.getString(rssJobConf, RssMRConfig.RSS_STORAGE_TYPE);
if (StringUtils.isEmpty(storageType)) {
Expand All @@ -127,18 +127,18 @@ public void init(Context context) throws IOException, ClassNotFoundException {
long maxSegmentSize =
RssMRUtils.getLong(
rssJobConf,
RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE,
RssMRConfig.RSS_CLIENT_DEFAULT_MAX_SEGMENT_SIZE);
RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE.key(),
RssMRConfig.RSS_CLIENT_MAX_SEGMENT_SIZE.defaultValue());
int sendThreadNum =
RssMRUtils.getInt(
rssJobConf,
RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM,
RssMRConfig.RSS_CLIENT_DEFAULT_SEND_THREAD_NUM);
RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM.key(),
RssMRConfig.RSS_CLIENT_SEND_THREAD_NUM.defaultValue());
long maxBufferSize =
RssMRUtils.getLong(
rssJobConf,
RssMRConfig.RSS_WRITER_BUFFER_SIZE,
RssMRConfig.RSS_WRITER_BUFFER_SIZE_DEFAULT_VALUE);
RssMRConfig.RSS_WRITER_BUFFER_SIZE.key(),
RssMRConfig.RSS_WRITER_BUFFER_SIZE.defaultValue());
shuffleClient = RssMRUtils.createShuffleClient(mrJobConf);
bufferManager =
new SortWriteBufferManager(
Expand Down
Loading

0 comments on commit 42d59f1

Please sign in to comment.