diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractPermitManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractPermitManager.java new file mode 100644 index 0000000000000..450d0433a6eb9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractPermitManager.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +/** + * A class to manage the issuance and recycling of Permits. + */ +public interface AbstractPermitManager { + /** + * Request one Permit instance for the NS. + */ + Permit acquirePermit(); + + /** + * Release the permit instance for the NS. + */ + void releasePermit(Permit permit); + + void drainPermits(); + + int availablePermits(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index fe498c66b7ee8..466fc69de4b57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -20,8 +20,7 @@ import java.util.HashMap; import java.util.Map; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.Set; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -29,6 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; + /** * Base fairness policy that implements @RouterRpcFairnessPolicyController. * Internally a map of nameservice to Semaphore is used to control permits. @@ -40,49 +42,70 @@ public class AbstractRouterRpcFairnessPolicyController LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class); /** Hash table to hold semaphore for each configured name service. */ - private Map permits; + private Map permits; + + /** Version to support dynamically change permits. **/ + private final int version; + + public static final String ERROR_MSG = "Configured handlers " + + DFS_ROUTER_HANDLER_COUNT_KEY + '=' + + " %d is less than the minimum required handlers %d"; + + AbstractRouterRpcFairnessPolicyController(int version) { + permits = new HashMap<>(); + this.version = version; + } - public void init(Configuration conf) { - this.permits = new HashMap<>(); + /** + * Init the permits. + */ + public void initPermits(Map newPermits) { + this.permits = newPermits; } @Override - public boolean acquirePermit(String nsId) { - try { - LOG.debug("Taking lock for nameservice {}", nsId); - return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.debug("Cannot get a permit for nameservice {}", nsId); + public int getVersion() { + return this.version; + } + + @Override + public Permit acquirePermit(String nsId) { + LOG.debug("Taking lock for nameservice {}", nsId); + AbstractPermitManager permitManager = permits.get(nsId); + if (permitManager != null) { + return permitManager.acquirePermit(); + } else { + // TODO Add one metric to monitor this abnormal case. + LOG.warn("Can't find NSPermit for {}.", nsId, new Throwable()); + return Permit.getNoPermit(); } - return false; } @Override - public void releasePermit(String nsId) { - this.permits.get(nsId).release(); + public void releasePermit(String nsId, Permit permitInstance) { + if (permitInstance == null || permitInstance.isDontNeedPermit()) { + return; + } + AbstractPermitManager permitManager = + this.permits.get(nsId); + if (permitManager != null) { + permitManager.releasePermit(permitInstance); + } } @Override public void shutdown() { LOG.debug("Shutting down router fairness policy controller"); // drain all semaphores - for (Semaphore sema: this.permits.values()) { - sema.drainPermits(); + for (AbstractPermitManager permitManager: this.permits.values()) { + permitManager.drainPermits(); } } - protected void insertNameServiceWithPermits(String nsId, int maxPermits) { - this.permits.put(nsId, new Semaphore(maxPermits)); - } - - protected int getAvailablePermits(String nsId) { - return this.permits.get(nsId).availablePermits(); - } - @Override public String getAvailableHandlerOnPerNs() { JSONObject json = new JSONObject(); - for (Map.Entry entry : permits.entrySet()) { + for (Map.Entry entry : permits.entrySet()) { try { String nsId = entry.getKey(); int availableHandler = entry.getValue().availablePermits(); @@ -93,4 +116,30 @@ public String getAvailableHandlerOnPerNs() { } return json.toString(); } + + protected int getDedicatedHandlers(Configuration conf, String nsId) { + return conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 1); + } + + /** + * Validate all configured dedicated handlers for the nameservices. + * @return sum of dedicated handlers of all nameservices + * @throws IllegalArgumentException + * if total dedicated handlers more than handler count. + */ + protected int validateHandlersCount(Configuration conf, + int handlerCount, Set allConfiguredNS) { + int totalDedicatedHandlers = 0; + for (String nsId : allConfiguredNS) { + int dedicatedHandlers = getDedicatedHandlers(conf, nsId); + totalDedicatedHandlers += dedicatedHandlers; + } + if (totalDedicatedHandlers > handlerCount) { + String msg = String.format(ERROR_MSG, handlerCount, + totalDedicatedHandlers); + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + return totalDedicatedHandlers; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ElasticPermitManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ElasticPermitManager.java new file mode 100644 index 0000000000000..0a61de4bec7ed --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ElasticPermitManager.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdfs.server.federation.fairness.Permit.PermitType; + +/** + * A class that manages permits in an exclusive and shared mode. + */ +public class ElasticPermitManager implements AbstractPermitManager { + private static final Logger LOG = LoggerFactory.getLogger(ElasticPermitManager.class); + private final String nsId; + /** Dedicated Permit Numbers. **/ + private final int dedicatedCap; + private final Semaphore dedicatedPermits; + /** The maximum number of elastic permits that can be acquired. **/ + private final int maximumELNumberCanUse; + private final Semaphore maximumELPermitsCanUse; + /** Total elastic permits. **/ + private final Semaphore totalElasticPermits; + /** A version to ensure that permits can be correctly recycled, + * after dynamically refreshing FairnessPolicyController. **/ + private final int version; + /** A cached permits Map. **/ + private final Map cachedPermits; + + ElasticPermitManager(String nsId, int dedicatedCap, + int maximumELNumberCanUse, Semaphore totalElasticPermits, int version) { + this.nsId = nsId; + this.dedicatedCap = dedicatedCap; + this.dedicatedPermits = new Semaphore(dedicatedCap); + this.maximumELNumberCanUse = maximumELNumberCanUse; + this.maximumELPermitsCanUse = new Semaphore(maximumELNumberCanUse); + this.totalElasticPermits = totalElasticPermits; + this.version = version; + this.cachedPermits = Permit.getPermitBaseOnVersion(version); + LOG.info("New NSPermitManager " + this); + } + + @Override + public String toString() { + return "NSPermitManager[nsId=" + nsId + + ", dedicatedPermitsNumber=" + dedicatedCap + + ", sharedPermitsNumber=" + maximumELNumberCanUse + + ", totalSharedPermits=" + totalElasticPermits + + ", version=" + version + "]"; + } + + @Override + public Permit acquirePermit() { + Permit permit = this.cachedPermits.get(PermitType.NO_PERMIT); + try { + if (acquireDedicatedPermit()) { + permit = this.cachedPermits.get(PermitType.DEDICATED); + } else if (acquireElasticPermit()) { + permit = this.cachedPermits.get(PermitType.SHARED); + } + } catch (InterruptedException e) { + // ignore + } + return permit; + } + + /** + * Try to acquire one permit from Dedicated Semaphore. + * Tips: without timeout to ensure lower latency + */ + private boolean acquireDedicatedPermit() throws InterruptedException { + return this.dedicatedPermits.tryAcquire(); + } + + /** + * Try to acquire one permit from Total Elastic Permits. + */ + private boolean acquireElasticPermit() throws InterruptedException { + boolean result = false; + if (maximumELNumberCanUse > 0 && totalElasticPermits != null) { + if (this.maximumELPermitsCanUse.tryAcquire(1, TimeUnit.SECONDS)) { + if (this.totalElasticPermits.tryAcquire(1, TimeUnit.SECONDS)) { + result = true; + } else { + this.maximumELPermitsCanUse.release(); + } + } + } + return result; + } + + @Override + public void releasePermit(Permit permit) { + if (permit.getPermitVersion() == version) { + switch (permit.getPermitType()) { + case DEDICATED: + this.dedicatedPermits.release(); + break; + case SHARED: + this.maximumELPermitsCanUse.release(); + this.totalElasticPermits.release(); + break; + case DONT_NEED_PERMIT: + break; + default: + LOG.warn("Unexpected permitType {}", permit.getPermitType()); + break; + } + } else { + LOG.warn("Wrong permit version fro {}, expect {}, actual {}.", + nsId, version, permit.getPermitVersion()); + } + } + + @Override + public void drainPermits() { + this.dedicatedPermits.drainPermits(); + } + + @Override + public int availablePermits() { + return this.dedicatedPermits.availablePermits() + + this.maximumELPermitsCanUse.availablePermits(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ElasticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ElasticRouterRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..fcea1bf679473 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/ElasticRouterRpcFairnessPolicyController.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Semaphore; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; + +/** + * A subclass of RouterRpcFairnessPolicyController to flexible control + * the number of permits that each nameservice can use. + */ +public class ElasticRouterRpcFairnessPolicyController + extends AbstractRouterRpcFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(ElasticRouterRpcFairnessPolicyController.class); + private Semaphore totalElasticPermits = null; + + public ElasticRouterRpcFairnessPolicyController(Configuration conf, int version) { + super(version); + init(conf); + } + + private void init(Configuration conf) { + if (conf == null) { + conf = new HdfsConfiguration(); + } + int newVersion = getVersion(); + Map permitManagerMap = new HashMap<>(); + int handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, + DFS_ROUTER_HANDLER_COUNT_DEFAULT); + + LOG.info("Handlers available for fairness assignment {} ", handlerCount); + + Set allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); + allConfiguredNS.add(CONCURRENT_NS); + int totalDedicatedHandlers = validateHandlersCount( + conf, handlerCount, allConfiguredNS); + int totalElasticHandlers = handlerCount - totalDedicatedHandlers; + + if (totalElasticHandlers > 0) { + this.totalElasticPermits = new Semaphore(totalElasticHandlers); + } + + for (String nsId : allConfiguredNS) { + int maximumETPermitsCanUse = getMaximumETPermitCanUseForTheNS( + conf, nsId, totalElasticHandlers); + int dedicatedHandlers = getDedicatedHandlers(conf, nsId); + + AbstractPermitManager permitManager = new ElasticPermitManager( + nsId, dedicatedHandlers, maximumETPermitsCanUse, + this.totalElasticPermits, newVersion); + + permitManagerMap.put(nsId, permitManager); + LOG.info("Assigned {} dedicatedPermits and {} maximumETPermits for nsId {} ", + dedicatedHandlers, maximumETPermitsCanUse, nsId); + } + initPermits(permitManagerMap); + } + + /** + * Get the maximum of elastic Permits that the ns can use. + * @param totalETPermits total number of elastic permits. + * @return long + */ + private int getMaximumETPermitCanUseForTheNS( + Configuration conf, String nsId, int totalETPermits) { + int defaultElasticPermitPercent = conf.getInt( + DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT_KEY, + DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT); + int maxPercent = conf.getInt( + DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + nsId, + defaultElasticPermitPercent); + if (maxPercent < 0) { + maxPercent = 0; + } + return (int) Math.ceil(totalETPermits * (1.0 * Math.min(maxPercent, 100) / 100)); + } + + @Override + public void shutdown() { + super.shutdown(); + if (this.totalElasticPermits != null) { + totalElasticPermits.drainPermits(); + } + } + + @VisibleForTesting + int getTotalElasticAvailableHandler() { + int available = 0; + if (this.totalElasticPermits != null) { + available = this.totalElasticPermits.availablePermits(); + } + return available; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java index 3b85da59e1f52..c17d3ff88d01d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java @@ -28,17 +28,17 @@ public class NoRouterRpcFairnessPolicyController implements RouterRpcFairnessPolicyController { - public NoRouterRpcFairnessPolicyController(Configuration conf) { + public NoRouterRpcFairnessPolicyController(Configuration conf, int version) { // Dummy constructor. } @Override - public boolean acquirePermit(String nsId) { - return true; + public Permit acquirePermit(String nsId) { + return Permit.getDontNeedPermit(); } @Override - public void releasePermit(String nsId) { + public void releasePermit(String nsId, Permit permit) { // Dummy, pass through. } @@ -51,4 +51,9 @@ public void shutdown() { public String getAvailableHandlerOnPerNs(){ return "N/A"; } + + @Override + public int getVersion() { + return 0; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/Permit.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/Permit.java new file mode 100644 index 0000000000000..5b57aca297079 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/Permit.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.classification.VisibleForTesting; + +import java.util.HashMap; +import java.util.Map; + +/** + * A permit object contains PermitType and PermitVersion. + */ +public class Permit { + private static final Permit DONTNEEDPERMIT = + new Permit(PermitType.DONT_NEED_PERMIT, 0); + private static final Permit NOPERMIT = + new Permit(PermitType.NO_PERMIT, 0); + + public static Permit getDontNeedPermit() { + return DONTNEEDPERMIT; + } + + public static Permit getNoPermit() { + return NOPERMIT; + } + + private final PermitType permitType; + private final int permitVersion; + + public Permit(PermitType permitType, int version) { + this.permitType = permitType; + this.permitVersion = version; + } + + PermitType getPermitType() { + return permitType; + } + + int getPermitVersion() { + return permitVersion; + } + + public boolean isNoPermit() { + return this.permitType.equals(PermitType.NO_PERMIT); + } + + @VisibleForTesting + public boolean isHoldPermit() { + return !isNoPermit(); + } + + public boolean isDontNeedPermit() { + return this.permitType.equals(PermitType.DONT_NEED_PERMIT); + } + + @Override + public String toString() { + return "Permit[permitType=" + permitType + + ",version=" + permitVersion + "]"; + } + + public enum PermitType { + /** permit which is dedicated by namespace. */ + DEDICATED, + /** permit which is shared by all namespaces. */ + SHARED, + /** mock permit while no need to get permit. */ + DONT_NEED_PERMIT, + /** a mock permit while can't get permits. */ + NO_PERMIT + } + + /** + * Build Permit instance of each PermitType based on the version. + */ + public static Map getPermitBaseOnVersion(int version) { + Map permitMap = new HashMap<>(); + for (PermitType permitType : PermitType.values()) { + permitMap.put(permitType, new Permit(permitType, version)); + } + return permitMap; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java index 354383a168f4e..1939a04815249 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java @@ -46,7 +46,7 @@ public interface RouterRpcFairnessPolicyController { * @param nsId NS id for which a permission to continue is requested. * @return true or false based on whether permit is given. */ - boolean acquirePermit(String nsId); + Permit acquirePermit(String nsId); /** * Handler threads are expected to invoke this method that signals @@ -56,7 +56,7 @@ public interface RouterRpcFairnessPolicyController { * * @param nsId Name service id for which permission release request is made. */ - void releasePermit(String nsId); + void releasePermit(String nsId, Permit permit); /** * Shutdown steps to stop accepting new permission requests and clean-up. @@ -67,4 +67,10 @@ public interface RouterRpcFairnessPolicyController { * Returns the JSON string of the available handler for each Ns. */ String getAvailableHandlerOnPerNs(); + + /** + * A version to ensure that we can correctly recycle permits + * after dynamically refreshing the RpcFairnessPolicyController. + */ + int getVersion(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticPermitManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticPermitManager.java new file mode 100644 index 0000000000000..4388a16fab5fe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticPermitManager.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdfs.server.federation.fairness.Permit.PermitType; + +/** + * A class that manages permits in an exclusive mode. + */ +public class StaticPermitManager implements AbstractPermitManager { + private final static Logger LOG = LoggerFactory.getLogger(StaticPermitManager.class); + private final String nsId; + private final int version; + private final int permitCap; + private final Semaphore dedicatedPermits; + private final Map cachedPermits; + + public StaticPermitManager( + final String nsId, final int version, + final int permitCap) { + this.nsId = nsId; + this.version = version; + this.permitCap = permitCap; + this.dedicatedPermits = new Semaphore(permitCap); + this.cachedPermits = Permit.getPermitBaseOnVersion(version); + LOG.info("{} has {} dedicated permits.", nsId, permitCap); + } + + @Override + public String toString() { + return "[StaticPermitManager, nsId=" + nsId + + ", version=" + version + + ", permitCap=" + permitCap + + ", availablePermits=" + this.dedicatedPermits.availablePermits() + + "]"; + } + + @Override + public Permit acquirePermit() { + Permit permit = Permit.getNoPermit(); + try { + if (dedicatedPermits.tryAcquire(1, TimeUnit.SECONDS)) { + permit = this.cachedPermits.get(PermitType.DEDICATED); + } + } catch (InterruptedException e) { + // ignore + } + return permit; + } + + @Override + public void releasePermit(Permit permit) { + if (permit.getPermitVersion() == version) { + if (permit.getPermitType() == PermitType.DEDICATED) { + this.dedicatedPermits.release(); + } + } else { + LOG.warn("Wrong permit version for {}, expect {}, actual {}", + nsId, version, permit.getPermitVersion()); + } + } + + @Override + public void drainPermits() { + this.dedicatedPermits.drainPermits(); + } + + @Override + public int availablePermits() { + return this.dedicatedPermits.availablePermits(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java index aa0777fc03d69..5ec82369fc394 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java @@ -23,6 +23,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.HashSet; @@ -42,55 +44,52 @@ public class StaticRouterRpcFairnessPolicyController extends private static final Logger LOG = LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class); - public static final String ERROR_MSG = "Configured handlers " - + DFS_ROUTER_HANDLER_COUNT_KEY + '=' - + " %d is less than the minimum required handlers %d"; - - public StaticRouterRpcFairnessPolicyController(Configuration conf) { + public StaticRouterRpcFairnessPolicyController(Configuration conf, int version) { + super(version); init(conf); } public void init(Configuration conf) throws IllegalArgumentException { - super.init(conf); // Total handlers configured to process all incoming Rpc. int handlerCount = conf.getInt( DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); - LOG.info("Handlers available for fairness assignment {} ", handlerCount); - // Get all name services configured Set allConfiguredNS = FederationUtil.getAllConfiguredNS(conf); + // Insert the concurrent nameservice into the set to process together + allConfiguredNS.add(CONCURRENT_NS); + + validateHandlersCount(conf, handlerCount, allConfiguredNS); + Map newPermits = new HashMap<>(); // Set to hold name services that are not // configured with dedicated handlers. Set unassignedNS = new HashSet<>(); - - // Insert the concurrent nameservice into the set to process together - allConfiguredNS.add(CONCURRENT_NS); - validateHandlersCount(conf, handlerCount, allConfiguredNS); for (String nsId : allConfiguredNS) { int dedicatedHandlers = conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); if (dedicatedHandlers > 0) { handlerCount -= dedicatedHandlers; - insertNameServiceWithPermits(nsId, dedicatedHandlers); + newPermits.put(nsId, new StaticPermitManager( + nsId, getVersion(), dedicatedHandlers)); logAssignment(nsId, dedicatedHandlers); } else { unassignedNS.add(nsId); } } - // Assign remaining handlers equally to remaining name services and // general pool if applicable. if (!unassignedNS.isEmpty()) { - LOG.info("Unassigned ns {}", unassignedNS.toString()); + LOG.info("Unassigned ns {}", unassignedNS); int handlersPerNS = handlerCount / unassignedNS.size(); LOG.info("Handlers available per ns {}", handlersPerNS); for (String nsId : unassignedNS) { - insertNameServiceWithPermits(nsId, handlersPerNS); + // Each NS should have at least one handler assigned. + newPermits.put(nsId, new StaticPermitManager( + nsId, getVersion(), handlersPerNS)); logAssignment(nsId, handlersPerNS); } } @@ -98,41 +97,19 @@ public void init(Configuration conf) // Assign remaining handlers if any to fan out calls. int leftOverHandlers = unassignedNS.isEmpty() ? handlerCount : handlerCount % unassignedNS.size(); - int existingPermits = getAvailablePermits(CONCURRENT_NS); + int existingPermits = newPermits.get(CONCURRENT_NS).availablePermits(); if (leftOverHandlers > 0) { LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers); - insertNameServiceWithPermits(CONCURRENT_NS, - existingPermits + leftOverHandlers); + newPermits.put(CONCURRENT_NS, new StaticPermitManager( + CONCURRENT_NS, getVersion(), existingPermits + leftOverHandlers)); } LOG.info("Final permit allocation for concurrent ns: {}", - getAvailablePermits(CONCURRENT_NS)); + newPermits.get(CONCURRENT_NS).availablePermits()); + initPermits(newPermits); } private static void logAssignment(String nsId, int count) { LOG.info("Assigned {} handlers to nsId {} ", count, nsId); } - - private void validateHandlersCount(Configuration conf, int handlerCount, - Set allConfiguredNS) { - int totalDedicatedHandlers = 0; - for (String nsId : allConfiguredNS) { - int dedicatedHandlers = - conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0); - if (dedicatedHandlers > 0) { - // Total handlers should not be less than sum of dedicated handlers. - totalDedicatedHandlers += dedicatedHandlers; - } else { - // Each NS should have at least one handler assigned. - totalDedicatedHandlers++; - } - } - if (totalDedicatedHandlers > handlerCount) { - String msg = String.format(ERROR_MSG, handlerCount, - totalDedicatedHandlers); - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java index e593e888c9ac4..503b9b22244c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -263,20 +263,27 @@ public static HdfsFileStatus updateMountPointStatus(HdfsFileStatus dirStatus, * @return Fairness policy controller. */ public static RouterRpcFairnessPolicyController newFairnessPolicyController( - Configuration conf) { + Configuration conf, int version) { Class clazz = conf.getClass( RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS_DEFAULT, RouterRpcFairnessPolicyController.class); - return newInstance(conf, null, null, clazz); + // Constructor with configuration but no context + try { + Constructor constructor = clazz.getConstructor( + Configuration.class, int.class); + return (RouterRpcFairnessPolicyController) constructor.newInstance( + conf, version); + } catch (ReflectiveOperationException e) { + LOG.error("Could not instantiate: {}", clazz.getSimpleName(), e); + return null; + } } /** * Collect all configured nameservices. * - * @param conf * @return Set of name services in config - * @throws IllegalArgumentException */ public static Set getAllConfiguredNS(Configuration conf) throws IllegalArgumentException { @@ -284,7 +291,7 @@ public static Set getAllConfiguredNS(Configuration conf) Collection namenodes = conf.getTrimmedStringCollection( DFS_ROUTER_MONITOR_NAMENODE); - Set nameservices = new HashSet(); + HashSet nameservices = new HashSet<>(); for (String namenode : namenodes) { String[] namenodeSplit = namenode.split("\\."); String nsId; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index c0a9e3f294cd8..00b7b2ba8e7ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -354,6 +354,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { NoRouterRpcFairnessPolicyController.class; public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX = FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count."; + public static final String DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX = + FEDERATION_ROUTER_FAIRNESS_PREFIX + "elastic.permits.percent."; + public static final String DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT_KEY = + FEDERATION_ROUTER_FAIRNESS_PREFIX + "elastic.permits.percent.default"; + public static final int DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT = 20; // HDFS Router Federation Rename. public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index ff90854ebb7ec..d106465876e11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hdfs.server.federation.fairness.Permit; import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; @@ -157,7 +158,7 @@ public RouterRpcClient(Configuration conf, Router router, this.connectionManager = new ConnectionManager(clientConf); this.connectionManager.start(); this.routerRpcFairnessPolicyController = - FederationUtil.newFairnessPolicyController(conf); + FederationUtil.newFairnessPolicyController(conf, 0); int numThreads = conf.getInt( RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, @@ -830,7 +831,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(nsId, ugi, method, controller); + Permit permit = acquirePermit(nsId, ugi, method, controller); try { List nns = getNamenodesForNameservice(nsId); @@ -840,7 +841,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method) Object[] params = method.getParams(loc); return invokeMethod(ugi, nns, proto, m, params); } finally { - releasePermit(nsId, ugi, method, controller); + releasePermit(nsId, ugi, method, controller, permit); } } @@ -999,7 +1000,7 @@ public RemoteResult invokeSequential( // Invoke in priority order for (final RemoteLocationContext loc : locations) { String ns = loc.getNameserviceId(); - acquirePermit(ns, ugi, remoteMethod, controller); + Permit permit = acquirePermit(ns, ugi, remoteMethod, controller); List namenodes = getNamenodesForNameservice(ns); try { @@ -1034,7 +1035,7 @@ public RemoteResult invokeSequential( "Unexpected exception proxying API " + e.getMessage(), e); thrownExceptions.add(ioe); } finally { - releasePermit(ns, ugi, remoteMethod, controller); + releasePermit(ns, ugi, remoteMethod, controller, permit); } } @@ -1360,7 +1361,7 @@ public Map invokeConcurrent( T location = locations.iterator().next(); String ns = location.getNameserviceId(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(ns, ugi, method, controller); + Permit permit = acquirePermit(ns, ugi, method, controller); final List namenodes = getNamenodesForNameservice(ns); try { @@ -1373,7 +1374,7 @@ public Map invokeConcurrent( // Localize the exception throw processException(ioe, location); } finally { - releasePermit(ns, ugi, method, controller); + releasePermit(ns, ugi, method, controller, permit); } } @@ -1424,7 +1425,7 @@ public Map invokeConcurrent( } RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); - acquirePermit(CONCURRENT_NS, ugi, method, controller); + Permit permit = acquirePermit(CONCURRENT_NS, ugi, method, controller); try { List> futures = null; if (timeOutMs > 0) { @@ -1482,7 +1483,7 @@ public Map invokeConcurrent( throw new IOException( "Unexpected error while invoking API " + ex.getMessage(), ex); } finally { - releasePermit(CONCURRENT_NS, ugi, method, controller); + releasePermit(CONCURRENT_NS, ugi, method, controller, permit); } } @@ -1566,11 +1567,13 @@ private String getNameserviceForBlockPoolId(final String bpId) * @param controller fairness policy controller to acquire permit from * @throws IOException If permit could not be acquired for the nsId. */ - private void acquirePermit(final String nsId, final UserGroupInformation ugi, + private Permit acquirePermit(final String nsId, final UserGroupInformation ugi, final RemoteMethod m, RouterRpcFairnessPolicyController controller) throws IOException { + Permit permit = Permit.getDontNeedPermit(); if (controller != null) { - if (!controller.acquirePermit(nsId)) { + permit = controller.acquirePermit(nsId); + if (permit.isNoPermit()) { // Throw StandByException, // Clients could fail over and try another router. if (rpcMonitor != null) { @@ -1586,6 +1589,7 @@ private void acquirePermit(final String nsId, final UserGroupInformation ugi, } incrAcceptedPermitForNs(nsId); } + return permit; } /** @@ -1597,9 +1601,10 @@ private void acquirePermit(final String nsId, final UserGroupInformation ugi, * @param controller fairness policy controller to release permit from */ private void releasePermit(final String nsId, final UserGroupInformation ugi, - final RemoteMethod m, RouterRpcFairnessPolicyController controller) { + final RemoteMethod m, RouterRpcFairnessPolicyController controller, + Permit permit) { if (controller != null) { - controller.releasePermit(nsId); + controller.releasePermit(nsId, permit); LOG.trace("Permit released for ugi: {} for method: {}", ugi, m.getMethodName()); } @@ -1637,7 +1642,11 @@ public Long getAcceptedPermitForNs(String ns) { public synchronized String refreshFairnessPolicyController(Configuration conf) { RouterRpcFairnessPolicyController newController; try { - newController = FederationUtil.newFairnessPolicyController(conf); + int version = 0; + if (routerRpcFairnessPolicyController != null) { + version = routerRpcFairnessPolicyController.getVersion(); + } + newController = FederationUtil.newFairnessPolicyController(conf, version); } catch (RuntimeException e) { LOG.error("Failed to create router fairness policy controller", e); return getCurrentFairnessPolicyControllerClassName(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index fcf6a28475fbd..524c0c8f3bc52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -723,6 +723,27 @@ + + dfs.federation.router.fairness.elastic.permits.percent.EXAMPLENAMESERVICE + + + The maximum percentage of total elastic handler counts that the + nameservice EXAMPLENAMESERVICE can use. When the dedicated handlers + are used up, this nameservice EXAMPLENAMESERVICE can preempt the total + elastic handlers. And this conf is used to limit the maximum of total + elastic handlers than this ns can preempt. + + + + + dfs.federation.router.fairness.elastic.permits.percent.default + 20 + + The default maximum percentage of total elastic handler counts + that no specially configured nameservice can use. + + + dfs.federation.router.federation.rename.bandwidth 10 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestElasticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestElasticRouterRpcFairnessPolicyController.java new file mode 100644 index 0000000000000..7accef2eed267 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestElasticRouterRpcFairnessPolicyController.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.apache.hadoop.hdfs.server.federation.fairness.Permit.PermitType; + +/** + * Test functionality of {@link ElasticRouterRpcFairnessPolicyController). + */ +public class TestElasticRouterRpcFairnessPolicyController { + private final static String NAMESERVICES = + "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2"; + + private void verifyAcquirePermit(RouterRpcFairnessPolicyController controller, + String nsId, int dedicatedPermit, int elasticPermit) { + for (int i = 0; i < dedicatedPermit; i++) { + assertEquals(controller.acquirePermit(nsId) + .getPermitType(), PermitType.DEDICATED); + } + for (int i = 0; i < elasticPermit; i++) { + assertEquals(controller.acquirePermit(nsId) + .getPermitType(), PermitType.SHARED); + } + assertEquals(controller.acquirePermit(nsId) + .getPermitType(), PermitType.NO_PERMIT); + } + + @Test + public void testNoElasticPermits() { + Configuration conf = createConf(10); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 5); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns2", 2); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 3); + + ElasticRouterRpcFairnessPolicyController controller = + new ElasticRouterRpcFairnessPolicyController(conf, 0); + assertNotNull(controller); + assertEquals(0, controller.getTotalElasticAvailableHandler()); + + verifyAcquirePermit(controller, "ns1", 5, 0); + verifyAcquirePermit(controller, "ns2", 2, 0); + verifyAcquirePermit(controller, CONCURRENT_NS, 3, 0); + } + + @Test + public void testConfNoUseElasticPermits() { + Configuration conf = createConf(20); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 5); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns2", 2); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 3); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_DEFAULT_KEY, 0); + + ElasticRouterRpcFairnessPolicyController controller = + new ElasticRouterRpcFairnessPolicyController(conf, 0); + assertNotNull(controller); + assertEquals(10, controller.getTotalElasticAvailableHandler()); + + verifyAcquirePermit(controller, "ns1", 5, 0); + verifyAcquirePermit(controller, "ns2", 2, 0); + verifyAcquirePermit(controller, CONCURRENT_NS, 3, 0); + } + + @Test + public void testConfSmallerElasticPermits() { + Configuration conf = createConf(20); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 5); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns2", 2); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 3); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + "ns1", 20); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + "ns2", 20); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + CONCURRENT_NS, 20); + + ElasticRouterRpcFairnessPolicyController controller = + new ElasticRouterRpcFairnessPolicyController(conf, 0); + assertNotNull(controller); + assertEquals(10, controller.getTotalElasticAvailableHandler()); + + verifyAcquirePermit(controller, "ns1", 5, 2); + verifyAcquirePermit(controller, "ns2", 2, 2); + verifyAcquirePermit(controller, CONCURRENT_NS, 3, 2); + assertEquals(4, controller.getTotalElasticAvailableHandler()); + } + + @Test + public void testConfMoreElasticPermits() { + Configuration conf = createConf(20); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 5); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns2", 2); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 3); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + "ns1", 40); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + "ns2", 40); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + CONCURRENT_NS, 40); + + ElasticRouterRpcFairnessPolicyController controller = + new ElasticRouterRpcFairnessPolicyController(conf, 0); + assertNotNull(controller); + assertEquals(10, controller.getTotalElasticAvailableHandler()); + + verifyAcquirePermit(controller, "ns1", 5, 4); + verifyAcquirePermit(controller, "ns2", 2, 4); + verifyAcquirePermit(controller, CONCURRENT_NS, 3, 2); + assertEquals(0, controller.getTotalElasticAvailableHandler()); + } + + @Test + public void testConfMoreElasticPermits2() { + Configuration conf = createConf(20); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 5); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns2", 2); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 3); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + "ns1", 100); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + "ns2", 100); + conf.setInt(DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + CONCURRENT_NS, 100); + + ElasticRouterRpcFairnessPolicyController controller = + new ElasticRouterRpcFairnessPolicyController(conf, 0); + assertNotNull(controller); + assertEquals(10, controller.getTotalElasticAvailableHandler()); + + verifyAcquirePermit(controller, "ns1", 5, 10); + verifyAcquirePermit(controller, "ns2", 2, 0); + verifyAcquirePermit(controller, CONCURRENT_NS, 3, 0); + assertEquals(0, controller.getTotalElasticAvailableHandler()); + } + + private Configuration createConf(int handlers) { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers); + conf.set(DFS_ROUTER_MONITOR_NAMENODE, NAMESERVICES); + conf.setClass( + RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + ElasticRouterRpcFairnessPolicyController.class, + RouterRpcFairnessPolicyController.class); + return conf; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java index 2d27c66e37ee0..481feae2952c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java @@ -137,13 +137,13 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness) // take the lock for concurrent NS to block fanout calls assertTrue(routerContext.getRouter().getRpcServer() .getRPCClient().getRouterRpcFairnessPolicyController() - .acquirePermit(RouterRpcFairnessConstants.CONCURRENT_NS)); + .acquirePermit(RouterRpcFairnessConstants.CONCURRENT_NS).isHoldPermit()); } else { for (String ns : cluster.getNameservices()) { LOG.info("Taking lock first for ns: {}", ns); assertTrue(routerContext.getRouter().getRpcServer() .getRPCClient().getRouterRpcFairnessPolicyController() - .acquirePermit(ns)); + .acquirePermit(ns).isHoldPermit()); } } } @@ -156,6 +156,10 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness) assertEquals(latestRejectedPermits - originalRejectedPermits, overloadException.get()); + Permit permit = new Permit(Permit.PermitType.DEDICATED, + routerContext.getRouter().getRpcServer().getRPCClient() + .getRouterRpcFairnessPolicyController().getVersion()); + if (fairness) { assertTrue(overloadException.get() > 0); if (isConcurrent) { @@ -163,12 +167,12 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness) // take the lock for concurrent NS to block fanout calls routerContext.getRouter().getRpcServer() .getRPCClient().getRouterRpcFairnessPolicyController() - .releasePermit(RouterRpcFairnessConstants.CONCURRENT_NS); + .releasePermit(RouterRpcFairnessConstants.CONCURRENT_NS, permit); } else { for (String ns : cluster.getNameservices()) { routerContext.getRouter().getRpcServer() .getRPCClient().getRouterRpcFairnessPolicyController() - .releasePermit(ns); + .releasePermit(ns, permit); } } } else { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java index 8307f666b5d1c..0aab3b9c8604a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java @@ -54,7 +54,8 @@ public void testHandlerAllocationWithLeftOverHandler() { RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = getFairnessPolicyController(31); // One extra handler should be allocated to commons. - assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); verifyHandlerAllocation(routerRpcFairnessPolicyController); } @@ -63,24 +64,30 @@ public void testHandlerAllocationPreconfigured() { Configuration conf = createConf(40); conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30); RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = - FederationUtil.newFairnessPolicyController(conf); + FederationUtil.newFairnessPolicyController(conf, 0); // ns1 should have 30 permits allocated for (int i=0; i<30; i++) { - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1") + .isHoldPermit()); } // ns2 should have 5 permits. // concurrent should have 5 permits. for (int i=0; i<5; i++) { - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2") + .isHoldPermit()); assertTrue( - routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); } - assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); - assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2")); - assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1") + .isHoldPermit()); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2") + .isHoldPermit()); + assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); } @Test @@ -117,7 +124,7 @@ public void testGetAvailableHandlerOnPerNs() { public void testGetAvailableHandlerOnPerNsForNoFairness() { Configuration conf = new Configuration(); RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = - FederationUtil.newFairnessPolicyController(conf); + FederationUtil.newFairnessPolicyController(conf, 0); assertEquals("N/A", routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs()); } @@ -136,20 +143,26 @@ public void testHandlerAllocationConcurrentConfigured() { conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns2", 1); conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "concurrent", 1); RouterRpcFairnessPolicyController routerRpcFairnessPolicyController = - FederationUtil.newFairnessPolicyController(conf); + FederationUtil.newFairnessPolicyController(conf, 0); // ns1, ns2 should have 1 permit each - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); - assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); - assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1") + .isHoldPermit()); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2") + .isHoldPermit()); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1") + .isHoldPermit()); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2") + .isHoldPermit()); // concurrent should have 3 permits for (int i=0; i<3; i++) { assertTrue( - routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); } - assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); } @@ -157,14 +170,14 @@ private void verifyInstantiationError(Configuration conf, int handlerCount, int totalDedicatedHandlers) { GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer .captureLogs(LoggerFactory.getLogger( - StaticRouterRpcFairnessPolicyController.class)); + AbstractRouterRpcFairnessPolicyController.class)); try { - FederationUtil.newFairnessPolicyController(conf); + FederationUtil.newFairnessPolicyController(conf, 0); } catch (IllegalArgumentException e) { // Ignore the exception as it is expected here. } String errorMsg = String.format( - StaticRouterRpcFairnessPolicyController.ERROR_MSG, handlerCount, + AbstractRouterRpcFairnessPolicyController.ERROR_MSG, handlerCount, totalDedicatedHandlers); assertTrue("Should contain error message: " + errorMsg, logs.getOutput().contains(errorMsg)); @@ -172,28 +185,40 @@ private void verifyInstantiationError(Configuration conf, int handlerCount, private RouterRpcFairnessPolicyController getFairnessPolicyController( int handlers) { - return FederationUtil.newFairnessPolicyController(createConf(handlers)); + return FederationUtil.newFairnessPolicyController(createConf(handlers), 0); } private void verifyHandlerAllocation( RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) { for (int i=0; i<10; i++) { - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1") + .isHoldPermit()); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2") + .isHoldPermit()); assertTrue( - routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); } - assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1")); - assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2")); - assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); - - routerRpcFairnessPolicyController.releasePermit("ns1"); - routerRpcFairnessPolicyController.releasePermit("ns2"); - routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS); - - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1")); - assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2")); - assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS)); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1") + .isHoldPermit()); + assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2") + .isHoldPermit()); + assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); + + Permit permit = new Permit(Permit.PermitType.DEDICATED, + routerRpcFairnessPolicyController.getVersion()); + + routerRpcFairnessPolicyController.releasePermit("ns1", permit); + routerRpcFairnessPolicyController.releasePermit("ns2", permit); + routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS, permit); + + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1") + .isHoldPermit()); + assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2") + .isHoldPermit()); + assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS) + .isHoldPermit()); } private Configuration createConf(int handlers) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java index 50527b6573f9f..5c4e3b8039e4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java @@ -46,6 +46,9 @@ public void initializeMemberVariables() { // Allocate xmlPropsToSkipCompare = new HashSet(); + xmlPropsToSkipCompare.add( + RBFConfigKeys.DFS_ROUTER_ELASTIC_PERMITS_PERCENT_KEY_PREFIX + + "EXAMPLENAMESERVICE"); xmlPrefixToSkipCompare = new HashSet(); xmlPrefixToSkipCompare.add( RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX);