diff --git a/coordinator/pom.xml b/coordinator/pom.xml index 686534a652..368742d389 100644 --- a/coordinator/pom.xml +++ b/coordinator/pom.xml @@ -73,6 +73,10 @@ org.apache.hadoop hadoop-common + + org.apache.hadoop + hadoop-minicluster + org.mockito mockito-inline diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java new file mode 100644 index 0000000000..c42da449f2 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.coordinator; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue; + +/** + * AppBalanceSelectStorageStrategy will consider the number of apps allocated on each remote path is balanced. + */ +public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class); + /** + * store appId -> remote path to make sure all shuffle data of the same application + * will be written to the same remote storage + */ + private final Map appIdToRemoteStorageInfo; + /** + * store remote path -> application count for assignment strategy + */ + private final Map remoteStoragePathCounter; + private final Map availableRemoteStorageInfo; + + public AppBalanceSelectStorageStrategy() { + this.appIdToRemoteStorageInfo = Maps.newConcurrentMap(); + this.remoteStoragePathCounter = Maps.newConcurrentMap(); + this.availableRemoteStorageInfo = Maps.newHashMap(); + } + + /** + * the strategy of pick remote storage is according to assignment count + */ + @Override + public RemoteStorageInfo pickRemoteStorage(String appId) { + if (appIdToRemoteStorageInfo.containsKey(appId)) { + return appIdToRemoteStorageInfo.get(appId); + } + + // create list for sort + List> sizeList = + Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull) + .sorted(Comparator.comparingInt(entry -> entry.getValue().getAppNum().get())).collect(Collectors.toList()); + + for 
(Map.Entry entry : sizeList) { + String storagePath = entry.getKey(); + if (availableRemoteStorageInfo.containsKey(storagePath)) { + appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath)); + incRemoteStorageCounter(storagePath); + break; + } + } + return appIdToRemoteStorageInfo.get(appId); + } + + @Override + @VisibleForTesting + public synchronized void incRemoteStorageCounter(String remoteStoragePath) { + RankValue counter = remoteStoragePathCounter.get(remoteStoragePath); + if (counter != null) { + counter.getAppNum().incrementAndGet(); + } else { + // it may be happened when assignment remote storage + // and refresh remote storage at the same time + LOG.warn("Remote storage path lost during assignment: %s doesn't exist, reset it to 1", + remoteStoragePath); + remoteStoragePathCounter.put(remoteStoragePath, new RankValue(1)); + } + } + + @Override + @VisibleForTesting + public synchronized void decRemoteStorageCounter(String storagePath) { + if (!StringUtils.isEmpty(storagePath)) { + RankValue atomic = remoteStoragePathCounter.get(storagePath); + if (atomic != null) { + double count = atomic.getAppNum().decrementAndGet(); + if (count < 0) { + LOG.warn("Unexpected counter for remote storage: %s, which is %i, reset to 0", + storagePath, count); + atomic.getAppNum().set(0); + } + } else { + LOG.warn("Can't find counter for remote storage: {}", storagePath); + remoteStoragePathCounter.putIfAbsent(storagePath, new RankValue(0)); + } + if (remoteStoragePathCounter.get(storagePath).getAppNum().get() == 0 + && !availableRemoteStorageInfo.containsKey(storagePath)) { + remoteStoragePathCounter.remove(storagePath); + } + } + } + + @Override + public synchronized void removePathFromCounter(String storagePath) { + RankValue atomic = remoteStoragePathCounter.get(storagePath); + if (atomic != null && atomic.getAppNum().get() == 0) { + remoteStoragePathCounter.remove(storagePath); + } + } + + @Override + public Map 
getAppIdToRemoteStorageInfo() { + return appIdToRemoteStorageInfo; + } + + @Override + public Map getRemoteStoragePathRankValue() { + return remoteStoragePathCounter; + } + + @Override + public Map getAvailableRemoteStorageInfo() { + return availableRemoteStorageInfo; + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java index 40a1e1ee49..6fcd6b56f9 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java @@ -19,16 +19,12 @@ import java.net.URI; import java.net.URISyntaxException; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -41,24 +37,38 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.ThreadUtils; +import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue; public class ApplicationManager { private static final Logger LOG = LoggerFactory.getLogger(ApplicationManager.class); private long expired; + private StrategyName storageStrategy; private Map appIds = Maps.newConcurrentMap(); + private SelectStorageStrategy selectStorageStrategy; // store appId -> remote path to make sure all shuffle data of the same application // will be written to the same remote storage - private Map appIdToRemoteStorageInfo = Maps.newConcurrentMap(); + private Map appIdToRemoteStorageInfo; // store remote path -> application 
count for assignment strategy - private Map remoteStoragePathCounter = Maps.newConcurrentMap(); + private Map remoteStoragePathRankValue; private Map remoteStorageToHost = Maps.newConcurrentMap(); - private Map availableRemoteStorageInfo = Maps.newHashMap(); + private Map availableRemoteStorageInfo; private ScheduledExecutorService scheduledExecutorService; // it's only for test case to check if status check has problem private boolean hasErrorInStatusCheck = false; public ApplicationManager(CoordinatorConf conf) { + storageStrategy = conf.get(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY); + if (StrategyName.IO_SAMPLE == storageStrategy) { + selectStorageStrategy = new LowestIOSampleCostSelectStorageStrategy(conf); + } else if (StrategyName.APP_BALANCE == storageStrategy) { + selectStorageStrategy = new AppBalanceSelectStorageStrategy(); + } else { + throw new UnsupportedOperationException("Unsupported selected storage strategy."); + } + appIdToRemoteStorageInfo = selectStorageStrategy.getAppIdToRemoteStorageInfo(); + remoteStoragePathRankValue = selectStorageStrategy.getRemoteStoragePathRankValue(); + availableRemoteStorageInfo = selectStorageStrategy.getAvailableRemoteStorageInfo(); expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED); // the thread for checking application status scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( @@ -83,7 +93,7 @@ public void refreshRemoteStorage(String remoteStoragePath, String remoteStorageC // add remote path if not exist for (String path : paths) { if (!availableRemoteStorageInfo.containsKey(path)) { - remoteStoragePathCounter.putIfAbsent(path, new AtomicInteger(0)); + remoteStoragePathRankValue.putIfAbsent(path, new RankValue(0)); // refreshRemoteStorage is designed without multiple thread problem // metrics shouldn't be added duplicated addRemoteStorageMetrics(path); @@ -117,67 +127,17 @@ public void refreshRemoteStorage(String remoteStoragePath, String remoteStorageC // the 
strategy of pick remote storage is according to assignment count // todo: better strategy with workload balance public RemoteStorageInfo pickRemoteStorage(String appId) { - if (appIdToRemoteStorageInfo.containsKey(appId)) { - return appIdToRemoteStorageInfo.get(appId); - } - - // create list for sort - List> sizeList = - Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull) - .sorted(Comparator.comparingInt(entry -> entry.getValue().get())).collect(Collectors.toList()); - - for (Map.Entry entry : sizeList) { - String storagePath = entry.getKey(); - if (availableRemoteStorageInfo.containsKey(storagePath)) { - appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath)); - incRemoteStorageCounter(storagePath); - break; - } - } + selectStorageStrategy.pickRemoteStorage(appId); return appIdToRemoteStorageInfo.get(appId); } - @VisibleForTesting - protected synchronized void incRemoteStorageCounter(String remoteStoragePath) { - AtomicInteger counter = remoteStoragePathCounter.get(remoteStoragePath); - if (counter != null) { - counter.incrementAndGet(); - } else { - // it may be happened when assignment remote storage - // and refresh remote storage at the same time - LOG.warn("Remote storage path lost during assignment: %s doesn't exist, reset it to 1", - remoteStoragePath); - remoteStoragePathCounter.put(remoteStoragePath, new AtomicInteger(1)); - } - } - @VisibleForTesting protected synchronized void decRemoteStorageCounter(String storagePath) { - if (!StringUtils.isEmpty(storagePath)) { - AtomicInteger atomic = remoteStoragePathCounter.get(storagePath); - if (atomic != null) { - int count = atomic.decrementAndGet(); - if (count < 0) { - LOG.warn("Unexpected counter for remote storage: %s, which is %i, reset to 0", - storagePath, count); - atomic.set(0); - } - } else { - LOG.warn("Can't find counter for remote storage: {}", storagePath); - remoteStoragePathCounter.putIfAbsent(storagePath, new 
AtomicInteger(0)); - } - if (remoteStoragePathCounter.get(storagePath).get() == 0 - && !availableRemoteStorageInfo.containsKey(storagePath)) { - remoteStoragePathCounter.remove(storagePath); - } - } + selectStorageStrategy.decRemoteStorageCounter(storagePath); } private synchronized void removePathFromCounter(String storagePath) { - AtomicInteger atomic = remoteStoragePathCounter.get(storagePath); - if (atomic != null && atomic.get() == 0) { - remoteStoragePathCounter.remove(storagePath); - } + selectStorageStrategy.removePathFromCounter(storagePath); } public Set getAppIds() { @@ -185,13 +145,13 @@ public Set getAppIds() { } @VisibleForTesting - protected Map getAppIdToRemoteStorageInfo() { - return appIdToRemoteStorageInfo; + protected Map getRemoteStoragePathRankValue() { + return remoteStoragePathRankValue; } @VisibleForTesting - protected Map getRemoteStoragePathCounter() { - return remoteStoragePathCounter; + public SelectStorageStrategy getSelectStorageStrategy() { + return selectStorageStrategy; } @VisibleForTesting @@ -237,7 +197,7 @@ private void updateRemoteStorageMetrics() { try { String storageHost = getStorageHost(remoteStoragePath); CoordinatorMetrics.updateDynamicGaugeForRemoteStorage(storageHost, - remoteStoragePathCounter.get(remoteStoragePath).get()); + remoteStoragePathRankValue.get(remoteStoragePath).getAppNum().get()); } catch (Exception e) { LOG.warn("Update remote storage metrics for {} failed ", remoteStoragePath); } @@ -266,4 +226,9 @@ private String getStorageHost(String remoteStoragePath) { } return storageHost; } + + public enum StrategyName { + APP_BALANCE, + IO_SAMPLE + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 34c65b9841..28931a425a 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -26,6 +26,7 @@ import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.util.RssUtils; +import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE; import static org.apache.uniffle.coordinator.AssignmentStrategyFactory.StrategyName.PARTITION_BALANCE; /** @@ -128,7 +129,26 @@ public class CoordinatorConf extends RssBaseConf { .stringType() .noDefaultValue() .withDescription("Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'"); - + public static final ConfigOption COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY = + ConfigOptions.key("rss.coordinator.remote.storage.select.strategy") + .enumType(ApplicationManager.StrategyName.class) + .defaultValue(APP_BALANCE) + .withDescription("Strategy for selecting the remote path"); + public static final ConfigOption COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME = ConfigOptions + .key("rss.coordinator.remote.storage.io.sample.schedule.time") + .longType() + .defaultValue(60 * 1000L) + .withDescription("The time of scheduling the read and write time of the paths to obtain different HDFS"); + public static final ConfigOption COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE = ConfigOptions + .key("rss.coordinator.remote.storage.io.sample.file.size") + .intType() + .defaultValue(204800 * 1000) + .withDescription("The size of the file that the scheduled thread reads and writes"); + public static final ConfigOption COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES = ConfigOptions + .key("rss.coordinator.remote.storage.io.sample.access.times") + .intType() + .defaultValue(3) + .withDescription("The number of times to read and write HDFS files"); public CoordinatorConf() { } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java new file mode 100644 index 0000000000..96147b8e69 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategy.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.coordinator; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; +import org.apache.uniffle.common.util.ThreadUtils; + +/** + * LowestIOSampleCostSelectStorageStrategy considers that when allocating apps to different remote paths, + * remote paths that can write and read. Therefore, it may occur that all apps are written to the same cluster. + * At the same time, if a cluster has read and write exceptions, we will automatically avoid the cluster. 
+ */ +public class LowestIOSampleCostSelectStorageStrategy implements SelectStorageStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(LowestIOSampleCostSelectStorageStrategy.class); + /** + * store appId -> remote path to make sure all shuffle data of the same application + * will be written to the same remote storage + */ + private final Map appIdToRemoteStorageInfo; + /** + * store remote path -> application count for assignment strategy + */ + private final Map remoteStoragePathRankValue; + private final Map availableRemoteStorageInfo; + private List> sizeList; + private FileSystem fs; + private Configuration conf; + private final int fileSize; + private final int readAndWriteTimes; + + public LowestIOSampleCostSelectStorageStrategy(CoordinatorConf cf) { + conf = new Configuration(); + fileSize = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_FILE_SIZE); + readAndWriteTimes = cf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_ACCESS_TIMES); + this.appIdToRemoteStorageInfo = Maps.newConcurrentMap(); + this.remoteStoragePathRankValue = Maps.newConcurrentMap(); + this.availableRemoteStorageInfo = Maps.newHashMap(); + this.sizeList = Lists.newCopyOnWriteArrayList(); + ScheduledExecutorService readWriteRankScheduler = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.getThreadFactory("readWriteRankScheduler-%d")); + // should init later than the refreshRemoteStorage init + readWriteRankScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000, + cf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME), TimeUnit.MILLISECONDS); + } + + public void checkReadAndWrite() { + if (remoteStoragePathRankValue.size() > 1) { + for (String path : remoteStoragePathRankValue.keySet()) { + Path remotePath = new Path(path); + Path testPath = new Path(path + "/rssTest"); + long startWriteTime = System.currentTimeMillis(); + try { + fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf); + 
for (int j = 0; j < readAndWriteTimes; j++) { + byte[] data = RandomUtils.nextBytes(fileSize); + try (FSDataOutputStream fos = fs.create(testPath)) { + fos.write(data); + fos.flush(); + } + byte[] readData = new byte[fileSize]; + int readBytes; + try (FSDataInputStream fis = fs.open(testPath)) { + int hasReadBytes = 0; + do { + readBytes = fis.read(readData); + if (hasReadBytes < fileSize) { + for (int i = 0; i < readBytes; i++) { + if (data[hasReadBytes + i] != readData[i]) { + RankValue rankValue = remoteStoragePathRankValue.get(path); + remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get())); + } + } + } + hasReadBytes += readBytes; + } while (readBytes != -1); + } + } + } catch (Exception e) { + LOG.error("Storage read and write error, we will not use this remote path {}.", path, e); + RankValue rankValue = remoteStoragePathRankValue.get(path); + remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get())); + } finally { + sortPathByRankValue(path, testPath, startWriteTime); + } + } + } else { + sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()); + } + } + + @VisibleForTesting + public void sortPathByRankValue(String path, Path testPath, long startWrite) { + try { + fs.delete(testPath, true); + long totalTime = System.currentTimeMillis() - startWrite; + RankValue rankValue = remoteStoragePathRankValue.get(path); + remoteStoragePathRankValue.put(path, new RankValue(totalTime, rankValue.getAppNum().get())); + } catch (Exception e) { + RankValue rankValue = remoteStoragePathRankValue.get(path); + remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE, rankValue.getAppNum().get())); + LOG.error("Failed to sort, we will not use this remote path {}.", path, e); + } finally { + sizeList = Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull) + .sorted(Comparator.comparingDouble( + entry -> 
entry.getValue().getReadAndWriteTime().get())).collect(Collectors.toList()); + } + } + + /** + * the strategy of pick remote storage is based on whether the remote path can be read or written + */ + @Override + public RemoteStorageInfo pickRemoteStorage(String appId) { + if (appIdToRemoteStorageInfo.containsKey(appId)) { + return appIdToRemoteStorageInfo.get(appId); + } + + for (Map.Entry entry : sizeList) { + String storagePath = entry.getKey(); + if (availableRemoteStorageInfo.containsKey(storagePath)) { + appIdToRemoteStorageInfo.putIfAbsent(appId, availableRemoteStorageInfo.get(storagePath)); + incRemoteStorageCounter(storagePath); + break; + } + } + return appIdToRemoteStorageInfo.get(appId); + } + + @Override + @VisibleForTesting + public synchronized void incRemoteStorageCounter(String remoteStoragePath) { + RankValue counter = remoteStoragePathRankValue.get(remoteStoragePath); + if (counter != null) { + counter.getAppNum().incrementAndGet(); + } else { + remoteStoragePathRankValue.put(remoteStoragePath, new RankValue(1)); + // it may be happened when assignment remote storage + // and refresh remote storage at the same time + LOG.warn("Remote storage path lost during assignment: %s doesn't exist, " + + "reset the rank value to 0 and app size to 1.", remoteStoragePath); + } + } + + @Override + @VisibleForTesting + public synchronized void decRemoteStorageCounter(String storagePath) { + if (!StringUtils.isEmpty(storagePath)) { + RankValue atomic = remoteStoragePathRankValue.get(storagePath); + if (atomic != null) { + double count = atomic.getAppNum().decrementAndGet(); + if (count < 0) { + LOG.warn("Unexpected counter for remote storage: %s, which is %i, reset to 0", + storagePath, count); + atomic.getAppNum().set(0); + } + } else { + remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(1)); + LOG.warn("Can't find counter for remote storage: {}", storagePath); + } + + if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0 + && 
!availableRemoteStorageInfo.containsKey(storagePath)) { + remoteStoragePathRankValue.remove(storagePath); + } + } + } + + @Override + public synchronized void removePathFromCounter(String storagePath) { + RankValue rankValue = remoteStoragePathRankValue.get(storagePath); + // The time spent reading and writing cannot be used to determine whether the current path is still used by apps. + // Therefore, determine whether the HDFS path is still used by the number of apps + if (rankValue != null && rankValue.getAppNum().get() == 0) { + remoteStoragePathRankValue.remove(storagePath); + } + } + + @Override + public Map getAppIdToRemoteStorageInfo() { + return appIdToRemoteStorageInfo; + } + + @Override + public Map getRemoteStoragePathRankValue() { + return remoteStoragePathRankValue; + } + + @Override + public Map getAvailableRemoteStorageInfo() { + return availableRemoteStorageInfo; + } + + @VisibleForTesting + public void setFs(FileSystem fs) { + this.fs = fs; + } + + @VisibleForTesting + public void setConf(Configuration conf) { + this.conf = conf; + } + + static class RankValue { + AtomicLong readAndWriteTime; + AtomicInteger appNum; + + RankValue(int appNum) { + this.readAndWriteTime = new AtomicLong(0); + this.appNum = new AtomicInteger(appNum); + } + + RankValue(long ratioValue, int appNum) { + this.readAndWriteTime = new AtomicLong(ratioValue); + this.appNum = new AtomicInteger(appNum); + } + + public AtomicLong getReadAndWriteTime() { + return readAndWriteTime; + } + + public AtomicInteger getAppNum() { + return appNum; + } + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java new file mode 100644 index 0000000000..654ec8a07a --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SelectStorageStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import java.util.Map; + +import org.apache.uniffle.common.RemoteStorageInfo; +import org.apache.uniffle.coordinator.LowestIOSampleCostSelectStorageStrategy.RankValue; + +public interface SelectStorageStrategy { + + RemoteStorageInfo pickRemoteStorage(String appId); + + void incRemoteStorageCounter(String remoteStoragePath); + + void decRemoteStorageCounter(String storagePath); + + void removePathFromCounter(String storagePath); + + Map getAvailableRemoteStorageInfo(); + + Map getAppIdToRemoteStorageInfo(); + + Map getRemoteStoragePathRankValue(); +} diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java new file mode 100644 index 0000000000..d52d65aac7 --- /dev/null +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategyTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import com.google.common.collect.Sets; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.common.util.Constants; + +import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AppBalanceSelectStorageStrategyTest { + + private AppBalanceSelectStorageStrategy appBalanceSelectStorageStrategy; + private ApplicationManager applicationManager; + private long appExpiredTime = 2000L; + private String remotePath1 = "hdfs://path1"; + private String remotePath2 = "hdfs://path2"; + private String remotePath3 = "hdfs://path3"; + + @BeforeAll + public static void setup() { + CoordinatorMetrics.register(); + } + + @AfterAll + public static void clear() { + CoordinatorMetrics.clear(); + } + + @BeforeEach + public void setUp() { + CoordinatorConf conf = new CoordinatorConf(); + conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime); + conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, APP_BALANCE); + applicationManager = new ApplicationManager(conf); + 
appBalanceSelectStorageStrategy = (AppBalanceSelectStorageStrategy) applicationManager.getSelectStorageStrategy(); + } + + @Test + public void selectStorageTest() throws Exception { + String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2; + applicationManager.refreshRemoteStorage(remoteStoragePath, ""); + assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath1).getAppNum().get()); + assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + String storageHost1 = "path1"; + assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5); + String storageHost2 = "path2"; + assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); + + // do inc for remotePath1 to make sure pick storage will be remotePath2 in next call + appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1); + appBalanceSelectStorageStrategy.incRemoteStorageCounter(remotePath1); + String testApp1 = "testApp1"; + applicationManager.refreshAppId(testApp1); + assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + assertEquals(remotePath2, appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath()); + assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + // return the same value if did the assignment already + assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + + Thread.sleep(appExpiredTime + 2000); + assertNull(appBalanceSelectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1)); + assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); 
+ assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); + + // refresh app1, got remotePath2, then remove remotePath2, + // it should be existed in counter until it expired + applicationManager.refreshAppId(testApp1); + assertEquals(remotePath2, appBalanceSelectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + remoteStoragePath = remotePath1; + applicationManager.refreshRemoteStorage(remoteStoragePath, ""); + assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1, remotePath2)), + appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet()); + assertEquals(1, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().get(remotePath2).getAppNum().get()); + // app1 is expired, remotePath2 is removed because of counter = 0 + Thread.sleep(appExpiredTime + 2000); + assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1)), + appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().keySet()); + + // restore previous manually inc for next test case + appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1); + appBalanceSelectStorageStrategy.decRemoteStorageCounter(remotePath1); + // remove all remote storage + applicationManager.refreshRemoteStorage("", ""); + assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size()); + assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertFalse(applicationManager.hasErrorInStatusCheck()); + } + + @Test + public void storageCounterMulThreadTest() throws Exception { + String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2 + + Constants.COMMA_SPLIT_CHAR + remotePath3; + applicationManager.refreshRemoteStorage(remoteStoragePath, ""); + String appPrefix = "testAppId"; + + Thread pickThread1 = new Thread(() -> { + for (int i = 0; i < 1000; i++) { + String appId = appPrefix + i; + applicationManager.refreshAppId(appId); + 
appBalanceSelectStorageStrategy.pickRemoteStorage(appId); + } + }); + + Thread pickThread2 = new Thread(() -> { + for (int i = 1000; i < 2000; i++) { + String appId = appPrefix + i; + applicationManager.refreshAppId(appId); + appBalanceSelectStorageStrategy.pickRemoteStorage(appId); + } + }); + + Thread pickThread3 = new Thread(() -> { + for (int i = 2000; i < 3000; i++) { + String appId = appPrefix + i; + applicationManager.refreshAppId(appId); + appBalanceSelectStorageStrategy.pickRemoteStorage(appId); + } + }); + pickThread1.start(); + pickThread2.start(); + pickThread3.start(); + pickThread1.join(); + pickThread2.join(); + pickThread3.join(); + Thread.sleep(appExpiredTime + 2000); + + applicationManager.refreshRemoteStorage("", ""); + assertEquals(0, appBalanceSelectStorageStrategy.getAvailableRemoteStorageInfo().size()); + assertEquals(0, appBalanceSelectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertFalse(applicationManager.hasErrorInStatusCheck()); + } +} diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java index 8cbccbc259..791ec1dcf9 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ApplicationManagerTest.java @@ -31,7 +31,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class ApplicationManagerTest { @@ -66,19 +65,19 @@ public void refreshTest() { Set expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2); applicationManager.refreshRemoteStorage(remoteStoragePath, ""); assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet()); - 
assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet()); + assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet()); remoteStoragePath = remotePath3; expectedAvailablePath = Sets.newHashSet(remotePath3); applicationManager.refreshRemoteStorage(remoteStoragePath, ""); assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet()); - assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet()); + assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet()); remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath3; expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath3); applicationManager.refreshRemoteStorage(remoteStoragePath, remoteStorageConf); assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet()); - assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet()); + assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet()); Map storages = applicationManager.getAvailableRemoteStorageInfo(); RemoteStorageInfo remoteStorageInfo = storages.get(remotePath1); assertEquals(2, remoteStorageInfo.getConfItems().size()); @@ -94,59 +93,7 @@ public void refreshTest() { assertEquals(1, remoteStorageInfo.getConfItems().size()); assertEquals("v3", remoteStorageInfo.getConfItems().get("k3")); assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet()); - assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathCounter().keySet()); - assertFalse(applicationManager.hasErrorInStatusCheck()); - } - - @Test - public void storageCounterTest() throws Exception { - String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2; - applicationManager.refreshRemoteStorage(remoteStoragePath, ""); - 
assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath1).get()); - assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get()); - String storageHost1 = "path1"; - assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5); - String storageHost2 = "path2"; - assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); - - // do inc for remotePath1 to make sure pick storage will be remotePath2 in next call - applicationManager.incRemoteStorageCounter(remotePath1); - applicationManager.incRemoteStorageCounter(remotePath1); - String testApp1 = "testApp1"; - applicationManager.refreshAppId(testApp1); - assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath()); - assertEquals(remotePath2, applicationManager.getAppIdToRemoteStorageInfo().get(testApp1).getPath()); - assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get()); - // return the same value if did the assignment already - assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath()); - assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get()); - - Thread.sleep(appExpiredTime + 2000); - assertNull(applicationManager.getAppIdToRemoteStorageInfo().get(testApp1)); - assertEquals(0, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get()); - assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); - - // refresh app1, got remotePath2, then remove remotePath2, - // it should be existed in counter until it expired - applicationManager.refreshAppId(testApp1); - assertEquals(remotePath2, applicationManager.pickRemoteStorage(testApp1).getPath()); - remoteStoragePath = remotePath1; - applicationManager.refreshRemoteStorage(remoteStoragePath, ""); - assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1, remotePath2)), - 
applicationManager.getRemoteStoragePathCounter().keySet()); - assertEquals(1, applicationManager.getRemoteStoragePathCounter().get(remotePath2).get()); - // app1 is expired, remotePath2 is removed because of counter = 0 - Thread.sleep(appExpiredTime + 2000); - assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remotePath1)), - applicationManager.getRemoteStoragePathCounter().keySet()); - - // restore previous manually inc for next test case - applicationManager.decRemoteStorageCounter(remotePath1); - applicationManager.decRemoteStorageCounter(remotePath1); - // remove all remote storage - applicationManager.refreshRemoteStorage("", ""); - assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size()); - assertEquals(0, applicationManager.getRemoteStoragePathCounter().size()); + assertEquals(expectedAvailablePath, applicationManager.getRemoteStoragePathRankValue().keySet()); assertFalse(applicationManager.hasErrorInStatusCheck()); } @@ -163,48 +110,4 @@ public void clearWithoutRemoteStorageTest() throws Exception { assertEquals(0, applicationManager.getAppIds().size()); assertFalse(applicationManager.hasErrorInStatusCheck()); } - - @Test - public void storageCounterMulThreadTest() throws Exception { - String remoteStoragePath = remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2 - + Constants.COMMA_SPLIT_CHAR + remotePath3; - applicationManager.refreshRemoteStorage(remoteStoragePath, ""); - String appPrefix = "testAppId"; - - Thread pickThread1 = new Thread(() -> { - for (int i = 0; i < 1000; i++) { - String appId = appPrefix + i; - applicationManager.refreshAppId(appId); - applicationManager.pickRemoteStorage(appId); - } - }); - - Thread pickThread2 = new Thread(() -> { - for (int i = 1000; i < 2000; i++) { - String appId = appPrefix + i; - applicationManager.refreshAppId(appId); - applicationManager.pickRemoteStorage(appId); - } - }); - - Thread pickThread3 = new Thread(() -> { - for (int i = 2000; i < 3000; i++) { - String appId = appPrefix + 
i; - applicationManager.refreshAppId(appId); - applicationManager.pickRemoteStorage(appId); - } - }); - pickThread1.start(); - pickThread2.start(); - pickThread3.start(); - pickThread1.join(); - pickThread2.join(); - pickThread3.join(); - Thread.sleep(appExpiredTime + 2000); - - applicationManager.refreshRemoteStorage("", ""); - assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size()); - assertEquals(0, applicationManager.getRemoteStoragePathCounter().size()); - assertFalse(applicationManager.hasErrorInStatusCheck()); - } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java index c5bfc0f245..7a6181740f 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/ClientConfManagerTest.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.FileWriter; +import java.io.IOException; import java.io.PrintWriter; import java.nio.file.Files; import java.util.Map; @@ -28,6 +29,10 @@ import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,6 +41,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.util.Constants; +import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -43,6 +49,11 @@ public class ClientConfManagerTest { + @TempDir 
+ private final File remotePath = new File("hdfs://rss"); + private static MiniDFSCluster cluster; + private Configuration hdfsConf = new Configuration(); + @BeforeEach public void setUp() { CoordinatorMetrics.register(); @@ -53,6 +64,18 @@ public void clear() { CoordinatorMetrics.clear(); } + @AfterAll + public static void close() { + cluster.close(); + } + + public void createMiniHdfs(String hdfsPath) throws IOException { + hdfsConf.set("fs.defaultFS", remotePath.getAbsolutePath()); + hdfsConf.set("dfs.nameservices", "rss"); + hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsPath); + cluster = (new MiniDFSCluster.Builder(hdfsConf)).build(); + } + @Test public void test(@TempDir File tempDir) throws Exception { File cfgFile = File.createTempFile("tmp", ".conf", tempDir); @@ -137,7 +160,7 @@ public void test(@TempDir File tempDir) throws Exception { } @Test - public void dynamicRemoteStorageTest() throws Exception { + public void dynamicRemoteByAppNumStrategyStorageTest() throws Exception { int updateIntervalSec = 2; final String remotePath1 = "hdfs://host1/path1"; final String remotePath2 = "hdfs://host2/path2"; @@ -194,6 +217,83 @@ public void dynamicRemoteStorageTest() throws Exception { clientConfManager.close(); } + @Test + public void dynamicRemoteByHealthStrategyStorageTest() throws Exception { + final int updateIntervalSec = 2; + final String remotePath1 = "hdfs://host1/path1"; + final String remotePath2 = "hdfs://host2/path2"; + final String remotePath3 = "hdfs://host3/path3"; + File cfgFile = Files.createTempFile("dynamicRemoteStorageTest", ".conf").toFile(); + cfgFile.deleteOnExit(); + writeRemoteStorageConf(cfgFile, remotePath1); + createMiniHdfs(remotePath.getAbsolutePath()); + + CoordinatorConf conf = new CoordinatorConf(); + conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, updateIntervalSec); + conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString()); + 
conf.set(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true); + conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE); + + ApplicationManager applicationManager = new ApplicationManager(conf); + // init IORankScheduler + Thread.sleep(2000); + LowestIOSampleCostSelectStorageStrategy selectStorageStrategy = + (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy(); + Path testPath = new Path("/test"); + FileSystem fs = testPath.getFileSystem(hdfsConf); + selectStorageStrategy.setFs(fs); + + final ClientConfManager clientConfManager = new ClientConfManager(conf, new Configuration(), applicationManager); + Thread.sleep(500); + Set expectedAvailablePath = Sets.newHashSet(remotePath1); + assertEquals(expectedAvailablePath, applicationManager.getAvailableRemoteStorageInfo().keySet()); + selectStorageStrategy.sortPathByRankValue(remotePath1, testPath, System.currentTimeMillis()); + RemoteStorageInfo remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId1"); + assertEquals(remotePath1, remoteStorageInfo.getPath()); + assertTrue(remoteStorageInfo.getConfItems().isEmpty()); + + writeRemoteStorageConf(cfgFile, remotePath3); + expectedAvailablePath = Sets.newHashSet(remotePath3); + waitForUpdate(expectedAvailablePath, applicationManager); + // The reason for setting the filesystem here is to trigger the execution of sortPathByRankValue + selectStorageStrategy.setFs(fs); + selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, System.currentTimeMillis()); + remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId2"); + assertEquals(remotePath3, remoteStorageInfo.getPath()); + + String confItems = "host2,k1=v1,k2=v2;host3,k3=v3"; + final long current = System.currentTimeMillis(); + writeRemoteStorageConf(cfgFile, remotePath2 + Constants.COMMA_SPLIT_CHAR + remotePath3, confItems); + expectedAvailablePath = Sets.newHashSet(remotePath2, remotePath3); + 
waitForUpdate(expectedAvailablePath, applicationManager); + selectStorageStrategy.setFs(fs); + selectStorageStrategy.sortPathByRankValue(remotePath2, testPath, current); + selectStorageStrategy.setFs(fs); + selectStorageStrategy.sortPathByRankValue(remotePath3, testPath, current); + remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId3"); + assertEquals(remotePath2, remoteStorageInfo.getPath()); + assertEquals(2, remoteStorageInfo.getConfItems().size()); + assertEquals("v1", remoteStorageInfo.getConfItems().get("k1")); + assertEquals("v2", remoteStorageInfo.getConfItems().get("k2")); + + confItems = "host1,keyTest1=test1,keyTest2=test2;host2,k1=deadbeaf"; + writeRemoteStorageConf(cfgFile, remotePath1 + Constants.COMMA_SPLIT_CHAR + remotePath2, confItems); + expectedAvailablePath = Sets.newHashSet(remotePath1, remotePath2); + waitForUpdate(expectedAvailablePath, applicationManager); + remoteStorageInfo = applicationManager.pickRemoteStorage("testAppId4"); + // one of remote storage will be chosen + assertTrue( + (remotePath1.equals(remoteStorageInfo.getPath()) + && (remoteStorageInfo.getConfItems().size() == 2) + && (remoteStorageInfo.getConfItems().get("keyTest1").equals("test1"))) + && (remoteStorageInfo.getConfItems().get("keyTest2").equals("test2")) + || (remotePath2.equals(remoteStorageInfo.getPath()) + && remoteStorageInfo.getConfItems().size() == 1) + && remoteStorageInfo.getConfItems().get("k1").equals("deadbeaf")); + + clientConfManager.close(); + } + private void writeRemoteStorageConf(File cfgFile, String value) throws Exception { writeRemoteStorageConf(cfgFile, value, null); } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java new file mode 100644 index 0000000000..41eec3e758 --- /dev/null +++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/LowestIOSampleCostSelectStorageStrategyTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import java.io.File; + +import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import org.apache.uniffle.common.util.Constants; + +import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LowestIOSampleCostSelectStorageStrategyTest { + + private LowestIOSampleCostSelectStorageStrategy selectStorageStrategy; + private ApplicationManager applicationManager; + private 
static final Configuration hdfsConf = new Configuration(); + private static MiniDFSCluster cluster; + private final long appExpiredTime = 2000L; + private final String remoteStorage1 = "hdfs://p1"; + private final String remoteStorage2 = "hdfs://p2"; + private final String remoteStorage3 = "hdfs://p3"; + private final Path testFile = new Path("test"); + + @TempDir + private static File remotePath = new File("hdfs://rss"); + + @BeforeAll + public static void setup() { + CoordinatorMetrics.register(); + } + + @AfterAll + public static void clear() { + CoordinatorMetrics.clear(); + cluster.close(); + } + + @BeforeEach + public void init() throws Exception { + setUpHdfs(remotePath.getAbsolutePath()); + } + + public void setUpHdfs(String hdfsPath) throws Exception { + hdfsConf.set("fs.defaultFS", remotePath.getAbsolutePath()); + hdfsConf.set("dfs.nameservices", "rss"); + hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsPath); + cluster = (new MiniDFSCluster.Builder(hdfsConf)).build(); + Thread.sleep(500L); + CoordinatorConf conf = new CoordinatorConf(); + conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime); + conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 5000); + conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE); + applicationManager = new ApplicationManager(conf); + selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy(); + selectStorageStrategy.setConf(hdfsConf); + Thread.sleep(1000); + } + + @Test + public void selectStorageTest() throws Exception { + FileSystem fs = testFile.getFileSystem(hdfsConf); + selectStorageStrategy.setFs(fs); + + String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2; + applicationManager.refreshRemoteStorage(remoteStoragePath, ""); + //default value is 0 + assertEquals(0, + 
selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage1).getReadAndWriteTime().get()); + assertEquals(0, + selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getReadAndWriteTime().get()); + String storageHost1 = "p1"; + assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost1).get(), 0.5); + String storageHost2 = "p2"; + assertEquals(0.0, CoordinatorMetrics.gaugeInUsedRemoteStorage.get(storageHost2).get(), 0.5); + + // compare with two remote path + selectStorageStrategy.incRemoteStorageCounter(remoteStorage1); + selectStorageStrategy.incRemoteStorageCounter(remoteStorage1); + String testApp1 = "testApp1"; + final long current = System.currentTimeMillis(); + applicationManager.refreshAppId(testApp1); + fs.create(testFile); + selectStorageStrategy.sortPathByRankValue(remoteStorage2, testFile, current); + fs.create(testFile); + selectStorageStrategy.sortPathByRankValue(remoteStorage1, testFile, current); + assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + assertEquals(remoteStorage2, selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1).getPath()); + assertEquals(1, + selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + // return the same value if did the assignment already + assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + assertEquals(1, + selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + + // when the expiration time is reached, the app was removed + Thread.sleep(appExpiredTime + 2000); + assertNull(selectStorageStrategy.getAppIdToRemoteStorageInfo().get(testApp1)); + assertEquals(0, + selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + + // refresh app1, got remotePath2, then remove remotePath2, + // it should be existed in counter until it expired + 
applicationManager.refreshAppId(testApp1); + assertEquals(remoteStorage2, selectStorageStrategy.pickRemoteStorage(testApp1).getPath()); + remoteStoragePath = remoteStorage1; + applicationManager.refreshRemoteStorage(remoteStoragePath, ""); + assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1, remoteStorage2)), + selectStorageStrategy.getRemoteStoragePathRankValue().keySet()); + assertTrue(selectStorageStrategy.getRemoteStoragePathRankValue() + .get(remoteStorage2).getReadAndWriteTime().get() > 0); + assertEquals(1, + selectStorageStrategy.getRemoteStoragePathRankValue().get(remoteStorage2).getAppNum().get()); + // app1 is expired, p2 is removed because of counter = 0 + Thread.sleep(appExpiredTime + 2000); + assertEquals(Sets.newConcurrentHashSet(Sets.newHashSet(remoteStorage1)), + selectStorageStrategy.getRemoteStoragePathRankValue().keySet()); + // restore previous manually inc for next test case + selectStorageStrategy.decRemoteStorageCounter(remoteStorage1); + selectStorageStrategy.decRemoteStorageCounter(remoteStorage1); + // remove all remote storage + applicationManager.refreshRemoteStorage("", ""); + assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size()); + assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertFalse(applicationManager.hasErrorInStatusCheck()); + } + + @Test + public void selectStorageMulThreadTest() throws Exception { + FileSystem fs = testFile.getFileSystem(hdfsConf); + selectStorageStrategy.setFs(fs); + String remoteStoragePath = remoteStorage1 + Constants.COMMA_SPLIT_CHAR + remoteStorage2 + + Constants.COMMA_SPLIT_CHAR + remoteStorage3; + applicationManager.refreshRemoteStorage(remoteStoragePath, ""); + String appPrefix = "testAppId"; + + Thread pickThread1 = new Thread(() -> { + for (int i = 0; i < 1000; i++) { + String appId = appPrefix + i; + applicationManager.refreshAppId(appId); + selectStorageStrategy.pickRemoteStorage(appId); + } + }); + + Thread 
pickThread2 = new Thread(() -> { + for (int i = 1000; i < 2000; i++) { + String appId = appPrefix + i; + applicationManager.refreshAppId(appId); + selectStorageStrategy.pickRemoteStorage(appId); + } + }); + + Thread pickThread3 = new Thread(() -> { + for (int i = 2000; i < 3000; i++) { + String appId = appPrefix + i; + applicationManager.refreshAppId(appId); + selectStorageStrategy.pickRemoteStorage(appId); + } + }); + pickThread1.start(); + pickThread2.start(); + pickThread3.start(); + pickThread1.join(); + pickThread2.join(); + pickThread3.join(); + Thread.sleep(appExpiredTime + 2000); + + applicationManager.refreshRemoteStorage("", ""); + assertEquals(0, selectStorageStrategy.getAvailableRemoteStorageInfo().size()); + assertEquals(0, selectStorageStrategy.getRemoteStoragePathRankValue().size()); + assertFalse(applicationManager.hasErrorInStatusCheck()); + } +} diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index cd71326d70..f9c263a27d 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -95,6 +95,10 @@ This document will introduce how to deploy Uniffle coordinators. 
|rss.coordinator.remote.storage.cluster.conf|-|Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';'| |rss.rpc.server.port|-|RPC port for coordinator| |rss.jetty.http.port|-|Http port for coordinator| +|rss.coordinator.remote.storage.select.strategy|APP_BALANCE|Strategy for selecting the remote path| +|rss.coordinator.remote.storage.io.sample.schedule.time|60000|The scheduling interval for sampling the read and write time of the different HDFS paths| +|rss.coordinator.remote.storage.io.sample.file.size|204800000|The size of the file that the scheduled thread reads and writes| +|rss.coordinator.remote.storage.io.sample.access.times|3|The number of times to read and write HDFS files| ### AccessClusterLoadChecker settings |Property Name|Default| Description| diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java index c93723c630..73450d7deb 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/FetchClientConfTest.java @@ -39,6 +39,7 @@ import org.apache.uniffle.coordinator.ApplicationManager; import org.apache.uniffle.coordinator.CoordinatorConf; +import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -92,10 +93,11 @@ public void test(@TempDir File tempDir) throws Exception { response = coordinatorClient.fetchClientConf(request); assertEquals(ResponseStatusCode.INTERNAL_ERROR, response.getStatusCode()); assertEquals(0, response.getClientConf().size()); + shutdownServers(); } @Test - public void testFetchRemoteStorage(@TempDir File tempDir) throws Exception { + public void
testFetchRemoteStorageByApp(@TempDir File tempDir) throws Exception { String remotePath1 = "hdfs://path1"; File cfgFile = File.createTempFile("tmp", ".conf", tempDir); String contItem = "path2,key1=test1,key2=test2"; @@ -139,6 +141,59 @@ public void testFetchRemoteStorage(@TempDir File tempDir) throws Exception { assertEquals(2, remoteStorageInfo.getConfItems().size()); assertEquals("test1", remoteStorageInfo.getConfItems().get("key1")); assertEquals("test2", remoteStorageInfo.getConfItems().get("key2")); + shutdownServers(); + } + + @Test + public void testFetchRemoteStorageByIO(@TempDir File tempDir) throws Exception { + String remotePath1 = "hdfs://path1"; + File cfgFile = File.createTempFile("tmp", ".conf", tempDir); + String contItem = "path2,key1=test1,key2=test2"; + Map dynamicConf = Maps.newHashMap(); + dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), remotePath1); + dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_CLUSTER_CONF.key(), contItem); + writeRemoteStorageConf(cfgFile, dynamicConf); + + CoordinatorConf coordinatorConf = getCoordinatorConf(); + coordinatorConf.setBoolean(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED, true); + coordinatorConf.setString(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_PATH, cfgFile.toURI().toString()); + coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 2); + coordinatorConf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_IO_SAMPLE_SCHEDULE_TIME, 500); + coordinatorConf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE); + createCoordinatorServer(coordinatorConf); + startServers(); + + waitForUpdate(Sets.newHashSet(remotePath1), coordinators.get(0).getApplicationManager()); + String appId = "testFetchRemoteStorageApp"; + RssFetchRemoteStorageRequest request = new RssFetchRemoteStorageRequest(appId); + RssFetchRemoteStorageResponse response = coordinatorClient.fetchRemoteStorage(request); + 
RemoteStorageInfo remoteStorageInfo = response.getRemoteStorageInfo(); + assertTrue(remoteStorageInfo.getConfItems().isEmpty()); + assertEquals(remotePath1, remoteStorageInfo.getPath()); + + // update remote storage info + String remotePath2 = "hdfs://path2"; + dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(), remotePath2); + writeRemoteStorageConf(cfgFile, dynamicConf); + waitForUpdate(Sets.newHashSet(remotePath2), coordinators.get(0).getApplicationManager()); + request = new RssFetchRemoteStorageRequest(appId); + response = coordinatorClient.fetchRemoteStorage(request); + // remotePath1 will be return because (appId -> remote storage path) is in cache + remoteStorageInfo = response.getRemoteStorageInfo(); + assertEquals(remotePath1, remoteStorageInfo.getPath()); + assertTrue(remoteStorageInfo.getConfItems().isEmpty()); + + // ensure sizeList can be updated + Thread.sleep(2000); + request = new RssFetchRemoteStorageRequest(appId + "another"); + response = coordinatorClient.fetchRemoteStorage(request); + // got the remotePath2 for new appId + remoteStorageInfo = response.getRemoteStorageInfo(); + assertEquals(remotePath2, remoteStorageInfo.getPath()); + assertEquals(2, remoteStorageInfo.getConfItems().size()); + assertEquals("test1", remoteStorageInfo.getConfItems().get("key1")); + assertEquals("test2", remoteStorageInfo.getConfItems().get("key2")); + shutdownServers(); } private void waitForUpdate(