Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.fairness;

/**
* A class to manage the issuance and recycling of Permits.
*/
public interface AbstractPermitManager {
/**
* Request one Permit instance for the NS.
*/
Permit acquirePermit();

/**
* Release the permit instance for the NS.
*/
void releasePermit(Permit permit);

void drainPermits();

int availablePermits();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.Set;

import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;

/**
* Base fairness policy that implements @RouterRpcFairnessPolicyController.
* Internally a map of nameservice to Semaphore is used to control permits.
Expand All @@ -40,49 +42,70 @@ public class AbstractRouterRpcFairnessPolicyController
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);

/** Hash table to hold semaphore for each configured name service. */
private Map<String, Semaphore> permits;
private Map<String, AbstractPermitManager> permits;

/** Version to support dynamically change permits. **/
private final int version;

public static final String ERROR_MSG = "Configured handlers "
+ DFS_ROUTER_HANDLER_COUNT_KEY + '='
+ " %d is less than the minimum required handlers %d";

AbstractRouterRpcFairnessPolicyController(int version) {
permits = new HashMap<>();
this.version = version;
}

public void init(Configuration conf) {
this.permits = new HashMap<>();
/**
* Init the permits.
*/
public void initPermits(Map<String, AbstractPermitManager> newPermits) {
this.permits = newPermits;
}

@Override
public boolean acquirePermit(String nsId) {
try {
LOG.debug("Taking lock for nameservice {}", nsId);
return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.debug("Cannot get a permit for nameservice {}", nsId);
public int getVersion() {
return this.version;
}

@Override
public Permit acquirePermit(String nsId) {
LOG.debug("Taking lock for nameservice {}", nsId);
AbstractPermitManager permitManager = permits.get(nsId);
if (permitManager != null) {
return permitManager.acquirePermit();
} else {
// TODO Add one metric to monitor this abnormal case.
LOG.warn("Can't find NSPermit for {}.", nsId, new Throwable());
return Permit.getNoPermit();
}
return false;
}

@Override
public void releasePermit(String nsId) {
this.permits.get(nsId).release();
public void releasePermit(String nsId, Permit permitInstance) {
if (permitInstance == null || permitInstance.isDontNeedPermit()) {
return;
}
AbstractPermitManager permitManager =
this.permits.get(nsId);
if (permitManager != null) {
permitManager.releasePermit(permitInstance);
}
}

@Override
public void shutdown() {
LOG.debug("Shutting down router fairness policy controller");
// drain all semaphores
for (Semaphore sema: this.permits.values()) {
sema.drainPermits();
for (AbstractPermitManager permitManager: this.permits.values()) {
permitManager.drainPermits();
}
}

protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
this.permits.put(nsId, new Semaphore(maxPermits));
}

protected int getAvailablePermits(String nsId) {
return this.permits.get(nsId).availablePermits();
}

@Override
public String getAvailableHandlerOnPerNs() {
JSONObject json = new JSONObject();
for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
for (Map.Entry<String, AbstractPermitManager> entry : permits.entrySet()) {
try {
String nsId = entry.getKey();
int availableHandler = entry.getValue().availablePermits();
Expand All @@ -93,4 +116,30 @@ public String getAvailableHandlerOnPerNs() {
}
return json.toString();
}

protected int getDedicatedHandlers(Configuration conf, String nsId) {
return conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 1);
}

/**
* Validate all configured dedicated handlers for the nameservices.
* @return sum of dedicated handlers of all nameservices
* @throws IllegalArgumentException
* if total dedicated handlers more than handler count.
*/
protected int validateHandlersCount(Configuration conf,
int handlerCount, Set<String> allConfiguredNS) {
int totalDedicatedHandlers = 0;
for (String nsId : allConfiguredNS) {
int dedicatedHandlers = getDedicatedHandlers(conf, nsId);
totalDedicatedHandlers += dedicatedHandlers;
}
if (totalDedicatedHandlers > handlerCount) {
String msg = String.format(ERROR_MSG, handlerCount,
totalDedicatedHandlers);
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
return totalDedicatedHandlers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.fairness;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.server.federation.fairness.Permit.PermitType;

/**
* A class that manages permits in an exclusive and shared mode.
*/
public class ElasticPermitManager implements AbstractPermitManager {
private static final Logger LOG = LoggerFactory.getLogger(ElasticPermitManager.class);
private final String nsId;
/** Dedicated Permit Numbers. **/
private final int dedicatedCap;
private final Semaphore dedicatedPermits;
/** The maximum number of elastic permits that can be acquired. **/
private final int maximumELNumberCanUse;
private final Semaphore maximumELPermitsCanUse;
/** Total elastic permits. **/
private final Semaphore totalElasticPermits;
/** A version to ensure that permits can be correctly recycled,
* after dynamically refreshing FairnessPolicyController. **/
private final int version;
/** A cached permits Map. **/
private final Map<PermitType, Permit> cachedPermits;

ElasticPermitManager(String nsId, int dedicatedCap,
int maximumELNumberCanUse, Semaphore totalElasticPermits, int version) {
this.nsId = nsId;
this.dedicatedCap = dedicatedCap;
this.dedicatedPermits = new Semaphore(dedicatedCap);
this.maximumELNumberCanUse = maximumELNumberCanUse;
this.maximumELPermitsCanUse = new Semaphore(maximumELNumberCanUse);
this.totalElasticPermits = totalElasticPermits;
this.version = version;
this.cachedPermits = Permit.getPermitBaseOnVersion(version);
LOG.info("New NSPermitManager " + this);
}

@Override
public String toString() {
return "NSPermitManager[nsId=" + nsId
+ ", dedicatedPermitsNumber=" + dedicatedCap
+ ", sharedPermitsNumber=" + maximumELNumberCanUse
+ ", totalSharedPermits=" + totalElasticPermits
+ ", version=" + version + "]";
}

@Override
public Permit acquirePermit() {
Permit permit = this.cachedPermits.get(PermitType.NO_PERMIT);
try {
if (acquireDedicatedPermit()) {
permit = this.cachedPermits.get(PermitType.DEDICATED);
} else if (acquireElasticPermit()) {
permit = this.cachedPermits.get(PermitType.SHARED);
}
} catch (InterruptedException e) {
// ignore
}
return permit;
}

/**
* Try to acquire one permit from Dedicated Semaphore.
* Tips: without timeout to ensure lower latency
*/
private boolean acquireDedicatedPermit() throws InterruptedException {
return this.dedicatedPermits.tryAcquire();
}

/**
* Try to acquire one permit from Total Elastic Permits.
*/
private boolean acquireElasticPermit() throws InterruptedException {
boolean result = false;
if (maximumELNumberCanUse > 0 && totalElasticPermits != null) {
if (this.maximumELPermitsCanUse.tryAcquire(1, TimeUnit.SECONDS)) {
if (this.totalElasticPermits.tryAcquire(1, TimeUnit.SECONDS)) {
result = true;
} else {
this.maximumELPermitsCanUse.release();
}
}
}
return result;
}

@Override
public void releasePermit(Permit permit) {
if (permit.getPermitVersion() == version) {
switch (permit.getPermitType()) {
case DEDICATED:
this.dedicatedPermits.release();
break;
case SHARED:
this.maximumELPermitsCanUse.release();
this.totalElasticPermits.release();
break;
case DONT_NEED_PERMIT:
break;
default:
LOG.warn("Unexpected permitType {}", permit.getPermitType());
break;
}
} else {
LOG.warn("Wrong permit version fro {}, expect {}, actual {}.",
nsId, version, permit.getPermitVersion());
}
}

@Override
public void drainPermits() {
this.dedicatedPermits.drainPermits();
}

@Override
public int availablePermits() {
return this.dedicatedPermits.availablePermits()
+ this.maximumELPermitsCanUse.availablePermits();
}
}
Loading