Skip to content

Commit

Permalink
[#299] improvement: Make config type of RSS_STORAGE_TYPE as enum (#1052)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

improvement: Make config type of `RSS_STORAGE_TYPE` in `RssBaseConf` as enum

### Why are the changes needed?

Fix: #299

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

current UT and integration testing
  • Loading branch information
cchung100m committed Aug 7, 2023
1 parent dbb9890 commit e9071fd
Show file tree
Hide file tree
Showing 40 changed files with 127 additions and 74 deletions.
28 changes: 28 additions & 0 deletions common/src/main/java/org/apache/uniffle/common/StorageType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common;

public enum StorageType {
MEMORY,
LOCALFILE,
MEMORY_LOCALFILE,
HDFS,
MEMORY_HDFS,
LOCALFILE_HDFS,
MEMORY_LOCALFILE_HDFS;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.StorageType;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.RssUtils;

Expand Down Expand Up @@ -137,9 +138,9 @@ public class RssBaseConf extends RssConf {
.defaultValue(60 * 1000L)
.withDescription("Remote shuffle service client type grpc timeout (ms)");

public static final ConfigOption<String> RSS_STORAGE_TYPE =
public static final ConfigOption<StorageType> RSS_STORAGE_TYPE =
ConfigOptions.key("rss.storage.type")
.stringType()
.enumType(StorageType.class)
.noDefaultValue()
.withDescription("Data storage for remote shuffle service");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
File dataDir1 = new File(tmpDir, "data1");
String basePath = dataDir1.getAbsolutePath();
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public static void setupServers(@TempDir File serverTmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(data1.getAbsolutePath(), data2.getAbsolutePath()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public static void setupServers(@TempDir File serverTmpDir) throws Exception {
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE, true);
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(data1.getAbsolutePath()));
shuffleServerConf.setDouble(
Expand All @@ -84,7 +85,8 @@ public static void setupServers(@TempDir File serverTmpDir) throws Exception {
createShuffleServer(shuffleServerConf);
shuffleServerConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 18081);
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(data2.getAbsolutePath()));
shuffleServerConf.setDouble(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public void buildInCheckerTest() {
ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(),
LocalStorageChecker.class.getCanonicalName());
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("s1"));
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
assertConf(conf);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
conf.set(ShuffleServerConf.HEALTH_MIN_STORAGE_PERCENTAGE, -1.0);
assertConf(conf);
conf.set(ShuffleServerConf.HEALTH_MIN_STORAGE_PERCENTAGE, 102.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 60L * 1000L * 60L);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 1000L);
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.name());
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setLong(
ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 1L * 1024L * 1024L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 60L * 1000L * 60L);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 1000L);
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.name());
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setLong(
ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 1000L * 1024L * 1024L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static void setUp(@TempDir File tmpDir) throws Exception {
File dataDir2 = new File(tmpDir, "data2");
List<String> basePath =
Lists.newArrayList(dataDir1.getAbsolutePath(), dataDir2.getAbsolutePath());
shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
createShuffleServer(shuffleServerConf);
File dataDir3 = new File(tmpDir, "data3");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
shuffleServerConf.setInteger(
ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, MAX_CONCURRENCY);
shuffleServerConf.setBoolean(shuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ private void registerShuffle(RssRegisterShuffleRequest rrsr) {

public static MockedShuffleServer createServer(int id, File tmpDir) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 5000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 40.0);
Expand All @@ -273,8 +274,8 @@ public static MockedShuffleServer createServer(int id, File tmpDir) throws Excep
File dataDir1 = new File(tmpDir, id + "_1");
File dataDir2 = new File(tmpDir, id + "_2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 450L);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 20 + id);
shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
File dataDir1 = new File(tmpDir, "data1");
String basePath = dataDir1.getAbsolutePath();
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.set(
ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, EVENT_THRESHOLD_SIZE);
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
createShuffleServer(shuffleServerConf);
startServers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private static ShuffleServerConf getShuffleServerConf() throws Exception {
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
return serverConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
File dataDir = new File(tmpDir, "data");
String basePath = dataDir.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 450L);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 5000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
File dataDir = new File(tmpDir, "data");
String basePath = dataDir.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 5000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 1024L * 1024L);

createShuffleServer(shuffleServerConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
createShuffleServer(shuffleServerConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public static void setupServers(@TempDir File tmpDir) throws Exception {
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
createShuffleServer(shuffleServerConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public DefaultFlushEventHandler(
StorageManager storageManager,
Consumer<ShuffleDataFlushEvent> eventConsumer) {
this.shuffleServerConf = conf;
this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE));
this.storageType =
StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE).name());
this.storageManager = storageManager;
this.eventConsumer = eventConsumer;
initFlushEventExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public LocalStorageChecker(ShuffleServerConf conf, List<LocalStorage> storages)
if (CollectionUtils.isEmpty(basePaths)) {
throw new IllegalArgumentException("The base path cannot be empty");
}
String storageType = conf.getString(ShuffleServerConf.RSS_STORAGE_TYPE);
String storageType = conf.get(ShuffleServerConf.RSS_STORAGE_TYPE).name();
if (!ShuffleStorageUtils.containsLocalFile(storageType)) {
throw new IllegalArgumentException(
"Only StorageType contains LOCALFILE support storageChecker");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ShuffleFlushManager(
this.storageManager = storageManager;
initHadoopConf();
retryMax = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_WRITE_RETRY_MAX);
storageType = shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE);
storageType = shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE).name();
storageDataReplica = shuffleServerConf.get(RssBaseConf.RSS_STORAGE_DATA_REPLICA);

storageBasePaths = RssUtils.getConfiguredLocalDirs(shuffleServerConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void stopServer() throws Exception {

private void initialization() throws Exception {
boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
String storageType = shuffleServerConf.getString(RSS_STORAGE_TYPE);
String storageType = shuffleServerConf.get(RSS_STORAGE_TYPE).name();
if (!testMode
&& (StorageType.LOCALFILE.name().equals(storageType)
|| (StorageType.HDFS.name()).equals(storageType))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,8 @@ public void getLocalShuffleData(
.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime);
}
}
String storageType = shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE);
String storageType =
shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetLocalShuffleDataResponse reply = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public ShuffleIndexResult getShuffleIndex(
int partitionNumPerRange,
int partitionNum) {
refreshAppId(appId);
String storageType = conf.getString(RssBaseConf.RSS_STORAGE_TYPE);
String storageType = conf.get(RssBaseConf.RSS_STORAGE_TYPE).name();
CreateShuffleReadHandlerRequest request = new CreateShuffleReadHandlerRequest();
request.setAppId(appId);
request.setShuffleId(shuffleId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat
.recordTransportTime(GetLocalShuffleDataRequest.class.getName(), transportTime);
}
}
String storageType = shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE);
String storageType =
shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetLocalShuffleDataResponse response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static StorageManagerFactory getInstance() {
}

public StorageManager createStorageManager(ShuffleServerConf conf) {
StorageType type = StorageType.valueOf(conf.get(ShuffleServerConf.RSS_STORAGE_TYPE));
StorageType type = StorageType.valueOf(conf.get(ShuffleServerConf.RSS_STORAGE_TYPE).name());
if (StorageType.LOCALFILE.equals(type) || StorageType.MEMORY_LOCALFILE.equals(type)) {
return new LocalStorageManager(conf);
} else if (StorageType.HDFS.equals(type) || StorageType.MEMORY_HDFS.equals(type)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ShuffleFlushManagerOnKerberizedHadoopTest extends KerberizedHadoopB
public void prepare() throws Exception {
ShuffleServerMetrics.register();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
shuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
LogManager.getRootLogger().setLevel(Level.INFO);

Expand Down
Loading

0 comments on commit e9071fd

Please sign in to comment.