Skip to content

Commit

Permalink
HADOOP-12916. Allow RPC scheduler/callqueue backoff using response ti…
Browse files Browse the repository at this point in the history
…mes. Contributed by Xiaoyu Yao.
  • Loading branch information
xiaoyuyao committed Mar 31, 2016
1 parent 0a74610 commit d95c6eb
Show file tree
Hide file tree
Showing 17 changed files with 893 additions and 267 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1626,6 +1626,10 @@ public long getTimeDuration(String name, long defaultValue, TimeUnit unit) {
return defaultValue; return defaultValue;
} }
vStr = vStr.trim(); vStr = vStr.trim();
return getTimeDurationHelper(name, vStr, unit);
}

private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr); ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
if (null == vUnit) { if (null == vUnit) {
LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit); LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit);
Expand All @@ -1636,6 +1640,15 @@ public long getTimeDuration(String name, long defaultValue, TimeUnit unit) {
return unit.convert(Long.parseLong(vStr), vUnit.unit()); return unit.convert(Long.parseLong(vStr), vUnit.unit());
} }


public long[] getTimeDurations(String name, TimeUnit unit) {
String[] strings = getTrimmedStrings(name);
long[] durations = new long[strings.length];
for (int i = 0; i < strings.length; i++) {
durations[i] = getTimeDurationHelper(name, strings[i], unit);
}
return durations;
}

/** /**
* Get the value of the <code>name</code> property as a <code>Pattern</code>. * Get the value of the <code>name</code> property as a <code>Pattern</code>.
* If no such property is specified, or if the specified value is not a valid * If no such property is specified, or if the specified value is not a valid
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -90,14 +90,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** /**
* CallQueue related settings. These are not used directly, but rather * CallQueue related settings. These are not used directly, but rather
* combined with a namespace and port. For instance: * combined with a namespace and port. For instance:
* IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY * IPC_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
*/ */
public static final String IPC_CALLQUEUE_NAMESPACE = "ipc"; public static final String IPC_NAMESPACE = "ipc";
public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl"; public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl"; public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl";
public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
public static final String IPC_BACKOFF_ENABLE = "backoff.enable"; public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false; public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;


/**
* IPC scheduler priority levels.
*/
public static final String IPC_SCHEDULER_PRIORITY_LEVELS_KEY =
"scheduler.priority.levels";
public static final int IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY = 4;

/** This is for specifying the implementation for the mappings from /** This is for specifying the implementation for the mappings from
* hostnames to the racks they belong to * hostnames to the racks they belong to
*/ */
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;


/** /**
* Abstracts queue operations for different blocking queues. * Abstracts queue operations for different blocking queues.
Expand All @@ -43,32 +44,90 @@ static <E> Class<? extends BlockingQueue<E>> convertQueueClass(
Class<?> queueClass, Class<E> elementClass) { Class<?> queueClass, Class<E> elementClass) {
return (Class<? extends BlockingQueue<E>>)queueClass; return (Class<? extends BlockingQueue<E>>)queueClass;
} }

@SuppressWarnings("unchecked")
static Class<? extends RpcScheduler> convertSchedulerClass(
Class<?> schedulerClass) {
return (Class<? extends RpcScheduler>)schedulerClass;
}

private final boolean clientBackOffEnabled; private final boolean clientBackOffEnabled;


// Atomic refs point to active callQueue // Atomic refs point to active callQueue
// We have two so we can better control swapping // We have two so we can better control swapping
private final AtomicReference<BlockingQueue<E>> putRef; private final AtomicReference<BlockingQueue<E>> putRef;
private final AtomicReference<BlockingQueue<E>> takeRef; private final AtomicReference<BlockingQueue<E>> takeRef;


private RpcScheduler scheduler;

public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass, public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
Class<? extends RpcScheduler> schedulerClass,
boolean clientBackOffEnabled, int maxQueueSize, String namespace, boolean clientBackOffEnabled, int maxQueueSize, String namespace,
Configuration conf) { Configuration conf) {
int priorityLevels = parseNumLevels(namespace, conf);
this.scheduler = createScheduler(schedulerClass, priorityLevels,
namespace, conf);
BlockingQueue<E> bq = createCallQueueInstance(backingClass, BlockingQueue<E> bq = createCallQueueInstance(backingClass,
maxQueueSize, namespace, conf); priorityLevels, maxQueueSize, namespace, conf);
this.clientBackOffEnabled = clientBackOffEnabled; this.clientBackOffEnabled = clientBackOffEnabled;
this.putRef = new AtomicReference<BlockingQueue<E>>(bq); this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq); this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
LOG.info("Using callQueue " + backingClass); LOG.info("Using callQueue: " + backingClass + " scheduler: " +
schedulerClass);
}

private static <T extends RpcScheduler> T createScheduler(
Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
// Used for custom, configurable scheduler
try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
String.class, Configuration.class);
return ctor.newInstance(priorityLevels, ns, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(theClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
}

try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class);
return ctor.newInstance(priorityLevels);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(theClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
}

// Last attempt
try {
Constructor<T> ctor = theClass.getDeclaredConstructor();
return ctor.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(theClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
}

// Nothing worked
throw new RuntimeException(theClass.getName() +
" could not be constructed.");
} }


private <T extends BlockingQueue<E>> T createCallQueueInstance( private <T extends BlockingQueue<E>> T createCallQueueInstance(
Class<T> theClass, int maxLen, String ns, Configuration conf) { Class<T> theClass, int priorityLevels, int maxLen, String ns,
Configuration conf) {


// Used for custom, configurable callqueues // Used for custom, configurable callqueues
try { try {
Constructor<T> ctor = theClass.getDeclaredConstructor(int.class, String.class, Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
Configuration.class); int.class, String.class, Configuration.class);
return ctor.newInstance(maxLen, ns, conf); return ctor.newInstance(priorityLevels, maxLen, ns, conf);
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} catch (InvocationTargetException e) { } catch (InvocationTargetException e) {
Expand Down Expand Up @@ -110,6 +169,22 @@ boolean isClientBackoffEnabled() {
return clientBackOffEnabled; return clientBackOffEnabled;
} }


// Based on policy to determine back off current call
boolean shouldBackOff(Schedulable e) {
return scheduler.shouldBackOff(e);
}

void addResponseTime(String name, int priorityLevel, int queueTime,
int processingTime) {
scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime);
}

// This should be only called once per call and cached in the call object
// each getPriorityLevel call will increment the counter for the caller
int getPriorityLevel(Schedulable e) {
return scheduler.getPriorityLevel(e);
}

/** /**
* Insert e into the backing queue or block until we can. * Insert e into the backing queue or block until we can.
* If we block and the queue changes on us, we will insert while the * If we block and the queue changes on us, we will insert while the
Expand Down Expand Up @@ -146,15 +221,46 @@ public int size() {
return takeRef.get().size(); return takeRef.get().size();
} }


/**
* Read the number of levels from the configuration.
* This will affect the FairCallQueue's overall capacity.
* @throws IllegalArgumentException on invalid queue count
*/
@SuppressWarnings("deprecation")
private static int parseNumLevels(String ns, Configuration conf) {
// Fair call queue levels (IPC_CALLQUEUE_PRIORITY_LEVELS_KEY)
// takes priority over the scheduler level key
// (IPC_SCHEDULER_PRIORITY_LEVELS_KEY)
int retval = conf.getInt(ns + "." +
FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 0);
if (retval == 0) { // No FCQ priority level configured
retval = conf.getInt(ns + "." +
CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY);
} else {
LOG.warn(ns + "." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY +
" is deprecated. Please use " + ns + "." +
CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY + ".");
}
if(retval < 1) {
throw new IllegalArgumentException("numLevels must be at least 1");
}
return retval;
}

/** /**
* Replaces active queue with the newly requested one and transfers * Replaces active queue with the newly requested one and transfers
* all calls to the newQ before returning. * all calls to the newQ before returning.
*/ */
public synchronized void swapQueue( public synchronized void swapQueue(
Class<? extends RpcScheduler> schedulerClass,
Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize, Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
String ns, Configuration conf) { String ns, Configuration conf) {
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize, int priorityLevels = parseNumLevels(ns, conf);
ns, conf); RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
ns, conf);
BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
priorityLevels, maxSize, ns, conf);


// Our current queue becomes the old queue // Our current queue becomes the old queue
BlockingQueue<E> oldQ = putRef.get(); BlockingQueue<E> oldQ = putRef.get();
Expand All @@ -168,6 +274,8 @@ public synchronized void swapQueue(
// Swap takeRef to handle new calls // Swap takeRef to handle new calls
takeRef.set(newQ); takeRef.set(newQ);


this.scheduler = newScheduler;

LOG.info("Old Queue: " + stringRepr(oldQ) + ", " + LOG.info("Old Queue: " + stringRepr(oldQ) + ", " +
"Replacement: " + stringRepr(newQ)); "Replacement: " + stringRepr(newQ));
} }
Expand Down
Loading

0 comments on commit d95c6eb

Please sign in to comment.